package org.nuxeo.lib.stream.computation.log;

import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.StreamProcessor;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.kafka.KafkaUtils;

/* loaded from: input_file:org/nuxeo/lib/stream/computation/log/LogStreamProcessor.class */
/**
 * {@link StreamProcessor} that runs the computations of a {@link Topology} inside {@link ComputationPool}s, using
 * the streams exposed by a {@link LogManager}.
 * <p>
 * Typical life cycle: {@link #init(Topology, Settings)} creates the streams, {@link #start()} creates and starts one
 * pool per computation, then one of {@link #stop(Duration)}, {@link #drainAndStop(Duration)} or {@link #shutdown()}
 * terminates the pools.
 */
public class LogStreamProcessor implements StreamProcessor {
    private static final Log log = LogFactory.getLog(LogStreamProcessor.class);

    /** Log manager used to create the streams and handed to every pool. */
    protected final LogManager manager;

    /** Topology set by {@link #init(Topology, Settings)}. */
    protected Topology topology;

    /** Settings (concurrency and partitions) set by {@link #init(Topology, Settings)}. */
    protected Settings settings;

    /** One pool per computation; {@code null} until {@link #start()} has been called. */
    protected List<ComputationPool> pools;

    /**
     * Creates a processor bound to the given log manager.
     *
     * @param logManager the manager used to create and access the streams
     */
    public LogStreamProcessor(LogManager logManager) {
        this.manager = logManager;
    }

    /**
     * Stores the topology and settings, then creates any missing stream.
     *
     * @param topology the computations and streams to run
     * @param settings the concurrency and partitioning settings
     * @return this processor, to allow call chaining
     */
    @Override
    public StreamProcessor init(Topology topology, Settings settings) {
        log.debug("Initializing ...");
        this.topology = topology;
        this.settings = settings;
        initStreams();
        return this;
    }

    /**
     * Creates the computation pools and starts each of them.
     */
    @Override
    public void start() {
        log.debug("Starting ...");
        this.pools = initPools();
        pools.forEach(ComputationPool::start);
    }

    /**
     * Waits, pool after pool, for the assignments to be done.
     *
     * @param duration the timeout passed to each pool (so the total wait can exceed it)
     * @return {@code false} as soon as one pool reports a failure, {@code true} otherwise
     * @throws InterruptedException if the wait on a pool is interrupted
     */
    @Override
    public boolean waitForAssignments(Duration duration) throws InterruptedException {
        for (ComputationPool pool : pools) {
            if (!pool.waitForAssignments(duration)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Stops all the pools, using a parallel stream.
     *
     * @param duration the timeout passed to each pool
     * @return {@code true} if every pool reported a successful stop, or if the processor was never started
     */
    @Override
    public boolean stop(Duration duration) {
        log.debug("Stopping ...");
        if (pools == null) {
            return true;
        }
        long failures = pools.parallelStream().filter(pool -> !pool.stop(duration)).count();
        log.debug(String.format("Stopped %d failure", failures));
        return failures == 0;
    }

    /**
     * Drains and stops the pools one after the other, in the order of the pool list.
     *
     * @param duration the timeout passed to each pool
     * @return {@code true} if every pool reported success, or if the processor was never started
     */
    @Override
    public boolean drainAndStop(Duration duration) {
        log.debug("Drain and stop");
        if (pools == null) {
            return true;
        }
        // Sequential on purpose here, unlike stop(): presumably upstream pools must be drained first - TODO confirm.
        long failures = pools.stream().filter(pool -> !pool.drainAndStop(duration)).count();
        log.debug(String.format("Drained and stopped %d failure", failures));
        return failures == 0;
    }

    /**
     * Shuts down all the pools, using a parallel stream. Does nothing if the processor was never started.
     */
    @Override
    public void shutdown() {
        log.debug("Shutdown ...");
        if (pools == null) {
            return;
        }
        pools.parallelStream().forEach(ComputationPool::shutdown);
        log.debug("Shutdown done");
    }

    /**
     * Computes the low watermark of the whole topology.
     * <p>
     * For each root, the watermark of its tree is the minimum watermark among the computations returned by
     * {@link Topology#getDescendantComputationNames(String)}. The result is the minimum of the tree watermarks that
     * are greater than 1.
     *
     * @return the low watermark, or 0 when no tree has a watermark greater than 1
     */
    @Override
    public long getLowWatermark() {
        Map<String, Long> watermarks = getPoolWatermarks();
        Set<String> roots = topology.getRoots();
        Map<String, Long> watermarkTrees = new HashMap<>(roots.size());
        for (String root : roots) {
            long treeWatermark = topology.getDescendantComputationNames(root)
                                         .stream()
                                         .mapToLong(watermarks::get)
                                         .min()
                                         .orElse(0L);
            watermarkTrees.put(root, treeWatermark);
        }
        // Trees whose watermark is 0 or 1 are ignored when looking for the minimum.
        long lowWatermark = watermarkTrees.values()
                                          .stream()
                                          .mapToLong(Long::longValue)
                                          .filter(watermark -> watermark > 1)
                                          .min()
                                          .orElse(0L);
        if (log.isTraceEnabled()) {
            log.trace("lowWatermark: " + lowWatermark);
            watermarkTrees.forEach((root, watermark) -> log.trace("tree " + root + ": " + watermark));
        }
        return lowWatermark;
    }

    /**
     * Computes the low watermark of a single computation: the minimum between its own watermark and the watermarks
     * of the computations returned by {@link Topology#getAncestorComputationNames(String)}.
     *
     * @param str the computation name, must not be {@code null}
     * @return the low watermark of the computation
     * @throws NullPointerException if {@code str} is {@code null} or does not match the computation of any pool
     */
    @Override
    public long getLowWatermark(String str) {
        Objects.requireNonNull(str);
        Map<String, Long> watermarks = getPoolWatermarks();
        long ancestorsWatermark = topology.getAncestorComputationNames(str)
                                          .stream()
                                          .mapToLong(watermarks::get)
                                          .min()
                                          .orElse(0L);
        return Math.min(ancestorsWatermark, watermarks.get(str));
    }

    /**
     * Tells whether the given timestamp is done according to the topology low watermark.
     *
     * @param j the timestamp to check
     * @return the result of {@link Watermark#isDone(long)} on the current low watermark
     */
    @Override
    public boolean isDone(long j) {
        return Watermark.ofValue(getLowWatermark()).isDone(j);
    }

    /**
     * Builds one {@link ComputationPool} per computation metadata of the topology.
     *
     * @return the list of pools, not yet started
     */
    protected List<ComputationPool> initPools() {
        log.debug("Initializing pools");
        return topology.metadataList()
                       .stream()
                       .map(meta -> new ComputationPool(topology.getSupplier(meta.name()), meta,
                               getDefaultAssignments(meta), manager))
                       .collect(Collectors.toList());
    }

    /**
     * Computes the default partition assignments of a computation: its input stream partitions are distributed
     * among as many lists as the configured concurrency.
     *
     * @param computationMetadataMapping the metadata of the computation
     * @return the partition assignments, as returned by {@link KafkaUtils#roundRobinAssignments}
     */
    protected List<List<LogPartition>> getDefaultAssignments(ComputationMetadataMapping computationMetadataMapping) {
        int threads = settings.getConcurrency(computationMetadataMapping.name());
        Map<String, Integer> streams = new HashMap<>();
        // NOTE(review): this lambda body was missing from the decompiled source, which left the map empty;
        // the partition count is read the same way as in initStreams() - confirm against the original sources.
        computationMetadataMapping.inputStreams()
                                  .forEach(streamName -> streams.put(streamName, settings.getPartitions(streamName)));
        return KafkaUtils.roundRobinAssignments(threads, streams);
    }

    /**
     * Creates every stream of the topology that does not exist yet, with the configured number of partitions.
     */
    protected void initStreams() {
        log.debug("Initializing streams");
        topology.streamsSet()
                .forEach(streamName -> manager.createIfNotExists(streamName, settings.getPartitions(streamName)));
    }

    /**
     * Collects the current low watermark of each pool, keyed by computation name.
     */
    private Map<String, Long> getPoolWatermarks() {
        Map<String, Long> watermarks = new HashMap<>(pools.size());
        // NOTE(review): this lambda body was missing from the decompiled source; the two accessors below are
        // the expected ComputationPool API - confirm against ComputationPool.
        pools.forEach(pool -> watermarks.put(pool.getComputationName(), pool.getLowWatermark()));
        return watermarks;
    }
}
