/*
 * Decompiled with CFR 0.152.
 */
package com.github.sonus21.rqueue.listener;

import com.github.sonus21.rqueue.core.RqueueBeanProvider;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.middleware.Middleware;
import com.github.sonus21.rqueue.listener.MessageContainerBase;
import com.github.sonus21.rqueue.listener.PostProcessingHandler;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.listener.RqueueExecutor;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.utils.QueueThreadPool;
import java.util.List;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.CollectionUtils;

abstract class RqueueMessagePoller
extends MessageContainerBase {
    final List<Middleware> middlewares;
    final long pollingInterval;
    final long backoffTime;
    private final PostProcessingHandler postProcessingHandler;
    private final RqueueBeanProvider rqueueBeanProvider;
    private final MessageHeaders messageHeaders;
    List<String> queues;

    RqueueMessagePoller(String groupName, RqueueBeanProvider rqueueBeanProvider, RqueueMessageListenerContainer.QueueStateMgr queueStateMgr, List<Middleware> middlewares, long pollingInterval, long backoffTime, PostProcessingHandler postProcessingHandler, MessageHeaders messageHeaders) {
        super(LoggerFactory.getLogger(RqueueMessagePoller.class), groupName, queueStateMgr);
        this.postProcessingHandler = postProcessingHandler;
        this.middlewares = middlewares;
        this.rqueueBeanProvider = rqueueBeanProvider;
        this.pollingInterval = pollingInterval;
        this.backoffTime = backoffTime;
        this.messageHeaders = messageHeaders;
    }

    private List<RqueueMessage> getMessages(QueueDetail queueDetail, int count) {
        return this.rqueueBeanProvider.getRqueueMessageTemplate().pop(queueDetail.getQueueName(), queueDetail.getProcessingQueueName(), queueDetail.getProcessingQueueChannelName(), queueDetail.getVisibilityTimeout(), count);
    }

    private void execute(QueueThreadPool queueThreadPool, QueueDetail queueDetail, RqueueMessage message) {
        message.setMessageHeaders(this.messageHeaders);
        try {
            queueThreadPool.execute(new RqueueExecutor(this.rqueueBeanProvider, this.queueStateMgr, this.middlewares, this.postProcessingHandler, message, queueDetail, queueThreadPool));
        }
        catch (Exception e) {
            if (e instanceof TaskRejectedException) {
                queueThreadPool.taskRejected(queueDetail, message);
            }
            this.log(Level.WARN, "Execution failed Msg: {}", e, message);
            this.release(this.postProcessingHandler, queueThreadPool, queueDetail, message);
        }
    }

    boolean shouldExit() {
        for (String queueName : this.queues) {
            if (!this.isQueueActive(queueName)) continue;
            return false;
        }
        this.log(Level.INFO, "Shutting down all queues {} are inactive", null, this.queues);
        return true;
    }

    protected boolean hasAvailableThreads(QueueDetail queueDetail, QueueThreadPool queueThreadPool) {
        return queueThreadPool.availableThreads() > 0;
    }

    protected int getBatchSize(QueueDetail queueDetail, QueueThreadPool queueThreadPool) {
        int batchSize = Math.min(queueDetail.getBatchSize(), queueThreadPool.availableThreads());
        batchSize = Math.max(batchSize, 1);
        this.log(Level.DEBUG, "Batch size {}", null, batchSize);
        return batchSize;
    }

    private void sendMessagesToExecutor(QueueDetail queueDetail, QueueThreadPool queueThreadPool, List<RqueueMessage> rqueueMessages) {
        for (RqueueMessage rqueueMessage : rqueueMessages) {
            this.execute(queueThreadPool, queueDetail, rqueueMessage);
        }
    }

    private void pollAndExecute(int index, String queue, QueueDetail queueDetail, QueueThreadPool queueThreadPool, int batchSize) {
        block5: {
            if (this.isQueueActive(queue)) {
                try {
                    List<RqueueMessage> messages = this.getMessages(queueDetail, batchSize);
                    this.log(Level.TRACE, "Queue: {} Fetched Msgs {}", null, queue, messages);
                    int messageCount = CollectionUtils.isEmpty(messages) ? 0 : messages.size();
                    queueThreadPool.release(batchSize - messageCount);
                    if (messageCount > 0) {
                        this.sendMessagesToExecutor(queueDetail, queueThreadPool, messages);
                        break block5;
                    }
                    this.deactivate(index, queue, DeactivateType.NO_MESSAGE);
                }
                catch (Exception e) {
                    queueThreadPool.release(batchSize);
                    this.log(Level.WARN, "Listener failed for the queue {}", e, queue);
                    this.deactivate(index, queue, DeactivateType.POLL_FAILED);
                }
            } else {
                queueThreadPool.release(batchSize);
            }
        }
    }

    void poll(int index, String queue, QueueDetail queueDetail, QueueThreadPool queueThreadPool) {
        boolean acquired;
        this.log(Level.TRACE, "Polling queue {}", null, queue);
        int batchSize = this.getBatchSize(queueDetail, queueThreadPool);
        try {
            acquired = queueThreadPool.acquire(batchSize, this.getSemaphoreWaitTime());
        }
        catch (Exception e) {
            this.log(Level.WARN, "Exception {}", e, e.getMessage());
            this.deactivate(index, queue, DeactivateType.SEMAPHORE_EXCEPTION);
            return;
        }
        if (!acquired) {
            this.deactivate(index, queue, DeactivateType.SEMAPHORE_UNAVAILABLE);
            return;
        }
        this.pollAndExecute(index, queue, queueDetail, queueThreadPool, batchSize);
    }

    abstract long getSemaphoreWaitTime();

    abstract void deactivate(int var1, String var2, DeactivateType var3);

    static enum DeactivateType {
        POLL_FAILED,
        NO_MESSAGE,
        SEMAPHORE_EXCEPTION,
        SEMAPHORE_UNAVAILABLE;

    }
}

