package org.nuxeo.lib.stream.pattern.producer.internals;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.log.LogAppender;
import org.nuxeo.lib.stream.pattern.Message;
import org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner;
import org.nuxeo.lib.stream.pattern.producer.ProducerFactory;
import org.nuxeo.lib.stream.pattern.producer.ProducerIterator;
import org.nuxeo.lib.stream.pattern.producer.ProducerStatus;

/* loaded from: input_file:org/nuxeo/lib/stream/pattern/producer/internals/ProducerRunner.class */
/**
 * A {@link Callable} that drains a single {@link ProducerIterator} into a {@link LogAppender}.
 *
 * <p>Each call to {@link #call()} creates a producer from the factory using this runner's
 * producer id, appends every message it yields to the appender, and returns a
 * {@link ProducerStatus} built from the producer id, the number of messages produced and the
 * start/stop timestamps.
 *
 * @param <M> the type of message produced
 */
public class ProducerRunner<M extends Message> implements Callable<ProducerStatus> {
    private static final Log log = LogFactory.getLog(ProducerRunner.class);

    /** Identifier handed to the factory when creating the producer and reported in the status. */
    protected final int producerId;

    /** Destination of the produced messages. */
    protected final LogAppender<M> appender;

    /** Factory used to create the producer iterator inside {@link #call()}. */
    protected final ProducerFactory<M> factory;

    /** Name of the thread as captured when {@link #call()} starts; used as a prefix by {@link #setThreadName}. */
    protected String threadName;

    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(ConsumerRunner.NUXEO_METRICS_REGISTRY_NAME);

    /** Times the retrieval of each message from the producer (the append is not included). */
    protected final Timer producerTimer = registry.timer(MetricRegistry.name("nuxeo", "importer", "stream", "producer"));

    /** Incremented when {@link #call()} starts and decremented when it ends. */
    protected final Counter producersCount = registry.counter(MetricRegistry.name("nuxeo", "importer", "stream", "producers"));

    /** Number of messages pulled from the producer so far. */
    protected long counter;

    /**
     * Creates a runner; the producer itself is only created when {@link #call()} is invoked.
     *
     * @param producerFactory factory used to create the producer
     * @param logAppender appender receiving the produced messages
     * @param i the producer id
     */
    public ProducerRunner(ProducerFactory<M> producerFactory, LogAppender<M> logAppender, int i) {
        this.factory = producerFactory;
        this.producerId = i;
        this.appender = logAppender;
        log.debug("ProducerIterator thread created: " + i);
    }

    /**
     * Runs the producer until it has no more messages.
     *
     * <p>The producer is always closed and the producers counter is always decremented, whether the
     * loop completes normally or throws.
     *
     * @return the status holding the producer id, the number of messages produced and the start and
     *         stop times in milliseconds
     * @throws Exception if creating, iterating, appending or closing the producer fails
     */
    @Override
    public ProducerStatus call() throws Exception {
        threadName = Thread.currentThread().getName();
        long start = System.currentTimeMillis();
        producersCount.inc();
        // try-with-resources guarantees the producer is closed even when producerLoop throws.
        try (ProducerIterator<M> producer = factory.createProducer(producerId)) {
            producerLoop(producer);
        } finally {
            producersCount.dec();
        }
        return new ProducerStatus(producerId, counter, start, System.currentTimeMillis(), false);
    }

    /**
     * Pulls every message from the given producer and appends it to the appender.
     *
     * <p>Only the retrieval of the message is timed; the timer context is stopped exactly once per
     * message, before the message is appended.
     *
     * @param producerIterator the producer to drain
     */
    protected void producerLoop(ProducerIterator<M> producerIterator) {
        M message;
        while (producerIterator.hasNext()) {
            try (Timer.Context ignored = producerTimer.time()) {
                message = producerIterator.next();
                setThreadName(message);
                counter++;
            }
            // Appending happens outside the timed section so the timer is never stopped twice
            // when the append fails.
            appender.append(producerIterator.getPartition(message, appender.size()), message);
        }
    }

    /**
     * Renames the current thread to {@code <initial name>-<counter>-<message id>}, using the
     * literal {@code null} in place of the id when there is no message.
     *
     * @param m the message being processed, may be {@code null}
     */
    protected void setThreadName(M m) {
        String prefix = threadName + "-" + counter;
        String suffix = (m != null) ? "-" + m.getId() : "-null";
        Thread.currentThread().setName(prefix + suffix);
    }
}
