/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant;

import com.google.protobuf.InvalidProtocolBufferException;
import io.confluent.kafka.multitenant.KafkaLogicalClusterMetadata;
import io.confluent.kafka.multitenant.KafkaLogicalClusterMetadataLoaderMetrics;
import io.confluent.kafka.multitenant.utils.AuthUtils;
import io.confluent.protobuf.cloud.events.v1.LogicalCluster;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.server.multitenant.MultiTenantMetadataLoader;
import org.apache.kafka.server.multitenant.MultiTenantMetadataPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaLogicalClusterMetadataLoader
implements MultiTenantMetadataLoader {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaLogicalClusterMetadataLoader.class);
    private final Map<String, ?> baseClientProperties;
    private final KafkaLogicalClusterMetadataLoaderMetrics metrics;
    private final Time time;
    private final AtomicReference<State> state;
    private final Map<String, Long> lastSequenceIdByCluster;
    private final Map<String, KafkaLogicalClusterMetadata> pendingUpdates;
    private final Map<String, MultiTenantMetadataPublisher> multiTenantMetadataPublishers;
    private List<String> blockedListenerNames;
    private String topicName;
    private long topicLoadTimeoutMs;
    private KafkaBasedLog<String, byte[]> logicalClusterMetadataLog;

    public KafkaLogicalClusterMetadataLoader(Map<String, ?> baseClientProperties, Metrics metrics, Time time) {
        this.baseClientProperties = baseClientProperties;
        this.metrics = new KafkaLogicalClusterMetadataLoaderMetrics(metrics);
        this.time = time;
        this.state = new AtomicReference<State>(State.NOT_RUNNING);
        this.lastSequenceIdByCluster = new HashMap<String, Long>();
        this.pendingUpdates = new HashMap<String, KafkaLogicalClusterMetadata>();
        this.multiTenantMetadataPublishers = new HashMap<String, MultiTenantMetadataPublisher>();
        this.blockedListenerNames = Collections.emptyList();
    }

    KafkaLogicalClusterMetadataLoader(Metrics metrics, Time time, List<String> blockedListenerNames, String topicName, long topicLoadTimeoutMs, KafkaBasedLog<String, byte[]> logicalClusterMetadataLog) {
        this.baseClientProperties = null;
        this.metrics = new KafkaLogicalClusterMetadataLoaderMetrics(metrics);
        this.time = time;
        this.state = new AtomicReference<State>(State.NOT_RUNNING);
        this.lastSequenceIdByCluster = new HashMap<String, Long>();
        this.pendingUpdates = new HashMap<String, KafkaLogicalClusterMetadata>();
        this.multiTenantMetadataPublishers = new HashMap<String, MultiTenantMetadataPublisher>();
        this.blockedListenerNames = blockedListenerNames;
        this.topicName = topicName;
        this.topicLoadTimeoutMs = topicLoadTimeoutMs;
        this.logicalClusterMetadataLog = logicalClusterMetadataLog;
    }

    public Map<Endpoint, CompletableFuture<Void>> start(Collection<Endpoint> endpoints, List<MultiTenantMetadataPublisher> multiTenantMetadataPublishers) {
        CompletableFuture<Void> logStartedFuture;
        for (MultiTenantMetadataPublisher multiTenantMetadataPublisher : multiTenantMetadataPublishers) {
            this.multiTenantMetadataPublishers.put(multiTenantMetadataPublisher.name(), multiTenantMetadataPublisher);
        }
        if (!this.state.compareAndSet(State.NOT_RUNNING, State.STARTING)) {
            throw new IllegalStateException("Trying to start a KafkaLogicalClusterMetadataLoader which was already started!");
        }
        try {
            logStartedFuture = CompletableFuture.runAsync(this::startLog);
        }
        catch (Exception e) {
            this.state.set(State.FAILED_TO_START);
            throw new IllegalStateException("Unable to create a future for startLog()", e);
        }
        return this.endpointsToFutures(endpoints, logStartedFuture);
    }

    private Map<Endpoint, CompletableFuture<Void>> endpointsToFutures(Collection<Endpoint> endpoints, CompletableFuture<Void> logStartedFuture) {
        return endpoints.stream().collect(Collectors.toMap(Function.identity(), e -> e.listenerName().map(this.blockedListenerNames::contains).orElse(false) != false ? logStartedFuture : CompletableFuture.completedFuture(null)));
    }

    private void startLog() {
        try {
            long loadStartNanos = this.time.nanoseconds();
            this.logicalClusterMetadataLog.start();
            this.state.set(State.RUNNING);
            long loadTimeMs = TimeUnit.NANOSECONDS.toMillis(this.time.nanoseconds() - loadStartNanos);
            this.metrics.recordStartupLoadTime(loadTimeMs);
            LOG.info("Started KafkaLogicalClusterMetadataLoader from topic {} in {}ms", (Object)this.topicName, (Object)loadTimeMs);
            this.maybePublishMetadata(true);
        }
        catch (Exception e) {
            this.state.set(State.FAILED_TO_START);
            LOG.error("Unable to start the consumer for KafkaLogicalClusterMetadataLoader", (Throwable)e);
            throw e;
        }
    }

    public void close() throws Exception {
        if (this.state.compareAndSet(State.RUNNING, State.SHUTTING_DOWN)) {
            try {
                this.logicalClusterMetadataLog.stop();
            }
            finally {
                this.state.set(State.SHUTDOWN_COMPLETE);
            }
        } else {
            LOG.warn("KafkaLogicalClusterMetadataLoader#close() was called while the underlying KafkaBaseLog was not running, its current state: {}", (Object)this.state.get());
        }
    }

    public void configure(Map<String, ?> configs) {
        ArrayList<String> blockedListenerNames = new ArrayList<String>();
        blockedListenerNames.addAll(ConfluentConfigs.multitenantListenerNames(configs, null));
        blockedListenerNames.addAll(ConfluentConfigs.listenerNames((String)"controller.listener.names", configs, null));
        this.blockedListenerNames = blockedListenerNames;
        this.topicName = this.topicName(configs);
        this.topicLoadTimeoutMs = this.topicLoadTimeoutMs(configs);
        this.logicalClusterMetadataLog = this.createKafkaBasedLog(configs);
    }

    private String topicName(Map<String, ?> configs) {
        String topicFromConfig = (String)configs.get("confluent.cdc.lkc.metadata.topic");
        if (topicFromConfig == null || topicFromConfig.isEmpty()) {
            throw new ConfigException("Missing value of config: confluent.cdc.lkc.metadata.topic");
        }
        return topicFromConfig;
    }

    private Long topicLoadTimeoutMs(Map<String, ?> configs) {
        Long topicLoadTimeoutMs = (Long)configs.get("confluent.cdc.api.keys.topic.load.timeout.ms");
        if (topicLoadTimeoutMs == null || topicLoadTimeoutMs <= 0L) {
            throw new ConfigException("Missing value of config: confluent.cdc.api.keys.topic.load.timeout.ms");
        }
        return topicLoadTimeoutMs;
    }

    private KafkaBasedLog<String, byte[]> createKafkaBasedLog(Map<String, ?> configs) {
        Map<String, Object> consumerConfigs = this.getConsumerConfig(configs);
        consumerConfigs.put("allow.auto.create.topics", false);
        consumerConfigs.put("key.deserializer", StringDeserializer.class.getName());
        consumerConfigs.put("value.deserializer", ByteArrayDeserializer.class.getName());
        consumerConfigs.put("default.api.timeout.ms", (int)Math.min(this.topicLoadTimeoutMs, Integer.MAX_VALUE));
        return new KafkaBasedLog(this.topicName, null, consumerConfigs, () -> null, (Callback)new ConsumeCallback(), Time.SYSTEM, null, this.topicLoadTimeoutMs);
    }

    private Map<String, Object> getConsumerConfig(Map<String, ?> configs) {
        HashSet configNames = new HashSet(ConsumerConfig.configNames());
        configNames.remove("metric.reporters");
        HashMap<String, Object> clientConfigs = new HashMap<String, Object>(this.baseClientProperties);
        clientConfigs.keySet().retainAll(configNames);
        clientConfigs.put("client.id", String.format("%s-%s", this.getClass().getSimpleName(), configs.get("broker.id")));
        clientConfigs.put("bootstrap.servers", this.baseClientProperties.get("bootstrap.servers"));
        clientConfigs.put("enable.metrics.push", false);
        return clientConfigs;
    }

    void read(ConsumerRecord<String, byte[]> record) {
        String logicalClusterId = (String)record.key();
        if (logicalClusterId == null) {
            LOG.error("Missing key in LogicalClusterMetadata record! (partition = {}, offset = {}, timestamp = {}", new Object[]{record.partition(), record.offset(), record.timestamp()});
            return;
        }
        Long sequenceId = AuthUtils.tryParseEventsSequenceId(record);
        if (sequenceId == null) {
            LOG.error("Unable to decode sequence id for LogicalClusterMetadata message (key = {}, partition = {}, offset = {}, timestamp = {})", new Object[]{logicalClusterId, record.partition(), record.offset(), record.timestamp()});
            return;
        }
        if (record.value() == null) {
            LOG.debug("seqId: {}. Removing LogicalClusterMetadata for {} since the LogicalClusterMetadata record has a null value", (Object)sequenceId, (Object)logicalClusterId);
            this.removeLogicalCluster(logicalClusterId, sequenceId);
            return;
        }
        try {
            KafkaLogicalClusterMetadata lcm = this.parseLCM(record);
            if (lcm.isValid()) {
                if (!logicalClusterId.equals(lcm.logicalClusterId())) {
                    LOG.error("seqId: {}. LogicalClusterMetadata id in key ({}) doesn't match what is in the record value: {}. Skipping!", new Object[]{sequenceId, logicalClusterId, lcm.logicalClusterId()});
                } else {
                    LOG.debug("seqId: {}. Updating LogicalClusterMetadata for {}", (Object)sequenceId, (Object)logicalClusterId);
                    this.updateLogicalCluster(logicalClusterId, sequenceId, lcm);
                }
            }
        }
        catch (IllegalArgumentException iae) {
            LOG.error(String.format("seqId: %s. Unable to decode LogicalClusterMetadata message for key %s", sequenceId, logicalClusterId), (Throwable)iae);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void removeLogicalCluster(String logicalClusterId, long sequenceId) {
        if (this.state.get().isNeitherStartingNorRunning()) {
            LOG.warn("Tried to remove a LogicalClusterMetadata with a non running log (state = {})", (Object)this.state.get());
            return;
        }
        KafkaLogicalClusterMetadataLoader kafkaLogicalClusterMetadataLoader = this;
        synchronized (kafkaLogicalClusterMetadataLoader) {
            Long lastSequenceId = this.lastSequenceIdByCluster.get(logicalClusterId);
            if (lastSequenceId == null) {
                LOG.warn("Got asked to remove LogicalClusterMetadata for a cluster with no metadata: {}", (Object)logicalClusterId);
                return;
            }
            if (lastSequenceId >= sequenceId) {
                LOG.warn("Not removing logical cluster {} because the sequenceId of the new LogicalClusterMetadata is outdated: lastSequenceId - {}, sequenceId - {}", new Object[]{logicalClusterId, lastSequenceId, sequenceId});
                return;
            }
            this.lastSequenceIdByCluster.put(logicalClusterId, sequenceId);
            this.pendingUpdates.put(logicalClusterId, null);
        }
        if (this.state.get() == State.RUNNING) {
            this.maybePublishMetadata(false);
        }
    }

    private KafkaLogicalClusterMetadata parseLCM(ConsumerRecord<String, byte[]> record) {
        KafkaLogicalClusterMetadata lcm;
        try {
            LogicalCluster asProto = LogicalCluster.parseFrom((byte[])((byte[])record.value()));
            lcm = KafkaLogicalClusterMetadata.fromProtobuf((LogicalCluster)asProto);
        }
        catch (InvalidProtocolBufferException e) {
            throw new IllegalArgumentException(e);
        }
        return lcm;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateLogicalCluster(String logicalClusterId, long sequenceId, KafkaLogicalClusterMetadata lcm) {
        if (this.state.get().isNeitherStartingNorRunning()) {
            LOG.warn("Tried to add or update a LogicalClusterMetadata with a non running log (state = {})", (Object)this.state.get());
            return;
        }
        KafkaLogicalClusterMetadataLoader kafkaLogicalClusterMetadataLoader = this;
        synchronized (kafkaLogicalClusterMetadataLoader) {
            Long lastSequenceId = this.lastSequenceIdByCluster.get(logicalClusterId);
            if (lastSequenceId == null || lastSequenceId < sequenceId) {
                this.lastSequenceIdByCluster.put(logicalClusterId, sequenceId);
                this.pendingUpdates.put(logicalClusterId, lcm);
                if (!lcm.isActive()) {
                    this.markLogicalClusterAsDeactivated(logicalClusterId);
                }
            }
        }
        if (this.state.get() == State.RUNNING) {
            this.maybePublishMetadata(false);
        }
    }

    private void markLogicalClusterAsDeactivated(String logicalClusterId) {
        this.lastSequenceIdByCluster.put(logicalClusterId, Long.MAX_VALUE);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void maybePublishMetadata(boolean forceUpdate) {
        Map<String, KafkaLogicalClusterMetadata> updatedLogicalClusterMetadata;
        KafkaLogicalClusterMetadataLoader kafkaLogicalClusterMetadataLoader = this;
        synchronized (kafkaLogicalClusterMetadataLoader) {
            updatedLogicalClusterMetadata = Collections.unmodifiableMap(new HashMap<String, KafkaLogicalClusterMetadata>(this.pendingUpdates));
            this.pendingUpdates.clear();
        }
        if (!forceUpdate && updatedLogicalClusterMetadata.isEmpty()) {
            return;
        }
        for (MultiTenantMetadataPublisher multiTenantMetadataPublisher : this.multiTenantMetadataPublishers.values()) {
            try {
                multiTenantMetadataPublisher.onMetadataUpdate(updatedLogicalClusterMetadata);
            }
            catch (Exception e) {
                LOG.error("Unhandled error when publishing new LogicalClusterMetadata to {}", (Object)multiTenantMetadataPublisher.name(), (Object)e);
            }
        }
    }

    private static enum State {
        NOT_RUNNING,
        STARTING,
        RUNNING,
        SHUTTING_DOWN,
        SHUTDOWN_COMPLETE,
        FAILED_TO_START;


        private boolean isNeitherStartingNorRunning() {
            return this != STARTING && this != RUNNING;
        }
    }

    private class ConsumeCallback
    implements Callback<ConsumerRecord<String, byte[]>> {
        private ConsumeCallback() {
        }

        public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
            if (error != null) {
                LOG.error("Unexpected error in ConsumeCallback for KafkaLogicalClusterMetadataLoader", error);
            } else {
                KafkaLogicalClusterMetadataLoader.this.read(record);
            }
        }
    }
}

