package org.nuxeo.ecm.platform.importer.mqueues.consumer;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import net.jodah.failsafe.Execution;
import net.openhft.chronicle.core.util.Time;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.platform.importer.mqueues.consumer.BatchState;
import org.nuxeo.ecm.platform.importer.mqueues.message.Message;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQueues;

/* loaded from: input_file:org/nuxeo/ecm/platform/importer/mqueues/consumer/ConsumerRunner.class */
public class ConsumerRunner<M extends Message> implements Callable<ConsumerStatus> {
    private static final Log log = LogFactory.getLog(ConsumerRunner.class);
    public static final String NUXEO_METRICS_REGISTRY_NAME = "org.nuxeo.runtime.metrics.MetricsService";
    private final ConsumerFactory<M> factory;
    private final ConsumerPolicy policy;
    private final int queue;
    private final MQueues.Tailer<M> tailer;
    private BatchPolicy currentBatchPolicy;
    private String threadName;
    private Consumer<M> consumer;
    protected final Timer acceptTimer;
    protected final Counter committedCounter;
    protected final Timer batchCommitTimer;
    protected final Counter batchFailureCount;
    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(NUXEO_METRICS_REGISTRY_NAME);
    protected final Counter consumersCount = newCounter(MetricRegistry.name("nuxeo", new String[]{"importer", "queue", "consumers"}));

    public ConsumerRunner(ConsumerFactory<M> consumerFactory, ConsumerPolicy consumerPolicy, MQueues.Tailer<M> tailer) {
        this.factory = consumerFactory;
        this.tailer = tailer;
        this.currentBatchPolicy = consumerPolicy.getBatchPolicy();
        this.policy = consumerPolicy;
        this.queue = tailer.getQueue();
        this.acceptTimer = newTimer(MetricRegistry.name("nuxeo", new String[]{"importer", "queue", "consumer", "accepted", String.valueOf(this.queue)}));
        this.committedCounter = newCounter(MetricRegistry.name("nuxeo", new String[]{"importer", "queue", "consumer", "committed", String.valueOf(this.queue)}));
        this.batchFailureCount = newCounter(MetricRegistry.name("nuxeo", new String[]{"importer", "queue", "consumer", "batchFailure", String.valueOf(this.queue)}));
        this.batchCommitTimer = newTimer(MetricRegistry.name("nuxeo", new String[]{"importer", "queue", "consumer", "batchCommit", String.valueOf(this.queue)}));
        log.debug("Consumer thread created tailing on queue: " + this.queue);
    }

    private Counter newCounter(String str) {
        this.registry.remove(str);
        return this.registry.counter(str);
    }

    private Timer newTimer(String str) {
        this.registry.remove(str);
        return this.registry.timer(str);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.concurrent.Callable
    public ConsumerStatus call() throws Exception {
        this.threadName = Thread.currentThread().getName();
        this.consumersCount.inc();
        long currentTimeMillis = Time.currentTimeMillis();
        setTailerPosition();
        this.consumer = this.factory.createConsumer(this.queue);
        try {
            addSalt();
            consumerLoop();
            return new ConsumerStatus(this.queue, this.acceptTimer.getCount(), this.committedCounter.getCount(), this.batchCommitTimer.getCount(), this.batchFailureCount.getCount(), currentTimeMillis, Time.currentTimeMillis(), false);
        } finally {
            this.consumer.close();
            this.consumersCount.dec();
        }
    }

    private void addSalt() throws InterruptedException {
        long nextLong = ThreadLocalRandom.current().nextLong(this.policy.getBatchPolicy().getTimeThreshold().toMillis());
        if (this.policy.isSalted()) {
            Thread.sleep(nextLong);
        }
    }

    private void setTailerPosition() {
        switch (this.policy.getStartOffset()) {
            case BEGIN:
                this.tailer.toStart();
                return;
            case END:
                this.tailer.toEnd();
                return;
            default:
                this.tailer.toLastCommitted();
                return;
        }
    }

    private void consumerLoop() throws InterruptedException {
        boolean z = false;
        while (!z) {
            Execution execution = new Execution(this.policy.getRetryPolicy());
            z = processBatchWithRetry(execution);
            if (execution.getLastFailure() != null) {
                if (this.policy.continueOnFailure()) {
                    log.error("Skip message on failure after applying the retry policy: ", execution.getLastFailure());
                } else {
                    log.error("Abort on Failure after applying the retry policy: ", execution.getLastFailure());
                    z = true;
                }
            }
        }
    }

    private boolean processBatchWithRetry(Execution execution) throws InterruptedException {
        boolean canRetryOn;
        boolean z = false;
        while (!execution.isComplete()) {
            try {
                z = processBatch();
                execution.complete();
                this.tailer.commit();
            } finally {
                if (canRetryOn) {
                    restoreBatchPolicy();
                }
            }
            restoreBatchPolicy();
        }
        return z;
    }

    private void setBatchRetryPolicy() {
        this.currentBatchPolicy = BatchPolicy.NO_BATCH;
    }

    private void restoreBatchPolicy() {
        this.currentBatchPolicy = this.policy.getBatchPolicy();
    }

    private boolean processBatch() throws InterruptedException {
        boolean z = false;
        beginBatch();
        try {
            BatchState acceptBatch = acceptBatch();
            commitBatch(acceptBatch);
            if (acceptBatch.getState() == BatchState.State.LAST) {
                log.info(String.format("No more message on queue %02d", Integer.valueOf(this.queue)));
                z = true;
            }
            return z;
        } catch (Exception e) {
            try {
                rollbackBatch();
            } catch (Exception e2) {
                log.error("Exception on rollback invocation", e2);
            }
            throw e;
        }
    }

    private void beginBatch() {
        this.consumer.begin();
    }

    private void commitBatch(BatchState batchState) {
        Timer.Context time = this.batchCommitTimer.time();
        Throwable th = null;
        try {
            try {
                this.consumer.commit();
                this.committedCounter.inc(batchState.getSize());
                if (time != null) {
                    if (0 == 0) {
                        time.close();
                        return;
                    }
                    try {
                        time.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (time != null) {
                if (th != null) {
                    try {
                        time.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    time.close();
                }
            }
            throw th4;
        }
    }

    private void rollbackBatch() {
        log.warn("Rollback batch");
        this.consumer.rollback();
    }

    private BatchState acceptBatch() throws InterruptedException {
        BatchState batchState = new BatchState(this.currentBatchPolicy);
        batchState.start();
        do {
            M read = this.tailer.read(this.policy.getWaitMessageTimeout());
            if (read == null) {
                batchState.last();
                return batchState;
            }
            if (read.poisonPill()) {
                log.warn("Receivce a poison pill: " + read);
                batchState.last();
            } else {
                Timer.Context time = this.acceptTimer.time();
                Throwable th = null;
                try {
                    try {
                        setThreadName(read);
                        this.consumer.accept(read);
                        if (time != null) {
                            if (0 != 0) {
                                try {
                                    time.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                time.close();
                            }
                        }
                        batchState.inc();
                        if (read.forceBatch()) {
                            if (log.isDebugEnabled()) {
                                log.debug("Force end of batch: " + read);
                            }
                            batchState.force();
                        }
                    } finally {
                    }
                } catch (Throwable th3) {
                    if (time != null) {
                        if (th != null) {
                            try {
                                time.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            time.close();
                        }
                    }
                    throw th3;
                }
            }
        } while (batchState.getState() == BatchState.State.FILLING);
        return batchState;
    }

    private void setThreadName(M m) {
        String str = this.threadName + "-" + this.acceptTimer.getCount();
        Thread.currentThread().setName(m != null ? str + "-" + m.getId() : str + "-null");
    }
}
