/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant;

import com.google.protobuf.InvalidProtocolBufferException;
import io.confluent.kafka.clients.CloudAdmin;
import io.confluent.kafka.multitenant.CommonTopicBasedPhysicalClusterMetadata;
import io.confluent.kafka.multitenant.KafkaLogicalClusterMetadata;
import io.confluent.kafka.multitenant.SslCertificateManager;
import io.confluent.kafka.multitenant.TenantLifecycleManager;
import io.confluent.kafka.multitenant.quota.MultiTenantQuotaConfig;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.multitenant.utils.AuthUtils;
import io.confluent.protobuf.cloud.events.v1.LogicalCluster;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.multitenant.LogicalClusterMetadata;
import org.apache.kafka.server.quota.ElasticCkuDefinitionUtils;

public class TopicBasedPhysicalClusterMetadata
extends CommonTopicBasedPhysicalClusterMetadata<KafkaLogicalClusterMetadata> {
    public TenantLifecycleManager tenantLifecycleManager;
    public SslCertificateManager sslCertificateManager;
    private final AtomicReference<CommonTopicBasedPhysicalClusterMetadata.State> sslCertManagerState;
    private final AtomicBoolean startedMonitoringDeactivatedClusters = new AtomicBoolean(false);
    private long updateDeactivatedTenantsIntervalMs;
    private boolean disableSslCertManager = false;
    private static final String LKC_LOAD_METRICS_GROUP_NAME = "confluent-lkc-load-metrics";
    private static final String LKC_METADATA_END_TO_END_LOAD_TIME_SENSOR_NAME = "lkc-metadata-end-to-end-load-time";
    private static final String LKC_METADATA_STARTUP_LOAD_TIME_SENSOR_NAME = "lkc-metadata-startup-load-time";
    private static final String DEDICATED_CKU_METRIC_NAME = "dedicated-cku";
    private static final String LKC_METADATA_END_TO_END_LOAD_TIME_MIN_METRIC_NAME = "lkc-metadata-end-to-end-load-time-min";
    private static final String LKC_METADATA_END_TO_END_LOAD_TIME_MAX_METRIC_NAME = "lkc-metadata-end-to-end-load-time-max";
    private static final String LKC_METADATA_END_TO_END_LOAD_TIME_AVG_METRIC_NAME = "lkc-metadata-end-to-end-load-time-avg";
    private static final String LKC_METADATA_STARTUP_LOAD_TIME_METRIC_NAME = "lkc-metadata-startup-load-time-max";
    private static final KafkaMetricsGroup METRICS_GROUP = new KafkaMetricsGroup(KafkaLogicalClusterMetadata.class);
    private final Sensor lkcTimeToLoadEndToEndSensor;
    private final Sensor lkcStartupLoadSensor;
    private final KafkaMetricsGroup metricsGroupDefn;
    private List<Function<String, Boolean>> tenantRegisterCallbackFunctions;
    private List<Function<String, Boolean>> lkcMetadataUpdateCallbackFunctions;
    private List<Function<String, Boolean>> tenantDeactivatedCallbackFunctions;
    private volatile Set<String> kafkaLogicalClusterIds = new HashSet<String>();

    public TopicBasedPhysicalClusterMetadata(Metrics metrics) {
        this(metrics, Time.SYSTEM);
    }

    public TopicBasedPhysicalClusterMetadata(Metrics metrics, Time time) {
        super(metrics, time);
        this.lkcTimeToLoadEndToEndSensor = metrics.sensor(LKC_METADATA_END_TO_END_LOAD_TIME_SENSOR_NAME);
        this.lkcTimeToLoadEndToEndSensor.add(metrics.metricName(LKC_METADATA_END_TO_END_LOAD_TIME_MIN_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME, "The minimum end to end load time of logical cluster metadata in ms"), (MeasurableStat)new Min());
        this.lkcTimeToLoadEndToEndSensor.add(metrics.metricName(LKC_METADATA_END_TO_END_LOAD_TIME_MAX_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME, "The maximum end to end load time of logical cluster metadata in ms"), (MeasurableStat)new Max());
        this.lkcTimeToLoadEndToEndSensor.add(metrics.metricName(LKC_METADATA_END_TO_END_LOAD_TIME_AVG_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME, "The mean end to end load time of logical cluster metadata in ms"), (MeasurableStat)new Avg());
        this.lkcStartupLoadSensor = metrics.sensor(LKC_METADATA_STARTUP_LOAD_TIME_SENSOR_NAME);
        this.lkcStartupLoadSensor.add(metrics.metricName(LKC_METADATA_STARTUP_LOAD_TIME_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME, "The time it took for the first load of all logical cluster metadata from the topic in ms"), (MeasurableStat)new Max());
        this.sslCertManagerState = new AtomicReference<CommonTopicBasedPhysicalClusterMetadata.State>(CommonTopicBasedPhysicalClusterMetadata.State.NOT_READY);
        this.tenantRegisterCallbackFunctions = Collections.synchronizedList(new ArrayList());
        this.lkcMetadataUpdateCallbackFunctions = Collections.synchronizedList(new ArrayList());
        this.tenantDeactivatedCallbackFunctions = Collections.synchronizedList(new ArrayList());
        this.metricsGroupDefn = new KafkaMetricsGroup(KafkaLogicalClusterMetadata.class);
    }

    void configure(CloudAdmin adminClient, String brokerId, String sslCertsPath, long deletedDelayMs, boolean deleteTenantCellMetadata, long updateDeactivatedTenantsIntervalMs, List<String> multitenantListenerNames, boolean useBcfks) throws IOException {
        LOG.warn("configure(AdminClient, ConfluentAdmin, String, String) called, shouldn't happen outside tests (Ignore if this is a unittest.)");
        this.multitenantListenerNames = multitenantListenerNames;
        this.tenantLifecycleManager = new TenantLifecycleManager(deletedDelayMs, deleteTenantCellMetadata, s -> adminClient, this.time);
        this.sslCertificateManager = new SslCertificateManager(brokerId, null, sslCertsPath, s -> adminClient, multitenantListenerNames, useBcfks);
        if (!this.disableSslCertManager) {
            this.startWatchingSslCertificates();
        }
        this.updateDeactivatedTenantsIntervalMs = updateDeactivatedTenantsIntervalMs;
    }

    void start(KafkaBasedLog<String, byte[]> kafkaBasedLog) {
        LOG.warn("start(KafkaBasedLog<>) called, shouldn't happen outside tests (Ignore if this is a unittest.)");
        if (!this.logConsumerState.compareAndSet(CommonTopicBasedPhysicalClusterMetadata.State.NOT_READY, CommonTopicBasedPhysicalClusterMetadata.State.STARTING)) {
            throw new IllegalStateException("start() called twice from the same unit test. Shouldn't happen!");
        }
        this.lcLog = kafkaBasedLog;
        this.startLog();
    }

    public void configure(Map<String, ?> configs) {
        super.configure(configs);
        this.multitenantListenerNames = ConfluentConfigs.multitenantListenerNames(configs, null);
        Long updateIntervalMsValue = (Long)configs.get("multitenant.tenant.delete.check.ms");
        if (updateIntervalMsValue == null) {
            updateIntervalMsValue = ConfluentConfigs.MULTITENANT_TENANT_DELETE_CHECK_MS_DEFAULT;
        }
        this.updateDeactivatedTenantsIntervalMs = updateIntervalMsValue;
        this.tenantLifecycleManager = new TenantLifecycleManager(configs, this.time);
        this.linkRequestPercentageMultipler = ConfluentConfigs.linkRequestPercentageMultiplier(configs);
        String enableSslDynamicStoreUpdate = (String)configs.get("confluent.broker.listeners.ssl.file.store.reload.enable");
        boolean bl = this.disableSslCertManager = Boolean.parseBoolean(enableSslDynamicStoreUpdate) || this.disableSslCertManager;
        if (!this.disableSslCertManager) {
            this.sslCertificateManager = new SslCertificateManager(configs);
        }
        if (!this.disableSslCertManager) {
            try {
                this.startWatchingSslCertificates();
            }
            catch (IOException ioe) {
                this.close(this.getSessionUuid(configs));
                throw new ConfigException("Failed to start watching the SSL certs watcher: " + ioe.getMessage());
            }
        }
    }

    protected void reconfigureQuotas() {
        try {
            this.updateQuotas();
        }
        catch (Exception e) {
            LOG.warn("Failed to reconfigure quotas of at least one logical cluster", (Throwable)e);
        }
    }

    public String getSessionUuid(Map<String, ?> configs) {
        return AuthUtils.getBrokerSessionUuid(configs);
    }

    protected Sensor getStartSensor() {
        return this.lkcStartupLoadSensor;
    }

    protected Sensor getEndToEndSensor() {
        return this.lkcTimeToLoadEndToEndSensor;
    }

    protected String getTopicConfig() {
        return "confluent.cdc.lkc.metadata.topic";
    }

    protected String getTopicClientId(Map<String, ?> configs) {
        return String.format("%s-%s-%s", this.topicName, ConfluentConfigs.ClientType.CONSUMER, configs.get("broker.session.uuid"));
    }

    public void startClients(Function<String, Admin> adminClientSupplier) {
        this.tenantLifecycleManager.setAdminSupplierAndCreateClient(adminClientSupplier);
        if (!this.disableSslCertManager) {
            this.sslCertificateManager.setAdminSupplierAndCreateClient(adminClientSupplier);
            this.sslCertificateManager.loadSslCertFiles();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void registerTenantCallback(Function<String, Boolean> callback) {
        if (this.tenantRegisterCallbackFunctions.contains(callback)) {
            LOG.error("Callback function {} already registered. So, skipping.", (Object)callback.toString());
            return;
        }
        Map map = this.logicalClusterMap;
        synchronized (map) {
            this.tenantRegisterCallbackFunctions.add(callback);
            if (CommonTopicBasedPhysicalClusterMetadata.State.RUNNING.equals(this.logConsumerState.get())) {
                LOG.info("Invoking new tenant callback function {} for all existing active tenants", (Object)callback.toString());
                this.invokeTenantCallbackForActiveTenants(callback);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void registerTenantMetadataUpdateCallback(Function<String, Boolean> callback) {
        if (this.lkcMetadataUpdateCallbackFunctions.contains(callback)) {
            LOG.error("Callback function {} already registered in lkcMetadataUpdateCallbackFunctions. So, skipping.", (Object)callback.toString());
            return;
        }
        Map map = this.logicalClusterMap;
        synchronized (map) {
            this.lkcMetadataUpdateCallbackFunctions.add(callback);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void registerTenantDeactivatedCallback(Function<String, Boolean> callback) {
        if (this.tenantDeactivatedCallbackFunctions.contains(callback)) {
            LOG.error("Callback function {} already registered. So, skipping.", (Object)callback.toString());
            return;
        }
        Map map = this.logicalClusterMap;
        synchronized (map) {
            this.tenantDeactivatedCallbackFunctions.add(callback);
        }
    }

    Map<Endpoint, CompletableFuture<Void>> endpointsToFutures(Collection<Endpoint> endpoints, CompletableFuture<Void> future) {
        if (this.multitenantListenerNames.isEmpty()) {
            LOG.warn("No multi-tenant listeners are specified. This could make the broker start without the tenant metadata available");
        }
        return endpoints.stream().collect(Collectors.toMap(Function.identity(), e -> this.multitenantListenerNames.contains(e.listener()) ? future : CompletableFuture.completedFuture(null)));
    }

    public Map<Endpoint, CompletableFuture<Void>> start(Map<String, Object> interBrokerClientConfig, Collection<Endpoint> endpoints) {
        CompletableFuture<Void> finishedStarting;
        this.interBrokerClientConfig = new HashMap<String, Object>(interBrokerClientConfig);
        if (this.logConsumerState.compareAndSet(CommonTopicBasedPhysicalClusterMetadata.State.NOT_READY, CommonTopicBasedPhysicalClusterMetadata.State.STARTING)) {
            this.lcLog = this.createKafkaBasedLog(interBrokerClientConfig);
            try {
                finishedStarting = CompletableFuture.runAsync(this::startLog);
            }
            catch (Exception e) {
                this.logConsumerState.set(CommonTopicBasedPhysicalClusterMetadata.State.FAILED_TO_START);
                throw new IllegalStateException("Unable to create a future for startLog()", e);
            }
        } else {
            this.ensureNonTerminalState((CommonTopicBasedPhysicalClusterMetadata.State)this.logConsumerState.get());
            LOG.warn("Trying to start a TopicBasedPhysicalClusterMetadata which was already started!");
            finishedStarting = CompletableFuture.completedFuture(null);
        }
        return this.endpointsToFutures(endpoints, finishedStarting);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void startLog() {
        CommonTopicBasedPhysicalClusterMetadata.State currentState = (CommonTopicBasedPhysicalClusterMetadata.State)this.logConsumerState.get();
        if (currentState != CommonTopicBasedPhysicalClusterMetadata.State.STARTING) {
            throw new IllegalStateException("Trying to start a log which is in a non-starting state: " + String.valueOf(currentState));
        }
        try {
            long startNano = this.time.nanoseconds();
            this.lcLog.start();
            Map map = this.logicalClusterMap;
            synchronized (map) {
                LOG.info("Validating Elastic CKU Metadata fields in all active tenants and updating them to default, if invalid");
                this.maybeUpdateEckuMetadataForActiveTenants();
                this.postUpdateBookkeeping();
                this.registerDedicatedCKUonInit();
                LOG.info("Load flexible fanout enabled tenants to memory");
                this.updateFlexibleFanoutEnabledForActiveTenants();
                LOG.info("Invoking all tenant callback functions for all active tenants after postUpdateBookkeeping");
                this.invokeAllTenantCallbacksForActiveTenants();
                this.logConsumerState.set(CommonTopicBasedPhysicalClusterMetadata.State.RUNNING);
            }
            long loadTimeNano = this.time.nanoseconds() - startNano;
            this.getStartSensor().record((double)TimeUnit.NANOSECONDS.toMillis(loadTimeNano));
            LOG.info("Consumed initial set of {} lkcs metadata from topic {} in {} ns", new Object[]{this.logicalClusterMap.size(), this.topicName, loadTimeNano});
            this.startMonitoringDeactivatedTenants();
        }
        catch (Exception e) {
            this.logConsumerState.set(CommonTopicBasedPhysicalClusterMetadata.State.FAILED_TO_START);
            throw new IllegalStateException("Unable to start consuming lkc metadata from topic", e);
        }
    }

    private void invokeAllTenantCallbacksForActiveTenants() {
        for (Function<String, Boolean> callback : this.tenantRegisterCallbackFunctions) {
            this.invokeTenantCallbackForActiveTenants(callback);
        }
    }

    private void maybeUpdateEckuMetadataForActiveTenants() {
        Set lkcList = this.logicalClusterMap.entrySet().stream().filter(e -> ((CommonTopicBasedPhysicalClusterMetadata.LCMPair)e.getValue()).isActiveCluster()).map(Map.Entry::getKey).collect(Collectors.toSet());
        for (String lkc : lkcList) {
            this.validateEckuMetadataAndUpdateIfNeeded(lkc, true);
        }
    }

    private void updateFlexibleFanoutEnabledForActiveTenants() {
        Set lkcList = this.logicalClusterMap.entrySet().stream().filter(e -> ((CommonTopicBasedPhysicalClusterMetadata.LCMPair)e.getValue()).isActiveCluster()).map(Map.Entry::getKey).collect(Collectors.toSet());
        for (String lkc : lkcList) {
            this.updateFlexibleFanoutEnabled(lkc);
        }
    }

    private void invokeTenantCallbackForActiveTenants(Function<String, Boolean> callback) {
        Set lkcList = this.logicalClusterMap.entrySet().stream().filter(e -> ((CommonTopicBasedPhysicalClusterMetadata.LCMPair)e.getValue()).isActiveCluster()).map(Map.Entry::getKey).collect(Collectors.toSet());
        String lcm = "";
        try {
            Iterator iterator = lkcList.iterator();
            while (iterator.hasNext()) {
                String lkc;
                lcm = lkc = (String)iterator.next();
                callback.apply(lcm);
            }
        }
        catch (Exception e2) {
            LOG.error("Error invoking tenant callback {} for tenant {} on cluster start {}", new Object[]{callback, lcm, e2.getMessage()});
            throw e2;
        }
    }

    private static void emitInvalidEckuMetadata(String tenant) {
        HashMap<String, String> tenantTags = new HashMap<String, String>();
        tenantTags.put("tenant", tenant);
        METRICS_GROUP.newGauge("InvalidEckuMetadata", () -> 1, tenantTags);
    }

    private static void removeInvalidEckuMetadataIfPresent(String tenant) {
        HashMap<String, String> tenantTags = new HashMap<String, String>();
        tenantTags.put("tenant", tenant);
        METRICS_GROUP.removeMetric("InvalidEckuMetadata", tenantTags);
    }

    private static boolean isValidElasticCkuMetadata(Map<ElasticCkuDefinitionUtils.ElasticCkuDefinitionKey, Integer> elasticCkuMetadata) {
        return elasticCkuMetadata.get(ElasticCkuDefinitionUtils.ElasticCkuDefinitionKey.PRODUCE) > 0 && elasticCkuMetadata.get(ElasticCkuDefinitionUtils.ElasticCkuDefinitionKey.FETCH) > 0 && elasticCkuMetadata.get(ElasticCkuDefinitionUtils.ElasticCkuDefinitionKey.CONNECTION_ATTEMPT) > 0 && elasticCkuMetadata.get(ElasticCkuDefinitionUtils.ElasticCkuDefinitionKey.CONNECTION_COUNT) > 0 && elasticCkuMetadata.get(ElasticCkuDefinitionUtils.ElasticCkuDefinitionKey.REQUEST_COUNT) > 0 && elasticCkuMetadata.get(ElasticCkuDefinitionUtils.ElasticCkuDefinitionKey.MIN_ELASTIC_CKU) > 0 && elasticCkuMetadata.get(ElasticCkuDefinitionUtils.ElasticCkuDefinitionKey.MAX_ELASTIC_CKU) > 0 && elasticCkuMetadata.get(ElasticCkuDefinitionUtils.ElasticCkuDefinitionKey.NON_COMPACTED_PARTITION_COUNT) > 0 && elasticCkuMetadata.get(ElasticCkuDefinitionUtils.ElasticCkuDefinitionKey.COMPACTED_PARTITION_COUNT) >= 0 && elasticCkuMetadata.get(ElasticCkuDefinitionUtils.ElasticCkuDefinitionKey.TOTAL_THROUGHPUT) >= 0;
    }

    private void validateEckuMetadataAndUpdateIfNeeded(String lkc, boolean isActiveLkc) {
        Map eckuMetadata = ((KafkaLogicalClusterMetadata)((CommonTopicBasedPhysicalClusterMetadata.LCMPair)this.logicalClusterMap.get(lkc)).getLCM()).elasticCkuMetadata();
        if (isActiveLkc && eckuMetadata != null && !TopicBasedPhysicalClusterMetadata.isValidElasticCkuMetadata(eckuMetadata)) {
            LOG.error("Invalid Elastic CKU Metric Definition for {} {}. Using the default value {}.", new Object[]{lkc, eckuMetadata, ElasticCkuDefinitionUtils.DEFAULT_ELASTIC_CKU_DEFINITION});
            TenantQuotaCallback.maybeUpdateElasticCkuDefinition(lkc, ElasticCkuDefinitionUtils.DEFAULT_ELASTIC_CKU_DEFINITION);
            ((KafkaLogicalClusterMetadata)((CommonTopicBasedPhysicalClusterMetadata.LCMPair)this.logicalClusterMap.get(lkc)).getLCM()).updateElasticCkuDefinition(ElasticCkuDefinitionUtils.DEFAULT_ELASTIC_CKU_DEFINITION);
            TopicBasedPhysicalClusterMetadata.emitInvalidEckuMetadata(lkc);
        } else {
            TopicBasedPhysicalClusterMetadata.removeInvalidEckuMetadataIfPresent(lkc);
        }
    }

    private void updateFlexibleFanoutEnabled(String lkc) {
        KafkaLogicalClusterMetadata lcm = (KafkaLogicalClusterMetadata)((CommonTopicBasedPhysicalClusterMetadata.LCMPair)this.logicalClusterMap.get(lkc)).getLCM();
        if (lcm != null) {
            this.maybeUpdateFlexibleFanoutEnabled(null, lcm);
        } else {
            LOG.error("lcm is null for active tenant: {}", (Object)lkc);
        }
    }

    protected void shutdown() {
        LOG.info("Shutting down");
        if (!this.disableSslCertManager) {
            try {
                CommonTopicBasedPhysicalClusterMetadata.State prevSslCertManagerState = this.sslCertManagerState.getAndSet(CommonTopicBasedPhysicalClusterMetadata.State.CLOSED);
                if (prevSslCertManagerState.equals((Object)CommonTopicBasedPhysicalClusterMetadata.State.RUNNING) || prevSslCertManagerState.equals((Object)CommonTopicBasedPhysicalClusterMetadata.State.STARTING)) {
                    this.sslCertificateManager.shutdown();
                    this.sslCertificateManager.close();
                } else {
                    LOG.info("Trying to close already closed sslCertificateManager");
                }
            }
            catch (Exception e) {
                LOG.error("Error when shutting down sslCertificateManager", (Throwable)e);
            }
        }
        try {
            CommonTopicBasedPhysicalClusterMetadata.State prevLogState = this.logConsumerState.getAndSet(CommonTopicBasedPhysicalClusterMetadata.State.CLOSED);
            if (prevLogState.equals((Object)CommonTopicBasedPhysicalClusterMetadata.State.RUNNING) || prevLogState.equals((Object)CommonTopicBasedPhysicalClusterMetadata.State.STARTING)) {
                this.lcLog.stop();
            } else {
                LOG.info("Trying to close an lkcLog that was in a non-closable state: {}", (Object)prevLogState);
            }
        }
        catch (Exception e) {
            LOG.error("Error when shutting down lkcLog", (Throwable)e);
        }
        this.backgroundUpdatesExecutorService.shutdownNow();
        try {
            this.backgroundUpdatesExecutorService.awaitTermination(CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            LOG.debug("Shutting down was interrupted", (Throwable)e);
        }
        this.tenantLifecycleManager.close();
        LOG.info("Closed topic-based tenant cluster metadata store");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public String dedicatedLogicalClusterId() {
        Map map = this.logicalClusterMap;
        synchronized (map) {
            if (this.kafkaLogicalClusterIds.size() == 1) {
                return (String)this.kafkaLogicalClusterIds.stream().findFirst().get();
            }
            return "";
        }
    }

    public Set<String> kafkaLogicalClusterIds() {
        return Collections.unmodifiableSet(this.kafkaLogicalClusterIds);
    }

    public Set<String> logicalClusterIds() {
        this.ensureOpen();
        return this.logicalClusterMap.entrySet().stream().filter(e -> ((CommonTopicBasedPhysicalClusterMetadata.LCMPair)e.getValue()).isActiveCluster()).map(Map.Entry::getKey).collect(Collectors.toSet());
    }

    private void startWatchingSslCertificates() throws IOException {
        if (this.sslCertManagerState.compareAndSet(CommonTopicBasedPhysicalClusterMetadata.State.NOT_READY, CommonTopicBasedPhysicalClusterMetadata.State.RUNNING)) {
            try {
                this.sslCertificateManager.startWatching();
            }
            catch (IOException ioe) {
                this.sslCertManagerState.compareAndSet(CommonTopicBasedPhysicalClusterMetadata.State.RUNNING, CommonTopicBasedPhysicalClusterMetadata.State.NOT_READY);
                throw ioe;
            }
        } else {
            LOG.warn("startWatchingSslCertificates, but state is: " + this.sslCertManagerState.get().toString());
        }
    }

    protected KafkaLogicalClusterMetadata parseLCM(ConsumerRecord<String, byte[]> record) {
        KafkaLogicalClusterMetadata lcm = null;
        try {
            LogicalCluster asProto = LogicalCluster.parseFrom((byte[])((byte[])record.value()));
            lcm = KafkaLogicalClusterMetadata.fromProtobuf((LogicalCluster)asProto);
        }
        catch (InvalidProtocolBufferException e) {
            throw new IllegalArgumentException(e);
        }
        return lcm;
    }

    private void updateQuotas() {
        Map<String, MultiTenantQuotaConfig> tenantQuotas = this.logicalClusterMap.entrySet().stream().filter(e -> ((CommonTopicBasedPhysicalClusterMetadata.LCMPair)e.getValue()).isActiveCluster()).collect(Collectors.toMap(Map.Entry::getKey, e -> ((KafkaLogicalClusterMetadata)((CommonTopicBasedPhysicalClusterMetadata.LCMPair)e.getValue()).getLCM()).quotaConfig(this.linkRequestPercentageMultipler)));
        TenantQuotaCallback.updateQuotas(tenantQuotas, MultiTenantQuotaConfig.UNLIMITED_QUOTA);
    }

    private void maybeUpdateElasticCkuDefinition(String logicalClusterId) {
        if (this.logicalClusterMap.containsKey(logicalClusterId) && ((CommonTopicBasedPhysicalClusterMetadata.LCMPair)this.logicalClusterMap.get(logicalClusterId)).getLCM() != null) {
            Map tenantECkuDefinition = ((KafkaLogicalClusterMetadata)((CommonTopicBasedPhysicalClusterMetadata.LCMPair)this.logicalClusterMap.get(logicalClusterId)).getLCM()).elasticCkuMetadata();
            LOG.info("Updating Elastic CKU Metric Definition for lkc: {} in updateLogicalCluster method", (Object)logicalClusterId);
            TenantQuotaCallback.maybeUpdateElasticCkuDefinition(logicalClusterId, tenantECkuDefinition);
        }
    }

    protected void recordEndToEndSensor(KafkaLogicalClusterMetadata newLcm) {
        Date creationDate;
        if (newLcm.lifecycleMetadata() != null && (creationDate = newLcm.lifecycleMetadata().creationDate()) != null) {
            long endToEndLoadTime = this.time.milliseconds() - creationDate.getTime();
            this.getEndToEndSensor().record((double)endToEndLoadTime);
        }
    }

    private void updateTenant(KafkaLogicalClusterMetadata oldLcm, KafkaLogicalClusterMetadata newLcm) {
        if (CommonTopicBasedPhysicalClusterMetadata.State.RUNNING.equals(this.logConsumerState.get())) {
            if (newLcm != null) {
                LOG.info("Adding or updating lkc metadata for cluster: {}", (Object)newLcm.logicalClusterId());
                this.recordEndToEndSensor(newLcm);
                if (this.addOrUpdate((LogicalClusterMetadata)oldLcm, (LogicalClusterMetadata)newLcm)) {
                    if (newLcm.cku() > 0) {
                        this.maybeUpdateDedicatedCKUcount(oldLcm, newLcm);
                    }
                    this.maybeUpdateFlexibleFanoutEnabled(oldLcm, newLcm);
                    this.validateEckuMetadataAndUpdateIfNeeded(newLcm.logicalClusterId(), newLcm.isActive());
                    if ((oldLcm == null || !oldLcm.isActive()) && newLcm.isActive()) {
                        LOG.info("Invoking all tenant callbacks for the new tenant: {}", (Object)newLcm.logicalClusterId());
                        for (Function<String, Boolean> callback : this.tenantRegisterCallbackFunctions) {
                            callback.apply(newLcm.logicalClusterId());
                        }
                    } else if (oldLcm.isActive() && !newLcm.isActive()) {
                        LOG.info("Invoking all tenant deactivation callbacks for this tenant: {}", (Object)oldLcm.logicalClusterId());
                        for (Function<String, Boolean> callback : this.tenantDeactivatedCallbackFunctions) {
                            callback.apply(oldLcm.logicalClusterId());
                        }
                    } else if (oldLcm.isActive() && newLcm.isActive()) {
                        LOG.info("Invoking all lkc metadata update callbacks for the updated tenant: {}", (Object)newLcm.logicalClusterId());
                        for (Function<String, Boolean> callback : this.lkcMetadataUpdateCallbackFunctions) {
                            callback.apply(newLcm.logicalClusterId());
                        }
                    }
                }
            } else {
                LOG.info("Deleting lkc metadata for cluster: {}", (Object)oldLcm.logicalClusterId());
            }
            this.postUpdateBookkeeping();
        }
    }

    private void registerDedicatedCKUonInit() {
        Map<String, Integer> dedicatedTenantCkus = this.logicalClusterMap.entrySet().stream().filter(e -> ((KafkaLogicalClusterMetadata)((CommonTopicBasedPhysicalClusterMetadata.LCMPair)e.getValue()).getLCM()).cku() > 0).filter(e -> ((CommonTopicBasedPhysicalClusterMetadata.LCMPair)e.getValue()).isActiveCluster()).collect(Collectors.toMap(Map.Entry::getKey, e -> ((KafkaLogicalClusterMetadata)((CommonTopicBasedPhysicalClusterMetadata.LCMPair)e.getValue()).getLCM()).cku()));
        LOG.info("CKU config initial registration map: {}", (Object)dedicatedTenantCkus.toString());
        for (Map.Entry<String, Integer> entry : dedicatedTenantCkus.entrySet()) {
            HashMap<String, String> entityTags = new HashMap<String, String>();
            entityTags.put("tenant", entry.getKey());
            this.registerDedicatedCku(entityTags, entry.getValue());
        }
    }

    private void maybeUpdateDedicatedCKUcount(KafkaLogicalClusterMetadata oldLcm, KafkaLogicalClusterMetadata newLcm) {
        HashMap<String, String> entityTags = new HashMap<String, String>();
        entityTags.put("tenant", newLcm.logicalClusterId());
        if (oldLcm == null || !oldLcm.isActive() && newLcm.isActive()) {
            LOG.info("Registering CKU config for new dedicated cluster: {} as {}", (Object)newLcm.logicalClusterId(), (Object)newLcm.cku());
            this.registerDedicatedCku(entityTags, newLcm.cku());
        } else if (oldLcm.isActive() && !newLcm.isActive()) {
            LOG.info("Removing CKU config for inactive dedicated cluster: {}", (Object)newLcm.logicalClusterId());
            this.metricsGroupDefn.removeMetric(DEDICATED_CKU_METRIC_NAME, entityTags);
        } else if (!Objects.equals(oldLcm.cku(), newLcm.cku())) {
            LOG.info("Updating CKU config for dedicated cluster: {} from {} to {}", new Object[]{newLcm.logicalClusterId(), oldLcm.cku(), newLcm.cku()});
            this.metricsGroupDefn.removeMetric(DEDICATED_CKU_METRIC_NAME, entityTags);
            this.registerDedicatedCku(entityTags, newLcm.cku());
        }
    }

    private void registerDedicatedCku(Map<String, String> entityTags, int cku) {
        this.metricsGroupDefn.newGauge(DEDICATED_CKU_METRIC_NAME, () -> cku, entityTags);
    }

    private void maybeUpdateFlexibleFanoutEnabled(KafkaLogicalClusterMetadata oldLcm, KafkaLogicalClusterMetadata newLcm) {
        if (oldLcm == null) {
            LOG.info("Configuring flexible fanout enabled for lkc {} to {}", (Object)newLcm.logicalClusterId(), (Object)newLcm.flexFanoutEnabled());
            TenantQuotaCallback.maybeUpdateFlexibleFanoutEnabled(newLcm.logicalClusterId(), newLcm.flexFanoutEnabled());
        } else if (!Objects.equals(oldLcm.flexFanoutEnabled(), newLcm.flexFanoutEnabled())) {
            LOG.info("Updating flexible fanout enabled for lkc {} from {} to {}", new Object[]{newLcm.logicalClusterId(), oldLcm.flexFanoutEnabled(), newLcm.flexFanoutEnabled()});
            TenantQuotaCallback.maybeUpdateFlexibleFanoutEnabled(newLcm.logicalClusterId(), newLcm.flexFanoutEnabled());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void postUpdateBookkeeping() {
        Map map = this.logicalClusterMap;
        synchronized (map) {
            this.updateNumberOfTenantsMetric();
            this.updateKafkaLogicalClusterIds();
            this.updateQuotas();
        }
    }

    private void startMonitoringDeactivatedTenants() {
        if (this.updateDeactivatedTenantsIntervalMs < 1L) {
            LOG.error("The interval to check for deactivated tenants is set at {}. No tenants would be actually deleted (only deactivated!) and partitions and ACLs would leak!", (Object)this.updateDeactivatedTenantsIntervalMs);
            return;
        }
        if (!this.startedMonitoringDeactivatedClusters.getAndSet(true)) {
            this.backgroundUpdatesExecutorService.scheduleAtFixedRate(this::updateDeactivatedTenants, this.updateDeactivatedTenantsIntervalMs, this.updateDeactivatedTenantsIntervalMs, TimeUnit.MILLISECONDS);
        } else {
            LOG.info("startMonitoringDeactivatedTenants() called twice. Ignoring");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateDeactivatedTenants() {
        if (!CommonTopicBasedPhysicalClusterMetadata.State.RUNNING.equals(this.logConsumerState.get())) {
            return;
        }
        List deactivatedClusters = this.logicalClusterMap.values().stream().filter(CommonTopicBasedPhysicalClusterMetadata.LCMPair::exists).filter(p -> !p.isActiveCluster()).map(CommonTopicBasedPhysicalClusterMetadata.LCMPair::getLCM).collect(Collectors.toList());
        for (KafkaLogicalClusterMetadata lcm : deactivatedClusters) {
            this.tenantLifecycleManager.updateTenantState(lcm);
        }
        this.tenantLifecycleManager.deleteTenants();
        Set<String> fullyDeletedClusters = this.tenantLifecycleManager.fullyDeletedClusters();
        Map map = this.logicalClusterMap;
        synchronized (map) {
            for (String lkcId : fullyDeletedClusters) {
                CommonTopicBasedPhysicalClusterMetadata.LCMPair oldRecord = (CommonTopicBasedPhysicalClusterMetadata.LCMPair)this.logicalClusterMap.get(lkcId);
                if (oldRecord == null) continue;
                this.logicalClusterMap.put(lkcId, new CommonTopicBasedPhysicalClusterMetadata.LCMPair((CommonTopicBasedPhysicalClusterMetadata)this, oldRecord.getSequenceId(), null));
            }
        }
    }

    private void updateKafkaLogicalClusterIds() {
        this.kafkaLogicalClusterIds = this.logicalClusterMap.values().stream().filter(CommonTopicBasedPhysicalClusterMetadata.LCMPair::isActiveCluster).filter(e -> ((KafkaLogicalClusterMetadata)e.getLCM()).isKafkaLogicalCluster()).map(e -> ((KafkaLogicalClusterMetadata)e.getLCM()).logicalClusterId()).collect(Collectors.toSet());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void updateLogicalCluster(String clusterId, Long sequenceId, KafkaLogicalClusterMetadata lcm) {
        if (!this.isStartingOrRunningState((CommonTopicBasedPhysicalClusterMetadata.State)this.logConsumerState.get())) {
            LOG.warn("Tried to add or update a logical cluster with a non running log (state = {})", this.logConsumerState.get());
        } else {
            Map map = this.logicalClusterMap;
            synchronized (map) {
                boolean needUpdate;
                CommonTopicBasedPhysicalClusterMetadata.LCMPair prevRecord = (CommonTopicBasedPhysicalClusterMetadata.LCMPair)this.logicalClusterMap.get(clusterId);
                KafkaLogicalClusterMetadata prevLcm = null;
                if (prevRecord != null) {
                    prevLcm = (KafkaLogicalClusterMetadata)prevRecord.getLCM();
                }
                boolean bl = needUpdate = prevRecord == null || prevRecord.getSequenceId() < sequenceId && (prevRecord.isActiveCluster() || lcm.isActive());
                if (needUpdate) {
                    this.logicalClusterMap.put(clusterId, new CommonTopicBasedPhysicalClusterMetadata.LCMPair((CommonTopicBasedPhysicalClusterMetadata)this, sequenceId.longValue(), (LogicalClusterMetadata)lcm));
                    this.tenantLifecycleManager.updateTenantState(lcm);
                    if (lcm != null) {
                        this.maybeUpdateElasticCkuDefinition(lcm.logicalClusterId());
                    }
                    this.updateTenant(prevLcm, lcm);
                }
            }
        }
    }

    public boolean isUp() {
        return CommonTopicBasedPhysicalClusterMetadata.State.RUNNING.equals(this.logConsumerState.get());
    }

    private boolean addOrUpdate(LogicalClusterMetadata oldMeta, LogicalClusterMetadata newMeta) {
        boolean updateNeeded = !newMeta.equals(oldMeta);
        boolean expiredCluster = oldMeta == null && !newMeta.isActive();
        return updateNeeded && !expiredCluster;
    }
}

