package org.nuxeo.ecm.platform.importer.mqueues.consumer;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.platform.importer.mqueues.message.Message;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQueues;

/* loaded from: input_file:org/nuxeo/ecm/platform/importer/mqueues/consumer/ConsumerPool.class */
public class ConsumerPool<M extends Message> extends AbstractCallablePool<ConsumerStatus> {
    private static final Log log = LogFactory.getLog(ConsumerPool.class);
    private final MQueues<M> qm;
    private final ConsumerFactory<M> factory;
    private final ConsumerPolicy policy;
    private final List<MQueues.Tailer<M>> tailers;

    public ConsumerPool(MQueues<M> mQueues, ConsumerFactory<M> consumerFactory, ConsumerPolicy consumerPolicy) {
        super(mQueues.size());
        this.qm = mQueues;
        this.factory = consumerFactory;
        this.policy = consumerPolicy;
        this.tailers = new ArrayList(mQueues.size());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.nuxeo.ecm.platform.importer.mqueues.consumer.AbstractCallablePool
    public ConsumerStatus getErrorStatus() {
        return new ConsumerStatus(0, 0L, 0L, 0L, 0L, 0L, 0L, true);
    }

    @Override // org.nuxeo.ecm.platform.importer.mqueues.consumer.AbstractCallablePool
    protected Callable<ConsumerStatus> getCallable(int i) {
        MQueues.Tailer<M> createTailer = this.qm.createTailer(i);
        this.tailers.add(createTailer);
        return new ConsumerRunner(this.factory, this.policy, createTailer);
    }

    @Override // org.nuxeo.ecm.platform.importer.mqueues.consumer.AbstractCallablePool
    protected String getThreadPrefix() {
        return "Nuxeo-Consumer";
    }

    @Override // org.nuxeo.ecm.platform.importer.mqueues.consumer.AbstractCallablePool
    protected void afterCall(List<ConsumerStatus> list) {
        closeTailers();
        Log log2 = log;
        log2.getClass();
        list.forEach((v1) -> {
            r1.info(v1);
        });
        log.warn(ConsumerStatus.toString(list));
    }

    private void closeTailers() {
        this.tailers.stream().filter((v0) -> {
            return Objects.nonNull(v0);
        }).forEach(tailer -> {
            try {
                tailer.close();
            } catch (Exception e) {
                log.error("Unable to close tailer: " + tailer.getQueue());
            }
        });
    }

    @Override // org.nuxeo.ecm.platform.importer.mqueues.consumer.AbstractCallablePool, java.lang.AutoCloseable
    public void close() throws Exception {
        super.close();
        closeTailers();
    }
}
