package io.confluent.kafka.multitenant;

import com.google.protobuf.InvalidProtocolBufferException;
import io.confluent.kafka.multitenant.utils.AuthUtils;
import io.confluent.protobuf.cloud.events.v1.LogicalCluster;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.server.multitenant.MultiTenantMetadataLoader;
import org.apache.kafka.server.multitenant.MultiTenantMetadataPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/multitenant/KafkaLogicalClusterMetadataLoader.class */
public class KafkaLogicalClusterMetadataLoader implements MultiTenantMetadataLoader {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaLogicalClusterMetadataLoader.class);
    private final Map<String, ?> baseClientProperties;
    private final KafkaLogicalClusterMetadataLoaderMetrics metrics;
    private final Time time;
    private final AtomicReference<State> state;
    private final Map<String, Long> lastSequenceIdByCluster;
    private final Map<String, KafkaLogicalClusterMetadata> pendingUpdates;
    private final Map<String, MultiTenantMetadataPublisher> multiTenantMetadataPublishers;
    private List<String> blockedListenerNames;
    private String topicName;
    private long topicLoadTimeoutMs;
    private KafkaBasedLog<String, byte[]> logicalClusterMetadataLog;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/multitenant/KafkaLogicalClusterMetadataLoader$ConsumeCallback.class */
    public class ConsumeCallback implements Callback<ConsumerRecord<String, byte[]>> {
        private ConsumeCallback() {
        }

        public void onCompletion(Throwable th, ConsumerRecord<String, byte[]> consumerRecord) {
            if (th != null) {
                KafkaLogicalClusterMetadataLoader.LOG.error("Unexpected error in ConsumeCallback for KafkaLogicalClusterMetadataLoader", th);
            } else {
                KafkaLogicalClusterMetadataLoader.this.read(consumerRecord);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/multitenant/KafkaLogicalClusterMetadataLoader$State.class */
    public enum State {
        NOT_RUNNING,
        STARTING,
        RUNNING,
        SHUTTING_DOWN,
        SHUTDOWN_COMPLETE,
        FAILED_TO_START;

        private boolean isNeitherStartingNorRunning() {
            return (this == STARTING || this == RUNNING) ? false : true;
        }
    }

    public KafkaLogicalClusterMetadataLoader(Map<String, ?> map, Metrics metrics, Time time) {
        this.baseClientProperties = map;
        this.metrics = new KafkaLogicalClusterMetadataLoaderMetrics(metrics);
        this.time = time;
        this.state = new AtomicReference<>(State.NOT_RUNNING);
        this.lastSequenceIdByCluster = new HashMap();
        this.pendingUpdates = new HashMap();
        this.multiTenantMetadataPublishers = new HashMap();
        this.blockedListenerNames = Collections.emptyList();
    }

    KafkaLogicalClusterMetadataLoader(Metrics metrics, Time time, List<String> list, String str, long j, KafkaBasedLog<String, byte[]> kafkaBasedLog) {
        this.baseClientProperties = null;
        this.metrics = new KafkaLogicalClusterMetadataLoaderMetrics(metrics);
        this.time = time;
        this.state = new AtomicReference<>(State.NOT_RUNNING);
        this.lastSequenceIdByCluster = new HashMap();
        this.pendingUpdates = new HashMap();
        this.multiTenantMetadataPublishers = new HashMap();
        this.blockedListenerNames = list;
        this.topicName = str;
        this.topicLoadTimeoutMs = j;
        this.logicalClusterMetadataLog = kafkaBasedLog;
    }

    public Map<Endpoint, CompletableFuture<Void>> start(Collection<Endpoint> collection, List<MultiTenantMetadataPublisher> list) {
        for (MultiTenantMetadataPublisher multiTenantMetadataPublisher : list) {
            this.multiTenantMetadataPublishers.put(multiTenantMetadataPublisher.name(), multiTenantMetadataPublisher);
        }
        if (!this.state.compareAndSet(State.NOT_RUNNING, State.STARTING)) {
            throw new IllegalStateException("Trying to start a KafkaLogicalClusterMetadataLoader which was already started!");
        }
        try {
            return endpointsToFutures(collection, CompletableFuture.runAsync(this::startLog));
        } catch (Exception e) {
            this.state.set(State.FAILED_TO_START);
            throw new IllegalStateException("Unable to create a future for startLog()", e);
        }
    }

    private Map<Endpoint, CompletableFuture<Void>> endpointsToFutures(Collection<Endpoint> collection, CompletableFuture<Void> completableFuture) {
        return (Map) collection.stream().collect(Collectors.toMap(Function.identity(), endpoint -> {
            Optional listenerName = endpoint.listenerName();
            List<String> list = this.blockedListenerNames;
            Objects.requireNonNull(list);
            return ((Boolean) listenerName.map((v1) -> {
                return r1.contains(v1);
            }).orElse(false)).booleanValue() ? completableFuture : CompletableFuture.completedFuture(null);
        }));
    }

    private void startLog() {
        try {
            long nanoseconds = this.time.nanoseconds();
            this.logicalClusterMetadataLog.start();
            this.state.set(State.RUNNING);
            long millis = TimeUnit.NANOSECONDS.toMillis(this.time.nanoseconds() - nanoseconds);
            this.metrics.recordStartupLoadTime(millis);
            LOG.info("Started KafkaLogicalClusterMetadataLoader from topic {} in {}ms", this.topicName, Long.valueOf(millis));
            maybePublishMetadata(true);
        } catch (Exception e) {
            this.state.set(State.FAILED_TO_START);
            LOG.error("Unable to start the consumer for KafkaLogicalClusterMetadataLoader", e);
            throw e;
        }
    }

    public void close() throws Exception {
        if (!this.state.compareAndSet(State.RUNNING, State.SHUTTING_DOWN)) {
            LOG.warn("KafkaLogicalClusterMetadataLoader#close() was called while the underlying KafkaBaseLog was not running, its current state: {}", this.state.get());
            return;
        }
        try {
            this.logicalClusterMetadataLog.stop();
        } finally {
            this.state.set(State.SHUTDOWN_COMPLETE);
        }
    }

    public void configure(Map<String, ?> map) {
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(ConfluentConfigs.multitenantListenerNames(map, (ListenerName) null));
        arrayList.addAll(ConfluentConfigs.listenerNames("controller.listener.names", map, (ListenerName) null));
        this.blockedListenerNames = arrayList;
        this.topicName = topicName(map);
        this.topicLoadTimeoutMs = topicLoadTimeoutMs(map).longValue();
        this.logicalClusterMetadataLog = createKafkaBasedLog(map);
    }

    private String topicName(Map<String, ?> map) {
        String str = (String) map.get("confluent.cdc.lkc.metadata.topic");
        if (str == null || str.isEmpty()) {
            throw new ConfigException("Missing value of config: confluent.cdc.lkc.metadata.topic");
        }
        return str;
    }

    private Long topicLoadTimeoutMs(Map<String, ?> map) {
        Long l = (Long) map.get("confluent.cdc.api.keys.topic.load.timeout.ms");
        if (l == null || l.longValue() <= 0) {
            throw new ConfigException("Missing value of config: confluent.cdc.api.keys.topic.load.timeout.ms");
        }
        return l;
    }

    private KafkaBasedLog<String, byte[]> createKafkaBasedLog(Map<String, ?> map) {
        Map<String, Object> consumerConfig = getConsumerConfig(map);
        consumerConfig.put("allow.auto.create.topics", false);
        consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
        consumerConfig.put("value.deserializer", ByteArrayDeserializer.class.getName());
        consumerConfig.put("default.api.timeout.ms", Integer.valueOf((int) Math.min(this.topicLoadTimeoutMs, 2147483647L)));
        return new KafkaBasedLog<>(this.topicName, (Map) null, consumerConfig, () -> {
            return null;
        }, new ConsumeCallback(), Time.SYSTEM, (Consumer) null, this.topicLoadTimeoutMs);
    }

    private Map<String, Object> getConsumerConfig(Map<String, ?> map) {
        HashSet hashSet = new HashSet(ConsumerConfig.configNames());
        hashSet.remove("metric.reporters");
        HashMap hashMap = new HashMap(this.baseClientProperties);
        hashMap.keySet().retainAll(hashSet);
        hashMap.put("client.id", String.format("%s-%s", getClass().getSimpleName(), map.get("broker.id")));
        hashMap.put("bootstrap.servers", this.baseClientProperties.get("bootstrap.servers"));
        hashMap.put("enable.metrics.push", false);
        return hashMap;
    }

    void read(ConsumerRecord<String, byte[]> consumerRecord) {
        String str = (String) consumerRecord.key();
        if (str == null) {
            LOG.error("Missing key in LogicalClusterMetadata record! (partition = {}, offset = {}, timestamp = {}", new Object[]{Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), Long.valueOf(consumerRecord.timestamp())});
            return;
        }
        Long tryParseEventsSequenceId = AuthUtils.tryParseEventsSequenceId(consumerRecord);
        if (tryParseEventsSequenceId == null) {
            LOG.error("Unable to decode sequence id for LogicalClusterMetadata message (key = {}, partition = {}, offset = {}, timestamp = {})", new Object[]{str, Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), Long.valueOf(consumerRecord.timestamp())});
            return;
        }
        if (consumerRecord.value() == null) {
            LOG.debug("seqId: {}. Removing LogicalClusterMetadata for {} since the LogicalClusterMetadata record has a null value", tryParseEventsSequenceId, str);
            removeLogicalCluster(str, tryParseEventsSequenceId.longValue());
            return;
        }
        try {
            KafkaLogicalClusterMetadata parseLCM = parseLCM(consumerRecord);
            if (parseLCM.isValid()) {
                if (str.equals(parseLCM.logicalClusterId())) {
                    LOG.debug("seqId: {}. Updating LogicalClusterMetadata for {}", tryParseEventsSequenceId, str);
                    updateLogicalCluster(str, tryParseEventsSequenceId.longValue(), parseLCM);
                } else {
                    LOG.error("seqId: {}. LogicalClusterMetadata id in key ({}) doesn't match what is in the record value: {}. Skipping!", new Object[]{tryParseEventsSequenceId, str, parseLCM.logicalClusterId()});
                }
            }
        } catch (IllegalArgumentException e) {
            LOG.error(String.format("seqId: %s. Unable to decode LogicalClusterMetadata message for key %s", tryParseEventsSequenceId, str), e);
        }
    }

    private void removeLogicalCluster(String str, long j) {
        if (this.state.get().isNeitherStartingNorRunning()) {
            LOG.warn("Tried to remove a LogicalClusterMetadata with a non running log (state = {})", this.state.get());
            return;
        }
        synchronized (this) {
            Long l = this.lastSequenceIdByCluster.get(str);
            if (l == null) {
                LOG.warn("Got asked to remove LogicalClusterMetadata for a cluster with no metadata: {}", str);
                return;
            }
            if (l.longValue() >= j) {
                LOG.warn("Not removing logical cluster {} because the sequenceId of the new LogicalClusterMetadata is outdated: lastSequenceId - {}, sequenceId - {}", new Object[]{str, l, Long.valueOf(j)});
                return;
            }
            this.lastSequenceIdByCluster.put(str, Long.valueOf(j));
            this.pendingUpdates.put(str, null);
            if (this.state.get() == State.RUNNING) {
                maybePublishMetadata(false);
            }
        }
    }

    private KafkaLogicalClusterMetadata parseLCM(ConsumerRecord<String, byte[]> consumerRecord) {
        try {
            return KafkaLogicalClusterMetadata.fromProtobuf(LogicalCluster.parseFrom((byte[]) consumerRecord.value()));
        } catch (InvalidProtocolBufferException e) {
            throw new IllegalArgumentException((Throwable) e);
        }
    }

    private void updateLogicalCluster(String str, long j, KafkaLogicalClusterMetadata kafkaLogicalClusterMetadata) {
        if (this.state.get().isNeitherStartingNorRunning()) {
            LOG.warn("Tried to add or update a LogicalClusterMetadata with a non running log (state = {})", this.state.get());
            return;
        }
        synchronized (this) {
            Long l = this.lastSequenceIdByCluster.get(str);
            if (l == null || l.longValue() < j) {
                this.lastSequenceIdByCluster.put(str, Long.valueOf(j));
                this.pendingUpdates.put(str, kafkaLogicalClusterMetadata);
                if (!kafkaLogicalClusterMetadata.isActive()) {
                    markLogicalClusterAsDeactivated(str);
                }
            }
        }
        if (this.state.get() == State.RUNNING) {
            maybePublishMetadata(false);
        }
    }

    private void markLogicalClusterAsDeactivated(String str) {
        this.lastSequenceIdByCluster.put(str, Long.MAX_VALUE);
    }

    private void maybePublishMetadata(boolean z) {
        Map unmodifiableMap;
        synchronized (this) {
            unmodifiableMap = Collections.unmodifiableMap(new HashMap(this.pendingUpdates));
            this.pendingUpdates.clear();
        }
        if (z || !unmodifiableMap.isEmpty()) {
            for (MultiTenantMetadataPublisher multiTenantMetadataPublisher : this.multiTenantMetadataPublishers.values()) {
                try {
                    multiTenantMetadataPublisher.onMetadataUpdate(unmodifiableMap);
                } catch (Exception e) {
                    LOG.error("Unhandled error when publishing new LogicalClusterMetadata to {}", multiTenantMetadataPublisher.name(), e);
                }
            }
        }
    }
}
