package org.nuxeo.lib.stream.computation.log;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.codec.AvroMessageCodec;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.RecordFilter;
import org.nuxeo.lib.stream.computation.RecordFilterChain;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.StreamManager;
import org.nuxeo.lib.stream.computation.StreamProcessor;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.lib.stream.computation.internals.RecordFilterChainImpl;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.Name;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;

/* loaded from: input_file:org/nuxeo/lib/stream/computation/log/LogStreamManager.class */
/**
 * {@link StreamManager} implementation that delegates stream storage to a {@link LogManager}.
 * <p>
 * It keeps track of the registered processor topologies and settings, of the record filter chain
 * attached to each stream, and creates two internal single-partition streams
 * ({@link #PROCESSORS_STREAM} and {@link #METRICS_STREAM}) at construction time.
 * <p>
 * The registries are plain {@link HashMap}/{@link HashSet} instances; this class does no
 * synchronization of its own.
 */
public class LogStreamManager implements StreamManager {
    /** URN of the internal stream that receives one record per created stream processor. */
    public static final String PROCESSORS_STREAM = "internal/processors";

    /** URN of the internal metrics stream, created together with {@link #PROCESSORS_STREAM}. */
    public static final String METRICS_STREAM = "internal/metrics";

    /** Underlying log manager used to create streams, appenders and tailers. */
    protected final LogManager logManager;

    /** Registered topologies, keyed by processor name. */
    protected final Map<String, Topology> topologies = new HashMap<>();

    /** Registered settings, keyed by processor name. */
    protected final Map<String, Settings> settings = new HashMap<>();

    /** Filter chain to apply on append, keyed by stream name. */
    protected final Map<Name, RecordFilterChain> filters = new HashMap<>();

    /** Names of the non external streams that went through {@link #initStream(String, Settings)}. */
    protected final Set<Name> streams = new HashSet<>();

    private static final Log log = LogFactory.getLog(LogStreamManager.class);

    /** Codec used by the appenders of the internal streams. */
    public static final Codec<Record> INTERNAL_CODEC = new AvroMessageCodec<>(Record.class);

    /**
     * Creates a manager on top of the given log manager and initializes the internal streams.
     *
     * @param logManager the log manager all operations are delegated to
     */
    public LogStreamManager(LogManager logManager) {
        this.logManager = logManager;
        initInternalStreams();
    }

    /** Initializes the {@link #PROCESSORS_STREAM} and {@link #METRICS_STREAM} internal streams. */
    protected void initInternalStreams() {
        initInternalStream(Name.ofUrn(PROCESSORS_STREAM));
        initInternalStream(Name.ofUrn(METRICS_STREAM));
    }

    /**
     * Creates the internal stream with a single partition if it does not exist, requests its
     * appender with {@link #INTERNAL_CODEC} and registers the {@code NONE} filter chain for it.
     *
     * @param name the internal stream name
     */
    protected void initInternalStream(Name name) {
        this.logManager.createIfNotExists(name, 1);
        // Request the appender up front so that it is bound to the internal codec.
        this.logManager.getAppender(name, INTERNAL_CODEC);
        this.filters.put(name, RecordFilterChainImpl.NONE);
    }

    /**
     * Registers a processor: stores its topology and settings, then initializes the streams,
     * appenders and filters of every stream of the topology.
     *
     * @param processorName the name under which the processor is registered
     * @param topology the processor topology
     * @param settings the processor settings
     */
    @Override // org.nuxeo.lib.stream.computation.StreamManager
    public void register(String processorName, Topology topology, Settings settings) {
        log.debug("Register processor: " + processorName);
        this.topologies.put(processorName, topology);
        this.settings.put(processorName, settings);
        initStreams(topology, settings);
        initAppenders(topology.streamsSet(), settings);
        registerFilters(topology.streamsSet(), settings);
    }

    /**
     * Registers a list of streams without any topology: initializes each stream, its appender
     * and its filter chain according to the given settings.
     *
     * @param streamUrns the URNs of the streams to register
     * @param settings the settings describing the streams
     */
    @Override // org.nuxeo.lib.stream.computation.StreamManager
    public void register(List<String> streamUrns, Settings settings) {
        streamUrns.forEach(urn -> initStream(urn, settings));
        initAppenders(streamUrns, settings);
        registerFilters(streamUrns, settings);
    }

    /**
     * Creates and initializes a stream processor for a previously registered processor name, and
     * appends a record describing it to the {@link #PROCESSORS_STREAM}.
     *
     * @param processorName the registered processor name
     * @return the initialized processor
     * @throws IllegalArgumentException if no processor was registered under that name
     */
    @Override // org.nuxeo.lib.stream.computation.StreamManager
    public StreamProcessor createStreamProcessor(String processorName) {
        if (!this.topologies.containsKey(processorName)) {
            throw new IllegalArgumentException("Unregistered processor name: " + processorName);
        }
        LogStreamProcessor processor = new LogStreamProcessor(this);
        processor.init(this.topologies.get(processorName), this.settings.get(processorName));

        Map<String, String> metadata = new HashMap<>();
        metadata.put("processorName", processorName);
        metadata.putAll(getSystemMetadata());
        // The record key is the host ip, the payload is the JSON description of the processor.
        byte[] payload = processor.toJson(metadata).getBytes(StandardCharsets.UTF_8);
        append(PROCESSORS_STREAM, Record.of(metadata.get("ip"), payload));
        return processor;
    }

    /**
     * Collects metadata about the local host and JVM.
     *
     * @return a new mutable map with the keys {@code ip}, {@code hostname}, {@code cpuCores} and
     *         {@code jvmHeapSize}
     */
    protected Map<String, String> getSystemMetadata() {
        Map<String, String> metadata = new HashMap<>();
        try {
            InetAddress localHost = InetAddress.getLocalHost();
            metadata.put("ip", localHost.getHostAddress());
            metadata.put("hostname", localHost.getHostName());
        } catch (UnknownHostException e) {
            // Best effort: an unresolvable local host must not prevent processor creation.
            log.debug("Unable to resolve the local host, falling back to placeholder metadata", e);
            // NOTE(review): the placeholder is misspelled, it is kept unchanged because the value
            // is written to the processors stream and may be matched by existing consumers.
            metadata.put("ip", "unknonwn");
            metadata.put("hostname", "unknonwn");
        }
        Runtime runtime = Runtime.getRuntime();
        metadata.put("cpuCores", String.valueOf(runtime.availableProcessors()));
        metadata.put("jvmHeapSize", String.valueOf(runtime.maxMemory()));
        return metadata;
    }

    /**
     * @return the underlying log manager
     */
    public LogManager getLogManager() {
        return this.logManager;
    }

    /**
     * Appends a record to a stream after running it through the stream filter chain.
     *
     * @param streamUrn the URN of the target stream
     * @param record the record to append
     * @return the offset returned by the appender, or an offset on partition 0 at position 0 when
     *         the filter chain returned {@code null} and nothing was appended
     * @throws IllegalArgumentException if no filter chain is registered for the stream
     */
    @Override // org.nuxeo.lib.stream.computation.StreamManager
    public LogOffset append(String streamUrn, Record record) {
        Name name = Name.ofUrn(streamUrn);
        RecordFilterChain filter = this.filters.get(name);
        if (filter == null) {
            throw new IllegalArgumentException("Unknown stream: " + name);
        }
        Record filtered = filter.beforeAppend(record);
        if (filtered == null) {
            // The filter chain dropped the record: nothing is appended.
            return new LogOffsetImpl(name, 0, 0L);
        }
        LogOffset offset = this.logManager.<Record>getAppender(name).append(filtered.getKey(), filtered);
        filter.afterAppend(filtered, offset);
        return offset;
    }

    /**
     * @return whether the underlying log manager supports subscribe
     */
    public boolean supportSubscribe() {
        return this.logManager.supportSubscribe();
    }

    /**
     * Subscribes to a collection of streams using the codec shared by their appenders.
     *
     * @param group the name passed as first argument to the log manager subscribe call
     * @param streamNames the streams to subscribe to
     * @param listener the rebalance listener
     * @return the tailer returned by the log manager
     * @throws IllegalArgumentException if the streams do not share the same codec
     */
    public LogTailer<Record> subscribe(Name group, Collection<Name> streamNames, RebalanceListener listener) {
        return this.logManager.subscribe(group, streamNames, listener, getCodec(streamNames));
    }

    /**
     * Creates a tailer on the given partitions, using the codec shared by the appenders of the
     * corresponding streams. When there is no partition no codec is looked up.
     *
     * @param group the name passed as first argument to the log manager createTailer call
     * @param partitions the partitions to tail
     * @return the tailer returned by the log manager
     * @throws IllegalArgumentException if the streams do not share the same codec
     */
    public LogTailer<Record> createTailer(Name group, Collection<LogPartition> partitions) {
        if (partitions.isEmpty()) {
            return this.logManager.createTailer(group, partitions);
        }
        List<Name> streamNames = partitions.stream().map(LogPartition::name).collect(Collectors.toList());
        return this.logManager.createTailer(group, partitions, getCodec(streamNames));
    }

    /**
     * @param name the stream name
     * @return the filter chain registered for the stream, or {@code null} if there is none
     */
    public RecordFilter getFilter(Name name) {
        return this.filters.get(name);
    }

    /**
     * Resolves the single codec used by the appenders of the given streams.
     *
     * @param streamNames the streams to inspect
     * @return the codec of the first stream, or {@code null} when the collection is empty
     * @throws IllegalArgumentException if two streams use codecs with different names
     */
    protected Codec<Record> getCodec(Collection<Name> streamNames) {
        Codec<Record> codec = null;
        for (Name streamName : streamNames) {
            Codec<Record> current = this.logManager.<Record>getAppender(streamName).getCodec();
            if (codec == null) {
                codec = current;
            } else if (!codec.getName().equals(current.getName())) {
                throw new IllegalArgumentException("Different codec on input streams are not supported " + streamNames);
            }
        }
        return codec;
    }

    /**
     * Initializes every stream of the topology.
     *
     * @param topology the topology whose streams are initialized
     * @param settings the settings describing the streams
     */
    protected void initStreams(Topology topology, Settings settings) {
        log.debug("Initializing streams");
        topology.streamsSet().forEach(urn -> initStream(urn, settings));
    }

    /**
     * Initializes a single stream unless the settings flag it as external.
     * <p>
     * An existing stream keeps its partitioning: when the configured number of partitions
     * differs from the existing size, the given settings are updated to the existing size.
     * A missing stream is created with the configured number of partitions.
     *
     * @param streamUrn the URN of the stream
     * @param settings the settings describing the stream, possibly updated by this call
     */
    protected void initStream(String streamUrn, Settings settings) {
        Name name = Name.ofUrn(streamUrn);
        if (settings.isExternal(name)) {
            return;
        }
        if (this.logManager.exists(name)) {
            int existingSize = this.logManager.size(name);
            int configuredSize = settings.getPartitions(streamUrn);
            if (configuredSize != existingSize) {
                if (log.isDebugEnabled()) {
                    log.debug(String.format(
                            "Update settings for stream: %s defined with %d partitions but exists with %d partitions",
                            streamUrn, configuredSize, existingSize));
                }
                // The existing stream wins over the configuration.
                settings.setPartitions(streamUrn, existingSize);
            }
        } else {
            this.logManager.createIfNotExists(name, settings.getPartitions(streamUrn));
        }
        this.streams.add(name);
    }

    /**
     * Requests the appender of every non external stream with the codec defined in the settings.
     *
     * @param streamUrns the URNs of the streams
     * @param settings the settings providing the codec of each stream
     */
    protected void initAppenders(Collection<String> streamUrns, Settings settings) {
        log.debug("Initializing source appenders so we ensure they use codec defined in the processor:");
        streamUrns.forEach(log::debug);
        streamUrns.stream()
                  .filter(urn -> !settings.isExternal(Name.ofUrn(urn)))
                  .forEach(urn -> this.logManager.getAppender(Name.ofUrn(urn), settings.getCodec(urn)));
    }

    /**
     * Registers the filter chain defined in the settings for every non external stream.
     *
     * @param streamUrns the URNs of the streams
     * @param settings the settings providing the filter chain of each stream
     */
    protected void registerFilters(Collection<String> streamUrns, Settings settings) {
        streamUrns.stream()
                  .filter(urn -> !settings.isExternal(Name.ofUrn(urn)))
                  .forEach(urn -> this.filters.put(Name.ofUrn(urn), settings.getFilterChain(urn)));
    }
}
