/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.controller;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.controller.Controller;
import org.apache.kafka.metadata.InternalTopicType;
import org.apache.kafka.metadata.ingester.Ingester;
import org.apache.kafka.metadata.ingester.IngesterRecord;
import org.apache.kafka.metadata.ingester.IngestionHandler;
import org.apache.kafka.metadata.ingester.KafkaConsumerIngestionWorker;
import org.slf4j.Logger;

public class ConfluentKeyValueStreamsIngesterManager
implements IngestionHandler {
    private final Logger log;
    private final Controller controller;
    private volatile Ingester ingester;
    private Map<Short, Map<Short, Long>> topicOffsets;
    private final LogContext logContext;
    private final CompletableFuture<Void> initialLoad;
    Map<String, ?> interBrokerClientConfig;
    private static String configPrefix = "confluent.metadata.";
    private final int nodeId;
    private Metrics metrics;

    public ConfluentKeyValueStreamsIngesterManager(Controller controller, Map<String, ?> interBrokerClientConfig, int nodeId, Metrics metrics) {
        this.logContext = new LogContext("[ConfluentKeyValueStreams id=" + nodeId + "]");
        this.log = this.logContext.logger(ConfluentKeyValueStreamsIngesterManager.class);
        this.controller = controller;
        this.interBrokerClientConfig = interBrokerClientConfig;
        this.nodeId = nodeId;
        this.metrics = metrics;
        this.initialLoad = new CompletableFuture();
    }

    public Map<String, Object> consumerConfigs(String topic) {
        AbstractConfig absConfig = new AbstractConfig(new ConfigDef(), this.interBrokerClientConfig);
        Map configs = ConfluentConfigs.clientConfigs((AbstractConfig)absConfig, (String)configPrefix, (ConfluentConfigs.ClientType)ConfluentConfigs.ClientType.CONSUMER, (String)topic, (String)String.valueOf(this.nodeId));
        configs.remove("interceptor.classes");
        configs.put("auto.offset.reset", "earliest");
        configs.put("enable.auto.commit", "false");
        configs.put("allow.auto.create.topics", "false");
        return configs;
    }

    public CompletableFuture<Void> startWorker() {
        if (this.ingester != null) {
            this.log.info("ingester already started");
            return this.initialLoad;
        }
        CompletableFuture<Map<Short, Map<Short, Long>>> offsetsFuture = this.controller.readKeyValueStreamsTopicOffsets();
        offsetsFuture.whenComplete((offsets, e) -> {
            this.topicOffsets = offsets;
            this.initialLoad.complete(null);
            this.log.info("starting ingester worker");
            this.createWorker(InternalTopicType.RBAC_AUTH);
        });
        return this.initialLoad;
    }

    public void shutDownWorker() {
        this.log.info("shut down worker");
        if (this.ingester != null) {
            try {
                this.ingester.close();
                this.ingester = null;
            }
            catch (InterruptedException e) {
                this.log.error("caught interrupted exception during shut down ingester worker: " + String.valueOf(e));
            }
        }
    }

    public void createWorker(InternalTopicType type) {
        KafkaConsumerIngestionWorker.Factory factory = new KafkaConsumerIngestionWorker.Factory("confluent-topic-ingester-worker", this.consumerConfigs(type.topicName()));
        this.ingester = new Ingester.Builder().setLogContext(this.logContext).setHandler(this).setWorkerFactory(factory).setMetrics(this.metrics).setInternalTopicType(type).build();
        this.ingester.start(this.epoch(), this.topicPartitionOffsets(type));
        this.log.info("starting ingester worker");
    }

    Map<TopicPartition, Optional<Long>> topicPartitionOffsets(InternalTopicType type) {
        HashMap<TopicPartition, Optional<Long>> partitionOffset = new HashMap<TopicPartition, Optional<Long>>();
        for (int i = 0; i < type.numberOfPartitions(); ++i) {
            partitionOffset.put(new TopicPartition(type.topicName(), i), Optional.empty());
        }
        if (this.topicOffsets != null && this.topicOffsets.containsKey(type.value())) {
            Map<Short, Long> offset = this.topicOffsets.get(type.value());
            for (Map.Entry<Short, Long> e : offset.entrySet()) {
                partitionOffset.put(new TopicPartition(type.topicName(), (int)e.getKey().shortValue()), Optional.of(e.getValue()));
            }
        }
        this.log.info("read topicPartitionOffsets: " + String.valueOf(partitionOffset));
        return partitionOffset;
    }

    protected void setTopicOffsets(Map<Short, Map<Short, Long>> topicOffsets) {
        this.topicOffsets = topicOffsets;
    }

    public LogContext logContext() {
        return this.logContext;
    }

    public long epoch() {
        return this.controller.curClaimEpoch();
    }

    @Override
    public CompletableFuture<Void> handle(long epoch, List<IngesterRecord> records) {
        this.log.info("handle records size:" + records.size());
        ArrayList<CompletableFuture<Void>> futureList = new ArrayList<CompletableFuture<Void>>();
        for (IngesterRecord r : records) {
            futureList.add(this.controller.addConfluentKeyValueStreamChangeRecord(OptionalLong.empty(), InternalTopicType.fromTopicName(r.topicPartition().topic()), r.key(), r.value() == null ? "" : r.value(), (short)r.topicPartition().partition(), r.offset()));
        }
        return CompletableFuture.allOf((CompletableFuture[])futureList.stream().toArray(CompletableFuture[]::new));
    }
}

