/*
 * Decompiled with CFR 0.152.
 */
package com.github.sonus21.rqueue.listener;

import com.github.sonus21.rqueue.core.Job;
import com.github.sonus21.rqueue.core.RqueueBeanProvider;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.middleware.HandlerMiddleware;
import com.github.sonus21.rqueue.core.middleware.Middleware;
import com.github.sonus21.rqueue.core.support.RqueueMessageUtils;
import com.github.sonus21.rqueue.listener.JobImpl;
import com.github.sonus21.rqueue.listener.MessageContainerBase;
import com.github.sonus21.rqueue.listener.PostProcessingHandler;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.listener.RqueueMessageHeaders;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.metrics.RqueueMetricsCounter;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.enums.ExecutionStatus;
import com.github.sonus21.rqueue.models.enums.MessageStatus;
import com.github.sonus21.rqueue.utils.QueueThreadPool;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;

/**
 * Executes one {@link RqueueMessage} taken from the queue described by a {@link QueueDetail}.
 *
 * <p>{@link #start()} builds a {@link JobImpl} for the message, runs the configured
 * {@link Middleware} chain followed by the message handler, repeats failed executions for as
 * long as {@code shouldRetry} allows it, and finally passes the outcome to the
 * {@link PostProcessingHandler}. {@code queueThreadPool.release()} is invoked once handling ends.
 */
class RqueueExecutor extends MessageContainerBase {
    private final PostProcessingHandler postProcessingHandler;
    private final QueueThreadPool queueThreadPool;
    private final RqueueMessage rqueueMessage;
    private final RqueueBeanProvider beanProvider;
    private final QueueDetail queueDetail;
    // May be null; processMessage() then runs the handler with an empty chain.
    private final List<Middleware> middlewareList;
    // Ensures the PROCESSING status is written at most once per executor.
    private boolean updatedToProcessing;
    private JobImpl job;
    // Outcome of the most recent attempt; null means "no pre-check vetoed execution yet".
    private ExecutionStatus status;
    // Throwable raised by the most recent attempt, reset at the start of each attempt.
    private Throwable error;
    // Seeded from the message in init() and incremented on every failed attempt.
    private int failureCount;
    // Converted payload, or the raw payload when conversion failed.
    private Object userMessage;

    RqueueExecutor(RqueueBeanProvider rqueueBeanProvider, RqueueMessageListenerContainer.QueueStateMgr queueStateMgr, List<Middleware> middlewares, PostProcessingHandler postProcessingHandler, RqueueMessage rqueueMessage, QueueDetail queueDetail, QueueThreadPool queueThreadPool) {
        super(LoggerFactory.getLogger(RqueueExecutor.class), queueDetail.getName(), queueStateMgr);
        this.beanProvider = rqueueBeanProvider;
        this.middlewareList = middlewares;
        this.postProcessingHandler = postProcessingHandler;
        this.rqueueMessage = rqueueMessage;
        this.queueDetail = queueDetail;
        this.queueThreadPool = queueThreadPool;
    }

    /**
     * Converts the raw payload with the message handler's converter.
     *
     * @return the converted object, or the raw payload when the conversion throws
     */
    private Object getUserMessage() {
        MessageHeaders headers = RqueueMessageHeaders.buildMessageHeaders(queueDetail.getName(), rqueueMessage, null, null, rqueueMessage.getMessageHeaders());
        Message<String> tmpMessage = MessageBuilder.createMessage(rqueueMessage.getMessage(), headers);
        try {
            return RqueueMessageUtils.convertMessageToObject(tmpMessage, beanProvider.getRqueueMessageHandler().getMessageConverter());
        } catch (Exception e) {
            // Conversion problems are not fatal: fall back to the unconverted payload.
            log(Level.DEBUG, "Unable to convert message {}", e, rqueueMessage.getMessage());
            return rqueueMessage.getMessage();
        }
    }

    /** Loads (or creates) the message metadata, converts the payload and builds the job. */
    private void init() {
        MessageMetadata metadata = beanProvider.getRqueueMessageMetadataService().getOrCreateMessageMetadata(rqueueMessage);
        userMessage = getUserMessage();
        job = new JobImpl(
            beanProvider.getRqueueConfig(),
            beanProvider.getRqueueMessageMetadataService(),
            beanProvider.getRqueueJobDao(),
            beanProvider.getRqueueMessageTemplate(),
            beanProvider.getRqueueLockManager(),
            queueDetail,
            metadata,
            rqueueMessage,
            userMessage,
            postProcessingHandler);
        failureCount = job.getRqueueMessage().getFailureCount();
    }

    /**
     * @return the retry count carried by the message when it has one, otherwise the queue's
     *     configured number of retries
     */
    private int getMaxRetryCount() {
        RqueueMessage message = job.getRqueueMessage();
        if (message.getRetryCount() == null) {
            return job.getQueueDetail().getNumRetry();
        }
        return message.getRetryCount().intValue();
    }

    /**
     * Bumps the failure or execution counter of the job's queue; does nothing when no metrics
     * counter is available.
     *
     * @param fail {@code true} to record a failure, {@code false} to record an execution
     */
    private void updateCounter(boolean fail) {
        RqueueMetricsCounter metrics = beanProvider.getRqueueMetricsCounter();
        if (metrics == null) {
            return;
        }
        String queueName = job.getQueueDetail().getName();
        if (fail) {
            metrics.updateFailureCount(queueName);
        } else {
            metrics.updateExecutionCount(queueName);
        }
    }

    /** @return the queue's visibility timeout reduced by 1000 */
    private long maxExecutionTime() {
        return job.getQueueDetail().getVisibilityTimeout() - 1000L;
    }

    /** @return the wall-clock instant (epoch millis) after which no further retry is started */
    private long getMaxProcessingTime() {
        long now = System.currentTimeMillis();
        return now + maxExecutionTime();
    }

    /**
     * Checks whether the message is flagged as deleted, merging freshly fetched metadata into the
     * job's metadata first when the local copy is not flagged yet.
     *
     * @return {@code true} when the (possibly refreshed) metadata reports the message as deleted
     */
    private boolean isMessageDeleted() {
        MessageMetadata metadata = job.getMessageMetadata();
        if (!metadata.isDeleted()) {
            MessageMetadata latest = beanProvider.getRqueueMessageMetadataService().getOrCreateMessageMetadata(job.getRqueueMessage());
            metadata.merge(latest);
        }
        boolean deleted = metadata.isDeleted();
        if (!deleted) {
            return false;
        }
        if (rqueueMessage.isPeriodic()) {
            log(Level.INFO, "Periodic Message {} having period {} has been deleted", null, rqueueMessage.getId(), rqueueMessage.getPeriod());
        } else {
            log(Level.INFO, "Message {} has been deleted", null, rqueueMessage.getId());
        }
        return true;
    }

    /** @return {@code true} when the pre-execution message processor rejects the job */
    private boolean shouldIgnore() {
        boolean accepted = beanProvider.getPreExecutionMessageProcessor().process(job);
        return !accepted;
    }

    /**
     * @return {@code true} when the metadata holds a message whose queued time differs from the
     *     queued time of the message being executed
     */
    private boolean isOldMessage() {
        if (job.getMessageMetadata().getRqueueMessage() == null) {
            return false;
        }
        return job.getMessageMetadata().getRqueueMessage().getQueuedTime() != job.getRqueueMessage().getQueuedTime();
    }

    /**
     * @return the number of attempts allowed in this poll: the maximum retry count, capped by the
     *     configured retry-per-poll value unless that value is {@code -1}
     */
    private int getRetryCount() {
        int maxRetry = getMaxRetryCount();
        int perPoll = beanProvider.getRqueueConfig().getRetryPerPoll();
        return perPoll == -1 ? maxRetry : Math.min(perPoll, maxRetry);
    }

    /** @return {@code true} when the job's queue is currently not active */
    private boolean queueInactive() {
        boolean active = isQueueActive(job.getQueueDetail().getName());
        return !active;
    }

    /**
     * Runs the pre-execution checks in a fixed order and reports the first one that vetoes
     * execution.
     *
     * @return the vetoing status, or {@code null} when the message should be executed
     */
    private ExecutionStatus getStatus() {
        ExecutionStatus veto = null;
        // Order matters: later checks are skipped as soon as an earlier one vetoes.
        if (queueInactive()) {
            veto = ExecutionStatus.QUEUE_INACTIVE;
        } else if (shouldIgnore()) {
            veto = ExecutionStatus.IGNORED;
        } else if (isMessageDeleted()) {
            veto = ExecutionStatus.DELETED;
        } else if (isOldMessage()) {
            veto = ExecutionStatus.OLD_MESSAGE;
        }
        return veto;
    }

    /** Marks the message as PROCESSING the first time it is called; later calls are no-ops. */
    private void updateToProcessing() {
        if (!updatedToProcessing) {
            updatedToProcessing = true;
            job.updateMessageStatus(MessageStatus.PROCESSING);
        }
    }

    /**
     * Logs a warning when the current time is already past {@code maxProcessingTime}.
     *
     * @param maxProcessingTime deadline in epoch millis
     * @param startTime epoch millis at which handling started
     */
    private void logExecutionTimeWarning(long maxProcessingTime, long startTime) {
        if (System.currentTimeMillis() <= maxProcessingTime) {
            return;
        }
        long maxAllowedTime = maxExecutionTime();
        long executionTime = System.currentTimeMillis() - startTime;
        log(Level.WARN, "Message listener is taking longer time [Queue: {}, TaskStatus: {}] MaxAllowedTime: {}, ExecutionTime: {}", null, job.getQueueDetail().getName(), status, maxAllowedTime, executionTime);
    }

    /** Starts an attempt: notifies the job, clears the previous error and runs the pre-checks. */
    private void begin() {
        job.execute();
        error = null;
        status = getStatus();
    }

    /** Finishes an attempt by recording its status and error on the job. */
    private void end() {
        job.updateExecutionStatus(status, error);
    }

    /**
     * Invokes the middleware at {@code currentIndex}, handing it a continuation that invokes the
     * next one; once the chain is exhausted the message handler itself is invoked.
     *
     * @param currentIndex position in {@code middlewares} to invoke
     * @param middlewares middleware chain
     * @param job job being processed
     * @throws Exception whatever a middleware or the handler throws
     */
    private void callMiddlewares(int currentIndex, List<Middleware> middlewares, Job job) throws Exception {
        if (currentIndex != middlewares.size()) {
            Middleware current = middlewares.get(currentIndex);
            current.handle(job, () -> {
                callMiddlewares(currentIndex + 1, middlewares, job);
                return null;
            });
            return;
        }
        // End of the chain: delegate to the actual message handler.
        HandlerMiddleware terminal = new HandlerMiddleware(beanProvider.getRqueueMessageHandler());
        terminal.handle(job, null);
    }

    /**
     * Runs the middleware chain (empty when none was supplied) and marks the attempt successful
     * when it returns normally.
     */
    private void processMessage() throws Exception {
        List<Middleware> chain = middlewareList == null ? Collections.<Middleware>emptyList() : middlewareList;
        callMiddlewares(0, chain, job);
        status = ExecutionStatus.SUCCESSFUL;
    }

    /**
     * Performs one execution attempt. Any throwable marks the attempt as FAILED and increments the
     * failure count; only throwables other than {@link MessagingException} are logged here.
     */
    private void execute() {
        try {
            updateToProcessing();
            updateCounter(false);
            processMessage();
        } catch (Throwable failure) {
            updateCounter(true);
            failureCount += 1;
            error = failure;
            status = ExecutionStatus.FAILED;
            if (!(failure instanceof MessagingException)) {
                log(Level.ERROR, "Message execution failed, RqueueMessage: {}", failure, job.getRqueueMessage());
            }
        }
    }

    /**
     * Decides whether another attempt should be made right away.
     *
     * @param maxProcessingTime deadline in epoch millis after which no retry is started
     * @param retryCount remaining attempts
     * @param failureCount number of failures so far, forwarded to the back-off computation
     * @return {@code true} when attempts remain, the last attempt failed, the deadline has not
     *     passed, the error is not marked as do-not-retry and the back-off is not {@code -1}
     */
    private boolean shouldRetry(long maxProcessingTime, int retryCount, int failureCount) {
        boolean retryPossible = retryCount > 0
            && status == ExecutionStatus.FAILED
            && System.currentTimeMillis() < maxProcessingTime;
        if (!retryPossible) {
            return false;
        }
        if (queueDetail.isDoNotRetryError(error)) {
            return false;
        }
        long backOff = postProcessingHandler.backOff(rqueueMessage, userMessage, failureCount, error);
        return backOff != -1L;
    }

    /**
     * Attempts the message at least once, repeating while {@code shouldRetry} agrees, then hands
     * the final outcome to the post-processing handler and warns when the deadline was exceeded.
     */
    private void handleMessage() {
        long maxProcessingTime = getMaxProcessingTime();
        long startTime = System.currentTimeMillis();
        int retriesLeft = getRetryCount();
        for (int attempt = 1; ; attempt++) {
            log(Level.DEBUG, "Attempt {} message: {}", null, attempt, job.getMessage());
            begin();
            // A non-null status means a pre-check vetoed this attempt.
            if (status == null) {
                execute();
            }
            end();
            retriesLeft--;
            if (!shouldRetry(maxProcessingTime, retriesLeft, failureCount)) {
                break;
            }
        }
        postProcessingHandler.handle(job, status, failureCount, error);
        logExecutionTimeWarning(maxProcessingTime, startTime);
    }

    /**
     * @param message the next occurrence of a periodic message
     * @return expiry in seconds for its scheduling key: twice the visibility timeout (divided by
     *     1000) plus the whole seconds remaining until {@code processAt}, when that is in the future
     */
    private long getTtlForScheduledMessageKey(RqueueMessage message) {
        long baseExpiry = 2L * job.getQueueDetail().getVisibilityTimeout() / 1000L;
        long secondsUntilDue = (message.getProcessAt() - System.currentTimeMillis()) / 1000L;
        return secondsUntilDue > 0L ? baseExpiry + secondsUntilDue : baseExpiry;
    }

    /**
     * @param message the next occurrence of a periodic message
     * @return a key composed of the queue name, the current message id, the literal {@code sch}
     *     and the occurrence's {@code processAt}, joined by {@code ::}
     */
    private String getScheduledMessageKey(RqueueMessage message) {
        return String.format(
            "%s%s%s%ssch%s%d",
            job.getQueueDetail().getQueueName(),
            "::",
            job.getRqueueMessage().getId(),
            "::",
            "::",
            message.getProcessAt());
    }

    /** Schedules the next occurrence of a periodic message unless the message has been deleted. */
    private void schedulePeriodicMessage() {
        if (isMessageDeleted()) {
            return;
        }
        RqueueMessage current = job.getRqueueMessage();
        RqueueMessage next = current.toBuilder().processAt(current.nextProcessAt()).build();
        String messageKey = getScheduledMessageKey(next);
        long expiryInSeconds = getTtlForScheduledMessageKey(next);
        // The scheduling call is performed regardless of the log level; only its result is logged.
        Object scheduleStatus = beanProvider.getRqueueMessageTemplate().scheduleMessage(job.getQueueDetail().getScheduledQueueName(), messageKey, next, expiryInSeconds);
        log(Level.DEBUG, "Schedule periodic message: {} Status: {}", null, job.getRqueueMessage(), scheduleStatus);
    }

    /**
     * Handles the message, scheduling the next occurrence first when it is periodic, and always
     * calls {@code queueThreadPool.release()} afterwards.
     */
    private void handle() {
        try {
            if (job.getRqueueMessage().isPeriodic()) {
                schedulePeriodicMessage();
            }
            handleMessage();
        } finally {
            queueThreadPool.release();
        }
    }

    /**
     * Entry point: initialises the executor and handles the message. When initialisation throws,
     * the failure is logged, the inherited {@code release} is invoked and the message is not
     * handled.
     */
    @Override
    public void start() {
        boolean initialised = false;
        try {
            init();
            initialised = true;
        } catch (Exception e) {
            log(Level.WARN, "Executor init failed Msg: {}", e, rqueueMessage);
            release(postProcessingHandler, queueThreadPool, queueDetail, rqueueMessage);
        }
        if (initialised) {
            handle();
        }
    }
}

