package org.nuxeo.ecm.platform.importer.queue.consumer;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.nuxeo.common.utils.ExceptionUtils;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentRef;
import org.nuxeo.ecm.core.api.UnrestrictedSessionRunner;
import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
import org.nuxeo.ecm.platform.importer.queue.AbstractTaskRunner;
import org.nuxeo.ecm.platform.importer.source.SourceNode;
import org.nuxeo.runtime.metrics.MetricsService;
import org.nuxeo.runtime.transaction.TransactionHelper;

/* loaded from: input_file:org/nuxeo/ecm/platform/importer/queue/consumer/AbstractConsumer.class */
public abstract class AbstractConsumer extends AbstractTaskRunner implements Consumer {
    protected final Batch batch;
    protected final String repositoryName;
    protected final BlockingQueue<SourceNode> queue;
    protected final DocumentRef rootRef;
    protected static final long CHECK_INTERVAL = 2000;
    protected String originatingUsername;
    protected ImporterLogger log;
    protected String threadName;
    protected long startTime = 0;
    protected long lastCheckTime = 0;
    protected long lastCount = 0;
    protected double lastImediatThroughput = 0.0d;
    protected boolean replayMode = true;
    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
    protected final Timer processTimer = this.registry.timer(MetricRegistry.name("nuxeo", new String[]{"importer", "queue", "consumer", "import"}));
    protected final Timer commitTimer = this.registry.timer(MetricRegistry.name("nuxeo", new String[]{"importer", "queue", "consumer", "commit"}));
    protected final Counter retryCount = this.registry.counter(MetricRegistry.name("nuxeo", new String[]{"importer", "queue", "consumer", "retry"}));
    protected final Counter failCount = this.registry.counter(MetricRegistry.name("nuxeo", new String[]{"importer", "queue", "consumer", "fail"}));
    protected final Counter consumerCount = this.registry.counter(MetricRegistry.name("nuxeo", new String[]{"importer", "queue", "consumer"}));

    public AbstractConsumer(ImporterLogger importerLogger, DocumentModel documentModel, int i, BlockingQueue<SourceNode> blockingQueue) {
        this.log = null;
        this.log = importerLogger;
        this.repositoryName = documentModel.getRepositoryName();
        this.batch = new Batch(i);
        this.queue = blockingQueue;
        this.rootRef = documentModel.getRef();
        importerLogger.info("Create consumer root:" + documentModel.getPathAsString() + " batchSize: " + i);
    }

    @Override // java.lang.Runnable
    public void run() {
        this.threadName = Thread.currentThread().getName();
        this.started = true;
        this.startTime = System.currentTimeMillis();
        this.lastCheckTime = this.startTime;
        this.consumerCount.inc();
        try {
            runImport();
        } catch (Exception e) {
            this.log.error("Unexpected End of consumer after " + getNbProcessed() + " nodes.", e);
            ExceptionUtils.checkInterrupt(e);
            runDrainer();
            this.error = e;
        } finally {
            this.completed = true;
            this.started = false;
            this.consumerCount.dec();
        }
    }

    protected void runDrainer() {
        markThreadName("draining");
        this.log.error("Consumer is broken, draining the queue to rejected");
        while (true) {
            try {
                SourceNode poll = this.queue.poll(1L, TimeUnit.SECONDS);
                if (poll == null && this.canStop) {
                    this.log.info("End of broken consumer, processed node: " + getNbProcessed());
                    return;
                } else if (poll != null) {
                    this.log.error("Consumer is broken reject node: " + poll.getName());
                    onSourceNodeException(poll, this.error);
                }
            } catch (InterruptedException e) {
                this.log.error("Interrupted exception received, stopping consumer");
                return;
            }
        }
    }

    private void markThreadName(String str) {
        Thread.currentThread().setName(Thread.currentThread().getName() + "-" + str);
    }

    protected void runImport() {
        UnrestrictedSessionRunner unrestrictedSessionRunner = new UnrestrictedSessionRunner(this.repositoryName, this.originatingUsername) { // from class: org.nuxeo.ecm.platform.importer.queue.consumer.AbstractConsumer.1
            public void run() {
                AbstractConsumer.this.log.info("Consumer running");
                while (true) {
                    try {
                        SourceNode poll = AbstractConsumer.this.queue.poll(1L, TimeUnit.SECONDS);
                        if (poll == null) {
                            AbstractConsumer.this.log.debug("Poll timeout, queue size:" + AbstractConsumer.this.queue.size());
                            if (AbstractConsumer.this.canStop) {
                                break;
                            }
                        } else {
                            AbstractConsumer.this.incrementProcessed();
                            AbstractConsumer.this.batch.add(poll);
                            Timer.Context time = AbstractConsumer.this.processTimer.time();
                            try {
                                try {
                                    setThreadName(poll);
                                    AbstractConsumer.this.process(this.session, poll);
                                    restoreThreadName();
                                    time.stop();
                                } catch (Exception e) {
                                    AbstractConsumer.this.log.error("Exception while consuming node: " + poll.getName(), e);
                                    ExceptionUtils.checkInterrupt(e);
                                    TransactionHelper.setTransactionRollbackOnly();
                                    time.stop();
                                }
                                commitIfNeededOrReplayBatch(poll);
                            } catch (Throwable th) {
                                time.stop();
                                throw th;
                            }
                        }
                    } catch (InterruptedException e2) {
                        AbstractConsumer.this.log.error("Interrupted exception received, stopping consumer");
                    }
                }
                AbstractConsumer.this.log.info("End of consumer, processed node: " + AbstractConsumer.this.getNbProcessed());
                commitOrReplayBatch();
            }

            private void restoreThreadName() {
                Thread.currentThread().setName(AbstractConsumer.this.threadName);
            }

            private void setThreadName(SourceNode sourceNode) {
                String str = AbstractConsumer.this.threadName + "-" + AbstractConsumer.this.nbProcessed;
                Thread.currentThread().setName(sourceNode != null ? str + "-" + sourceNode.getName() : str + "-null");
            }

            private void commitIfNeededOrReplayBatch(SourceNode sourceNode) {
                if (!TransactionHelper.isTransactionMarkedRollback()) {
                    AbstractConsumer.this.commitIfNeeded(this.session);
                } else {
                    AbstractConsumer.this.log.error("Transaction marked as rollback while processing node: " + sourceNode.getName());
                    AbstractConsumer.this.rollbackAndReplayBatch(this.session);
                }
            }

            private void commitOrReplayBatch() {
                if (TransactionHelper.isTransactionMarkedRollback()) {
                    AbstractConsumer.this.rollbackAndReplayBatch(this.session);
                } else {
                    AbstractConsumer.this.commit(this.session);
                }
            }
        };
        if (!TransactionHelper.isTransactionActiveOrMarkedRollback()) {
            TransactionHelper.startTransaction();
        }
        unrestrictedSessionRunner.runUnrestricted();
    }

    protected abstract void process(CoreSession coreSession, SourceNode sourceNode) throws Exception;

    protected void commitIfNeeded(CoreSession coreSession) {
        if (this.batch.isFull()) {
            commit(coreSession);
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis - this.lastCheckTime > CHECK_INTERVAL) {
                this.lastImediatThroughput = (1000.0d * ((this.nbProcessed.get() - this.lastCount) + 0.0d)) / (currentTimeMillis - this.lastCheckTime);
                this.lastCount = this.nbProcessed.get();
                this.lastCheckTime = currentTimeMillis;
            }
        }
    }

    protected void commit(CoreSession coreSession) {
        if (this.batch.size() > 0) {
            Timer.Context time = this.commitTimer.time();
            try {
                this.log.debug("Commit batch of " + this.batch.size() + " nodes");
                coreSession.save();
                TransactionHelper.commitOrRollbackTransaction();
                this.batch.clear();
                TransactionHelper.startTransaction();
            } finally {
                time.stop();
            }
        }
    }

    protected void rollbackAndReplayBatch(CoreSession coreSession) {
        this.log.info("Rollback a batch of " + this.batch.size() + " docs");
        TransactionHelper.setTransactionRollbackOnly();
        coreSession.save();
        TransactionHelper.commitOrRollbackTransaction();
        replayBatch(coreSession);
        this.batch.clear();
        TransactionHelper.startTransaction();
    }

    private void replayBatch(CoreSession coreSession) {
        if (!this.replayMode) {
            this.log.error("No replay mode, loosing the batch");
            return;
        }
        this.log.error("Replaying batch in isolated transaction");
        for (SourceNode sourceNode : this.batch.getNodes()) {
            boolean z = false;
            TransactionHelper.startTransaction();
            this.retryCount.inc();
            Timer.Context time = this.processTimer.time();
            try {
                try {
                    process(coreSession, sourceNode);
                    time.stop();
                } catch (Exception e) {
                    ExceptionUtils.checkInterrupt(e);
                    onSourceNodeException(sourceNode, e);
                    TransactionHelper.setTransactionRollbackOnly();
                    this.failCount.inc();
                    time.stop();
                }
                coreSession.save();
                if (TransactionHelper.isTransactionMarkedRollback()) {
                    onSourceNodeRollBack(sourceNode);
                } else {
                    z = true;
                }
                TransactionHelper.commitOrRollbackTransaction();
                if (z) {
                    this.log.debug("Replaying successfully node: " + sourceNode.getName());
                } else {
                    this.log.error("Import failure after replay on node: " + sourceNode.getName());
                }
            } catch (Throwable th) {
                time.stop();
                throw th;
            }
        }
    }

    protected void onSourceNodeException(SourceNode sourceNode, Exception exc) {
        this.log.error(String.format("Unable to import node [%s]", sourceNode.getName()), exc);
    }

    protected void onSourceNodeRollBack(SourceNode sourceNode) {
        this.log.error(String.format("Rollback while replaying consumer node [%s]", sourceNode.getName()));
    }

    public String getOriginatingUsername() {
        return this.originatingUsername;
    }

    public void setOriginatingUsername(String str) {
        this.originatingUsername = str;
    }
}
