package org.nuxeo.lib.stream.pattern.consumer;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.kafka.KafkaUtils;
import org.nuxeo.lib.stream.pattern.Message;
import org.nuxeo.lib.stream.pattern.consumer.internals.AbstractCallablePool;
import org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner;

/* loaded from: input_file:org/nuxeo/lib/stream/pattern/consumer/ConsumerPool.class */
public class ConsumerPool<M extends Message> extends AbstractCallablePool<ConsumerStatus> {
    private static final Log log = LogFactory.getLog(ConsumerPool.class);
    protected final LogManager manager;
    protected final ConsumerFactory<M> factory;
    protected final ConsumerPolicy policy;
    protected final String logName;
    protected final List<List<LogPartition>> defaultAssignments;

    public ConsumerPool(String str, LogManager logManager, ConsumerFactory<M> consumerFactory, ConsumerPolicy consumerPolicy) {
        super(computeNbThreads((short) logManager.getAppender(str).size(), consumerPolicy.getMaxThreads()));
        this.logName = str;
        this.manager = logManager;
        this.factory = consumerFactory;
        this.policy = consumerPolicy;
        this.defaultAssignments = getDefaultAssignments();
        if (logManager.supportSubscribe()) {
            log.info("Creating consumer pool using Log subscribe on " + str);
        } else {
            log.info("Creating consumer pool using Log assignments on " + str + ": " + this.defaultAssignments);
        }
    }

    protected static short computeNbThreads(short s, short s2) {
        return s2 > 0 ? (short) Math.min((int) s, (int) s2) : s;
    }

    public String getConsumerGroupName() {
        return this.policy.getName();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.nuxeo.lib.stream.pattern.consumer.internals.AbstractCallablePool
    public ConsumerStatus getErrorStatus() {
        return new ConsumerStatus("error", 0L, 0L, 0L, 0L, 0L, 0L, true);
    }

    @Override // org.nuxeo.lib.stream.pattern.consumer.internals.AbstractCallablePool
    protected Callable<ConsumerStatus> getCallable(int i) {
        return new ConsumerRunner(this.factory, this.policy, this.manager, this.defaultAssignments.get(i));
    }

    @Override // org.nuxeo.lib.stream.pattern.consumer.internals.AbstractCallablePool
    protected String getThreadPrefix() {
        return "Nuxeo-Consumer";
    }

    @Override // org.nuxeo.lib.stream.pattern.consumer.internals.AbstractCallablePool
    protected void afterCall(List<ConsumerStatus> list) {
        Log log2 = log;
        log2.getClass();
        list.forEach((v1) -> {
            r1.info(v1);
        });
        log.warn(ConsumerStatus.toString(list));
    }

    protected List<List<LogPartition>> getDefaultAssignments() {
        return KafkaUtils.roundRobinAssignments(getNbThreads(), Collections.singletonMap(this.logName, Integer.valueOf(this.manager.getAppender(this.logName).size())));
    }
}
