package org.nuxeo.lib.stream.pattern.consumer.internals;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import net.jodah.failsafe.Execution;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.RebalanceException;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.pattern.Message;
import org.nuxeo.lib.stream.pattern.consumer.BatchPolicy;
import org.nuxeo.lib.stream.pattern.consumer.Consumer;
import org.nuxeo.lib.stream.pattern.consumer.ConsumerFactory;
import org.nuxeo.lib.stream.pattern.consumer.ConsumerPolicy;
import org.nuxeo.lib.stream.pattern.consumer.ConsumerStatus;
import org.nuxeo.lib.stream.pattern.consumer.internals.BatchState;

/* loaded from: input_file:org/nuxeo/lib/stream/pattern/consumer/internals/ConsumerRunner.class */
/**
 * Drives a single {@link Consumer} against a {@link LogTailer}: records are read from the tailer, handed to the
 * consumer in batches, and the tailer position is committed after each successful batch.
 * <p>
 * A failed batch is rolled back and retried according to the {@link ConsumerPolicy} retry policy. The runner is
 * passed as {@link RebalanceListener} to the log manager when the tailer is created in subscribe mode.
 * <p>
 * Instances hold mutable, unsynchronized state (counters, current batch policy); nothing in this class guards
 * concurrent calls.
 *
 * @param <M> the type of message consumed
 */
public class ConsumerRunner<M extends Message> implements Callable<ConsumerStatus>, RebalanceListener {
    private static final Log log = LogFactory.getLog(ConsumerRunner.class);

    /** Name of the shared metric registry used for all the metrics updated by this runner. */
    public static final String NUXEO_METRICS_REGISTRY_NAME = "org.nuxeo.runtime.metrics.MetricsService";

    protected final ConsumerFactory<M> factory;

    protected final ConsumerPolicy policy;

    protected final LogTailer<M> tailer;

    /** Identifier given to the consumer factory; taken from {@code tailer.toString()}. */
    protected String consumerId;

    /** Batch policy used for the next batch; switched to {@link BatchPolicy#NO_BATCH} while retrying. */
    protected BatchPolicy currentBatchPolicy;

    /** Name of the thread that entered {@link #call()}, used as prefix by {@link #setThreadName(String)}. */
    protected String threadName;

    protected Consumer<M> consumer;

    protected long acceptCounter;

    protected long committedCounter;

    protected long batchCommitCounter;

    protected long batchFailureCounter;

    /** True once {@link #addSalt()} has run, so the random start delay is applied at most once. */
    protected boolean alreadySalted;

    protected Timer globalAcceptTimer;

    protected Counter globalCommittedCounter;

    protected Timer globalBatchCommitTimer;

    protected Counter globalBatchFailureCounter;

    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(NUXEO_METRICS_REGISTRY_NAME);

    /** Incremented when {@link #call()} starts and decremented when it exits. */
    protected Counter globalConsumersCounter = this.registry.counter(
            MetricRegistry.name("nuxeo", "importer", "stream", "consumers"));

    /**
     * Creates the tailer and moves it to the start offset requested by the policy.
     *
     * @param consumerFactory factory used in {@link #call()} to create the consumer
     * @param consumerPolicy batch, retry, start offset and timeout settings
     * @param logManager manager used to create (or subscribe) the tailer
     * @param list the partitions to tail
     * @throws UnsupportedOperationException if the manager supports subscribe and the policy start offset is not
     *             {@code LAST_COMMITTED}
     */
    public ConsumerRunner(ConsumerFactory<M> consumerFactory, ConsumerPolicy consumerPolicy, LogManager logManager, List<LogPartition> list) {
        this.factory = consumerFactory;
        this.currentBatchPolicy = consumerPolicy.getBatchPolicy();
        // policy must be assigned before createTailer(), which reads this.policy
        this.policy = consumerPolicy;
        this.tailer = createTailer(logManager, list);
        this.consumerId = this.tailer.toString();
        setTailerPosition(logManager);
        log.debug("Consumer thread created tailing on: " + this.consumerId);
    }

    /**
     * Creates the tailer for the policy name: a subscription on the partition names when the manager supports
     * subscribe (this runner being the rebalance listener), a plain tailer on the given partitions otherwise.
     */
    protected LogTailer<M> createTailer(LogManager logManager, List<LogPartition> list) {
        if (logManager.supportSubscribe()) {
            Set<String> names = list.stream().map(LogPartition::name).collect(Collectors.toSet());
            return logManager.subscribe(this.policy.getName(), names, this);
        }
        return logManager.createTailer(this.policy.getName(), list);
    }

    /**
     * Creates the consumer, runs the consumer loop until it terminates and reports the counters.
     * <p>
     * The consumers counter is decremented and the tailer closed on every exit path, including a failure of the
     * consumer factory or of {@code consumer.close()}.
     *
     * @return the status holding the accept/commit/failure counters and the start and stop timestamps
     * @throws Exception whatever the consumer factory, the consumer loop or the close operations throw
     */
    @Override // java.util.concurrent.Callable
    public ConsumerStatus call() throws Exception {
        this.threadName = Thread.currentThread().getName();
        setMetrics();
        this.globalConsumersCounter.inc();
        long start = System.currentTimeMillis();
        try {
            this.consumer = this.factory.createConsumer(this.consumerId);
            try {
                consumerLoop();
                // the stop timestamp is taken before the consumer and the tailer are closed
                return new ConsumerStatus(this.consumerId, this.acceptCounter, this.committedCounter,
                        this.batchCommitCounter, this.batchFailureCounter, start, System.currentTimeMillis(), false);
            } finally {
                this.consumer.close();
            }
        } finally {
            this.globalConsumersCounter.dec();
            this.tailer.close();
        }
    }

    /** Looks up (or registers) the timers and counters shared by all runners using the same registry. */
    protected void setMetrics() {
        this.globalAcceptTimer = this.registry.timer(
                MetricRegistry.name("nuxeo", "importer", "stream", "consumer", "accepted"));
        this.globalCommittedCounter = this.registry.counter(
                MetricRegistry.name("nuxeo", "importer", "stream", "consumer", "committed"));
        this.globalBatchFailureCounter = this.registry.counter(
                MetricRegistry.name("nuxeo", "importer", "stream", "consumer", "batchFailure"));
        this.globalBatchCommitTimer = this.registry.timer(
                MetricRegistry.name("nuxeo", "importer", "stream", "consumer", "batchCommit"));
    }

    /**
     * Sleeps once, when the policy is salted, for a random duration below the batch time threshold.
     * <p>
     * Nothing is slept when the threshold is not positive: {@code ThreadLocalRandom.nextLong(bound)} rejects such
     * a bound with an {@link IllegalArgumentException}.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    protected void addSalt() throws InterruptedException {
        if (this.alreadySalted) {
            return;
        }
        if (this.policy.isSalted()) {
            long maxDelayMillis = this.policy.getBatchPolicy().getTimeThreshold().toMillis();
            if (maxDelayMillis > 0) {
                Thread.sleep(ThreadLocalRandom.current().nextLong(maxDelayMillis));
            }
        }
        this.alreadySalted = true;
    }

    /**
     * Positions the tailer according to the policy start offset: {@code BEGIN}, {@code END}, or the last committed
     * position for any other value.
     *
     * @throws UnsupportedOperationException if the manager supports subscribe and the start offset is not
     *             {@code LAST_COMMITTED}
     */
    protected void setTailerPosition(LogManager logManager) {
        ConsumerPolicy.StartOffset startOffset = this.policy.getStartOffset();
        if (logManager.supportSubscribe() && startOffset != ConsumerPolicy.StartOffset.LAST_COMMITTED) {
            throw new UnsupportedOperationException("Tailer startOffset to " + startOffset + " is not supported in subscribe mode");
        }
        switch (startOffset) {
            case BEGIN:
                this.tailer.toStart();
                break;
            case END:
                this.tailer.toEnd();
                break;
            default:
                this.tailer.toLastCommitted();
                break;
        }
    }

    /**
     * Processes batches, each one under a fresh retry {@link Execution}, until a batch reports the end of the
     * stream or a failure must abort the loop.
     *
     * @throws InterruptedException if the thread is interrupted while processing a batch
     */
    protected void consumerLoop() throws InterruptedException {
        boolean end = false;
        while (!end) {
            Execution execution = new Execution(this.policy.getRetryPolicy());
            end = processBatchWithRetry(execution);
            // NOTE(review): processBatchWithRetry rethrows a failure that cannot be retried, so this branch is only
            // reached if the execution still reports a failure after a normal return - confirm against the
            // Failsafe Execution semantics.
            if (execution.getLastFailure() != null) {
                if (this.policy.continueOnFailure()) {
                    log.error("Skip message on failure after applying the retry policy: ", execution.getLastFailure());
                } else {
                    log.error("Abort on Failure after applying the retry policy: ", execution.getLastFailure());
                    end = true;
                }
            }
        }
    }

    /**
     * Processes one batch and commits the tailer position, retrying as long as the execution allows it.
     * <p>
     * After a retryable failure the batch policy is switched to {@link BatchPolicy#NO_BATCH} and the tailer is moved
     * back to the last committed position. The configured batch policy is restored only when this method exits, so
     * the retry policy is actually in effect for the retried batches. A {@link RebalanceException} is logged and
     * the loop simply continues.
     *
     * @param execution the retry state for this batch
     * @return true when the processed batch was the last one
     * @throws InterruptedException if processing is interrupted; the interrupt flag is set again before rethrowing
     */
    protected boolean processBatchWithRetry(Execution execution) throws InterruptedException {
        boolean end = false;
        try {
            while (!execution.isComplete()) {
                try {
                    end = processBatch();
                    this.tailer.commit();
                    execution.complete();
                } catch (Throwable t) {
                    this.globalBatchFailureCounter.inc();
                    this.batchFailureCounter++;
                    if (t instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                        throw t;
                    }
                    if (t instanceof RebalanceException) {
                        // the current batch has been rolled back by processBatch(), just try again
                        log.info("Rebalance");
                    } else if (execution.canRetryOn(t)) {
                        setBatchRetryPolicy();
                        this.tailer.toLastCommitted();
                    } else {
                        throw t;
                    }
                }
            }
        } finally {
            // also covers the exceptional exits, so a later batch never inherits the retry policy
            restoreBatchPolicy();
        }
        return end;
    }

    /** Switches to {@link BatchPolicy#NO_BATCH} for the batches processed while retrying. */
    protected void setBatchRetryPolicy() {
        this.currentBatchPolicy = BatchPolicy.NO_BATCH;
    }

    /** Switches back to the batch policy configured in the consumer policy. */
    protected void restoreBatchPolicy() {
        this.currentBatchPolicy = this.policy.getBatchPolicy();
    }

    /**
     * Runs one begin / accept / commit cycle on the consumer; on any exception the batch is rolled back and the
     * original exception rethrown. A failure of the rollback itself is only logged.
     *
     * @return true when the batch ended in the {@code LAST} state, meaning there is nothing more to consume
     * @throws InterruptedException if interrupted while reading or salting
     */
    protected boolean processBatch() throws InterruptedException {
        beginBatch();
        try {
            BatchState batch = acceptBatch();
            commitBatch(batch);
            if (batch.getState() == BatchState.State.LAST) {
                log.info("No more message on tailer: " + this.tailer);
                return true;
            }
            return false;
        } catch (Exception e) {
            try {
                rollbackBatch(e);
            } catch (Exception rollbackFailure) {
                log.error("Exception on rollback invocation", rollbackFailure);
            }
            throw e;
        }
    }

    /** Tells the consumer that a new batch starts. */
    protected void beginBatch() {
        this.consumer.begin();
    }

    /**
     * Commits the consumer under the batch commit timer and adds the batch size to the committed counters.
     *
     * @param batchState the state of the batch that has just been accepted
     */
    protected void commitBatch(BatchState batchState) {
        try (Timer.Context ignored = this.globalBatchCommitTimer.time()) {
            this.consumer.commit();
            this.committedCounter += batchState.getSize();
            this.globalCommittedCounter.inc(batchState.getSize());
            this.batchCommitCounter++;
            if (log.isDebugEnabled()) {
                log.debug("Commit batch size: " + batchState.getSize() + ", total committed: " + this.committedCounter);
            }
        }
    }

    /**
     * Logs the reason of the rollback and rolls back the consumer.
     *
     * @param exc the exception that interrupted the batch
     */
    protected void rollbackBatch(Exception exc) {
        if (exc instanceof RebalanceException) {
            log.warn("Rollback current batch because of consumer rebalancing");
        } else {
            log.warn("Rollback batch", exc);
        }
        this.consumer.rollback();
    }

    /**
     * Reads records and feeds them to the consumer until the batch leaves the {@code FILLING} state.
     * <p>
     * The batch is marked as last when a read returns no record within the policy wait timeout or when a poison
     * pill message is received; a message asking for it forces the end of the batch.
     *
     * @return the state of the accepted batch
     * @throws InterruptedException if interrupted while reading or salting
     */
    protected BatchState acceptBatch() throws InterruptedException {
        BatchState batch = new BatchState(this.currentBatchPolicy);
        batch.start();
        LogRecord<M> record;
        while ((record = this.tailer.read(this.policy.getWaitMessageTimeout())) != null) {
            // salt only once a first record has been read
            addSalt();
            M message = record.message();
            if (message.poisonPill()) {
                log.warn("Receive a poison pill: " + message);
                batch.last();
            } else {
                try (Timer.Context ignored = this.globalAcceptTimer.time()) {
                    setThreadName(message.getId());
                    this.consumer.accept(message);
                    this.acceptCounter++;
                }
                batch.inc();
                if (message.forceBatch()) {
                    if (log.isDebugEnabled()) {
                        log.debug("Force end of batch: " + message);
                    }
                    batch.force();
                }
            }
            if (batch.getState() != BatchState.State.FILLING) {
                return batch;
            }
        }
        batch.last();
        log.info(String.format("No record after: %ds on %s, terminating",
                this.policy.getWaitMessageTimeout().getSeconds(), this.consumerId));
        return batch;
    }

    /**
     * Renames the current thread to {@code <initial name>-<accept counter>-<str>}.
     *
     * @param str suffix appended to the thread name, the message id when called while accepting
     */
    protected void setThreadName(String str) {
        Thread.currentThread().setName(this.threadName + "-" + this.acceptCounter + "-" + str);
    }

    /** Rebalance callback, nothing to do when partitions are revoked. */
    public void onPartitionsRevoked(Collection<LogPartition> collection) {
    }

    /** Rebalance callback, refreshes the consumer id from the tailer and reflects it in the thread name. */
    public void onPartitionsAssigned(Collection<LogPartition> collection) {
        this.consumerId = this.tailer.toString();
        setThreadName("rebalance-" + this.consumerId);
    }
}
