package org.nuxeo.ecm.platform.importer.queue;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.joda.time.DateTime;
import org.joda.time.Minutes;
import org.joda.time.Seconds;
import org.nuxeo.common.utils.ExceptionUtils;
import org.nuxeo.ecm.core.api.CoreInstance;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.PathRef;
import org.nuxeo.ecm.platform.importer.filter.ImporterFilter;
import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
import org.nuxeo.ecm.platform.importer.queue.consumer.Consumer;
import org.nuxeo.ecm.platform.importer.queue.consumer.ConsumerFactory;
import org.nuxeo.ecm.platform.importer.queue.consumer.ImportStat;
import org.nuxeo.ecm.platform.importer.queue.manager.QueuesManager;
import org.nuxeo.ecm.platform.importer.queue.producer.Producer;
import org.nuxeo.ecm.platform.importer.source.SourceNode;

/* loaded from: input_file:org/nuxeo/ecm/platform/importer/queue/QueueImporter.class */
/**
 * Drives a queue-based import: one {@link Producer} thread feeds the queues of a {@link QueuesManager} while one
 * {@link Consumer} thread per queue processes them. The registered {@link ImporterFilter}s are notified before the
 * import starts and exactly once when it ends.
 */
public class QueueImporter {
    /** Delay, in milliseconds, between two checks of the producer/consumer termination state. */
    private static final long POLL_INTERVAL_MS = 100L;

    protected ImporterLogger log;
    /** Sum of {@code Consumer.getNbProcessed()} over the consumers of the last import. */
    protected long processedNodesConsumer = 0;
    /** Number of nodes found left in the consumer queues at the end of the last import. */
    protected long unprocessedNodesConsumer = 0;
    /** Sum of {@code Consumer.getNbDocsCreated()} over the consumers of the last import. */
    protected long nbDocsCreated = 0;
    protected volatile boolean isRunning = false;
    protected final ImportStat importStat = new ImportStat();
    protected final List<ImporterFilter> filters = new ArrayList<>();
    protected final List<Thread> consumerThreads = new ArrayList<>();
    protected Thread producerThread;

    /**
     * @param importerLogger logger used for every message emitted by this importer
     */
    public QueueImporter(ImporterLogger importerLogger) {
        this.log = importerLogger;
    }

    /**
     * Runs a complete import: enables the filters, opens a system session, starts the producer and the consumers,
     * waits for all of them to finish, collects statistics and finally disables the filters.
     * <p>
     * Any {@link Exception} raised while importing is logged and handed to the filters instead of being propagated.
     *
     * @param producer the producer, initialized with {@code queuesManager} and run in its own thread
     * @param queuesManager provides one queue per consumer
     * @param str path of the document handed to every consumer (presumably the import root; confirm with callers)
     * @param str2 name of the repository on which the system session is opened
     * @param i value forwarded as-is to {@code ConsumerFactory.createConsumer} (presumably the batch size; confirm)
     * @param consumerFactory factory creating one consumer per queue
     */
    public void importDocuments(Producer producer, QueuesManager queuesManager, String str, String str2, int i, ConsumerFactory consumerFactory) {
        this.log.info("importer: Starting import process");
        this.isRunning = true;
        DateTime importStarted = new DateTime();
        Exception finalException = null;
        enableFilters();
        try (CoreSession session = CoreInstance.openCoreSessionSystem(str2)) {
            producer.init(queuesManager);
            DocumentModel root = session.getDocument(new PathRef(str));
            startProducerThread(producer);
            List<Consumer> consumers = startConsumerPool(queuesManager, root, i, consumerFactory);
            Exception producerInterruption = waitForProducer(producer);
            consumersCanStop(consumers);
            Exception consumerInterruption = waitForConsumers(consumers);
            // Report an interruption to the filters even when it happened while waiting for the producer.
            finalException = consumerInterruption != null ? consumerInterruption : producerInterruption;
            checkConsumerQueues(queuesManager);
            updateStats(consumers, producer);
        } catch (Exception e) {
            this.log.error("Error while importing", e);
            finalException = e;
        } finally {
            // Single exit point: filters are notified exactly once, on success as well as on failure.
            disableFilters(finalException);
            this.isRunning = false;
        }
        DateTime importFinished = new DateTime();
        this.log.info(String.format("import: End of process: producer send %d docs, consumer receive %d docs, creating %d docs (include retries) in %s mn, rate %.2f doc/s.", Long.valueOf(producer.getNbProcessed()), Long.valueOf(this.processedNodesConsumer), Long.valueOf(this.nbDocsCreated), Integer.valueOf(Minutes.minutesBetween(importStarted, importFinished).getMinutes()), Float.valueOf(((float) this.processedNodesConsumer) / Seconds.secondsBetween(importStarted, importFinished).getSeconds())));
    }

    /**
     * Drains whatever is left in the consumer queues, logging every remaining node as an error, and stores the
     * number of nodes found in {@link #unprocessedNodesConsumer}.
     *
     * @param queuesManager the manager whose queues are inspected
     */
    protected void checkConsumerQueues(QueuesManager queuesManager) {
        this.unprocessedNodesConsumer = 0L;
        for (int i = 0; i < queuesManager.getNBConsumers(); i++) {
            BlockingQueue<SourceNode> queue = queuesManager.getQueue(i);
            if (queue.isEmpty()) {
                continue;
            }
            this.log.error("Queue of conusmer " + i + " not empty, draining " + queue.size() + " nodes to errors");
            this.unprocessedNodesConsumer += queue.size();
            SourceNode node;
            // poll() returns null as soon as the queue is empty.
            while ((node = queue.poll()) != null) {
                this.log.error("Unable to import " + node.getName() + " by consumer " + i);
            }
        }
    }

    /**
     * Recomputes the counters of the import that just ended and logs an error when nodes were left unprocessed or
     * when the producer and the consumers disagree on the number of nodes.
     */
    private void updateStats(List<Consumer> list, Producer producer) {
        // Both counters describe the current run only; resetting them keeps a reused importer from accumulating.
        this.processedNodesConsumer = 0L;
        this.nbDocsCreated = 0L;
        for (Consumer consumer : list) {
            this.processedNodesConsumer += consumer.getNbProcessed();
            this.nbDocsCreated = (long) (this.nbDocsCreated + consumer.getNbDocsCreated());
        }
        if (this.unprocessedNodesConsumer > 0) {
            this.log.error("Total number of unprocessed doc because of consumers unexpected end: " + this.unprocessedNodesConsumer);
        }
        if (producer.getNbProcessed() != this.processedNodesConsumer) {
            this.log.error(String.format("Producer produced %s nodes, Consumers processed %s nodes, some nodes have been lost", Long.valueOf(producer.getNbProcessed()), Long.valueOf(this.processedNodesConsumer)));
        }
    }

    /**
     * Waits until every consumer reports termination, then joins the consumer threads.
     * <p>
     * Consumers are forcibly stopped only when the wait is interrupted or aborted by an exception; in the normal
     * case they are left alone so they can drain their queue.
     *
     * @return the {@link InterruptedException} caught while waiting, or {@code null} if there was none
     */
    private Exception waitForConsumers(List<Consumer> list) {
        Exception interruption = null;
        try {
            while (!consumersTerminated(list)) {
                try {
                    Thread.sleep(POLL_INTERVAL_MS);
                } catch (InterruptedException e) {
                    this.log.error("importer: Got an InterruptedException", e);
                    interruption = e;
                    // NOTE(review): checkInterrupt may rethrow; the finally below stops the consumers in that case.
                    ExceptionUtils.checkInterrupt(e);
                    stopRunningConsumers(list);
                }
            }
        } finally {
            // No-op on a normal exit, since every consumer is terminated by then.
            stopRunningConsumers(list);
        }
        this.log.info("importer: All consumers has terminated their work.");
        for (Thread consumerThread : this.consumerThreads) {
            try {
                consumerThread.join();
            } catch (InterruptedException e) {
                this.log.error("importer: Got an InterruptedException", e);
                interruption = e;
                ExceptionUtils.checkInterrupt(e);
            }
        }
        this.log.info("importer: All consumers threads terminated");
        this.consumerThreads.clear();
        // Accumulate in a long: the per-consumer counters are summed into long fields elsewhere in this class.
        long processed = 0L;
        for (Consumer consumer : list) {
            processed += consumer.getNbProcessed();
        }
        this.log.info("importer: " + list.size() + " consumers terminated, processed: " + processed);
        return interruption;
    }

    /** Asks every consumer that has not terminated yet to stop. */
    private void stopRunningConsumers(List<Consumer> list) {
        for (Consumer consumer : list) {
            if (!consumer.isTerminated()) {
                this.log.warn("Forcibly stopping consumer");
                consumer.mustStop();
            }
        }
    }

    /** Tells every consumer that it may stop; called once the producer is done. */
    private void consumersCanStop(List<Consumer> list) {
        list.forEach(Consumer::canStop);
    }

    /**
     * Waits until the producer thread dies or the producer reports termination, then joins the producer thread.
     * The producer is forcibly stopped if it has not terminated when the wait ends or is interrupted.
     *
     * @param producer the producer running in {@link #producerThread}
     * @return the {@link InterruptedException} caught while waiting, or {@code null} if there was none
     */
    protected Exception waitForProducer(Producer producer) {
        Exception interruption = null;
        try {
            while (this.producerThread.isAlive() && !producer.isTerminated()) {
                try {
                    Thread.sleep(POLL_INTERVAL_MS);
                } catch (InterruptedException e) {
                    this.log.error("importer: Got an InterruptedException", e);
                    interruption = e;
                    // NOTE(review): checkInterrupt may rethrow; the finally below stops the producer in that case.
                    ExceptionUtils.checkInterrupt(e);
                    stopProducerIfRunning(producer);
                }
            }
        } finally {
            stopProducerIfRunning(producer);
        }
        this.log.info("importer: producer terminated its work");
        try {
            this.producerThread.join();
            this.log.info("importer: producer thread terminated");
            this.producerThread = null;
        } catch (InterruptedException e) {
            this.log.error("importer: Got an InterruptedException", e);
            interruption = e;
            ExceptionUtils.checkInterrupt(e);
        }
        this.log.info("importer: producer terminated processed: " + producer.getNbProcessed());
        return interruption;
    }

    /** Asks the producer to stop if it has not terminated yet. */
    private void stopProducerIfRunning(Producer producer) {
        if (!producer.isTerminated()) {
            this.log.warn("Forcibly stopping producer");
            producer.mustStop();
        }
    }

    /**
     * @param list the consumers to inspect
     * @return {@code true} when every consumer reports termination (also for an empty list)
     */
    protected boolean consumersTerminated(List<Consumer> list) {
        for (Consumer consumer : list) {
            if (!consumer.isTerminated()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Creates one consumer per queue of {@code queuesManager}, starts each of them in a dedicated thread named
     * {@code import-Consumer<index>} and records the threads in {@link #consumerThreads}.
     *
     * @return the created consumers, in queue order
     */
    private List<Consumer> startConsumerPool(QueuesManager queuesManager, DocumentModel documentModel, int i, ConsumerFactory consumerFactory) {
        int nbConsumers = queuesManager.getNBConsumers();
        List<Consumer> consumers = new ArrayList<>(nbConsumers);
        for (int index = 0; index < nbConsumers; index++) {
            Consumer consumer = consumerFactory.createConsumer(this.log, documentModel, i, queuesManager.getQueue(index));
            consumers.add(consumer);
            Thread consumerThread = new Thread(consumer);
            consumerThread.setName("import-Consumer" + index);
            consumerThread.setUncaughtExceptionHandler((failedThread, error) -> {
                this.log.error("Uncaught exception in " + consumerThread.getName() + ". Consumer is going to be stopped", error);
            });
            consumerThread.start();
            this.consumerThreads.add(consumerThread);
        }
        return consumers;
    }

    /**
     * Starts {@code producer} in a new thread named {@code import-Producer} and stores it in
     * {@link #producerThread}. An uncaught exception in that thread is logged and the producer is asked to stop.
     *
     * @param producer the producer to run
     */
    protected void startProducerThread(Producer producer) {
        Thread thread = new Thread(producer);
        thread.setName("import-Producer");
        thread.setUncaughtExceptionHandler((failedThread, error) -> {
            this.log.error("Uncaught exception in " + thread.getName() + ". Producer is going to be stopped", error);
            producer.mustStop();
        });
        thread.start();
        this.producerThread = thread;
    }

    /**
     * @return the statistics object owned by this importer
     */
    public ImportStat getImportStat() {
        return this.importStat;
    }

    /**
     * Registers a filter that is notified before and after each import.
     *
     * @param importerFilter the filter to add
     */
    public void addFilter(ImporterFilter importerFilter) {
        this.log.debug(String.format("Filter with %s, was added on the importer with the hash code %s.", importerFilter.toString(), Integer.valueOf(hashCode())));
        this.filters.add(importerFilter);
    }

    /**
     * @return the number of documents created, as computed at the end of the last import
     */
    public long getCreatedDocsCounter() {
        return this.nbDocsCreated;
    }

    /** Calls {@code handleBeforeImport()} on every registered filter, in registration order. */
    protected void enableFilters() {
        for (ImporterFilter importerFilter : this.filters) {
            this.log.debug(String.format("Running filter with %s, on the importer with the hash code %s.", importerFilter.toString(), Integer.valueOf(hashCode())));
            importerFilter.handleBeforeImport();
        }
        if (this.filters.isEmpty()) {
            this.log.debug(String.format("No filters are registered on the importer with hash code %s", Integer.valueOf(hashCode())));
        }
    }

    /**
     * Calls {@code handleAfterImport(exc)} on every registered filter, in registration order.
     *
     * @param exc the exception that ended the import, or {@code null} when there was none
     */
    protected void disableFilters(Exception exc) {
        for (ImporterFilter importerFilter : this.filters) {
            importerFilter.handleAfterImport(exc);
        }
    }

    /**
     * @return {@code true} while {@link #importDocuments} is in progress
     */
    public boolean isRunning() {
        return this.isRunning;
    }
}
