package io.confluent.kafka.server.plugins.auth;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.multitenant.utils.AuthUtils;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.SecretsLogFailedException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.server.multitenant.MultiTenantSecretsStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/server/plugins/auth/BaseMultiTenantSaslSecretsStore.class */
public abstract class BaseMultiTenantSaslSecretsStore implements MultiTenantSecretsStore {
    protected static final Map<String, BaseMultiTenantSaslSecretsStore> INSTANCES = new HashMap();
    protected static final Logger LOG = LoggerFactory.getLogger(BaseMultiTenantSaslSecretsStore.class);
    public static final String METRICS_GROUP = "tenant-metrics";
    protected List<String> multitenantListenerNames = Collections.emptyList();
    private final ObjectMapper objectMapper;
    protected String sessionUuid;
    protected String topicName;
    protected Long topicLoadTimeoutMs;
    protected final Map<String, ?> baseClientProperties;
    protected final Metrics metrics;
    protected KafkaBasedLog<String, String> secretsLog;
    private final ConcurrentHashMap<String, MultiTenantSaslConfigEntry> secretsMap;
    private final MultiTenantSaslSecrets secrets;
    private final AtomicReference<State> state;
    private final MetricName apiKeyCountMetricName;
    private final Sensor apiKeyCreationSensor;
    private final Sensor apiKeyDeletionSensor;
    private final Sensor topicLoadTimeSensor;
    private final Map<String, Long> lastSequenceId;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/server/plugins/auth/BaseMultiTenantSaslSecretsStore$ConsumeCallback.class */
    public class ConsumeCallback implements Callback<ConsumerRecord<String, String>> {
        private ConsumeCallback() {
        }

        public void onCompletion(Throwable th, ConsumerRecord<String, String> consumerRecord) {
            if (th != null) {
                BaseMultiTenantSaslSecretsStore.LOG.error("Unexpected error in consumer callback for MultiTenantSaslSecretsStore: ", th);
            } else {
                BaseMultiTenantSaslSecretsStore.this.read(consumerRecord);
            }
        }
    }

    /* loaded from: input_file:io/confluent/kafka/server/plugins/auth/BaseMultiTenantSaslSecretsStore$State.class */
    public enum State {
        NOT_RUNNING((byte) 0),
        STARTING((byte) 1),
        RUNNING((byte) 2),
        SHUTTING_DOWN((byte) 3),
        SHUTDOWN_COMPLETE((byte) 4),
        FAILED_TO_START((byte) 5);

        private final byte value;

        State(byte b) {
            this.value = b;
        }

        public byte value() {
            return this.value;
        }
    }

    public BaseMultiTenantSaslSecretsStore(Map<String, Object> map, Metrics metrics) {
        LOG.trace("Creating MultiTenantSaslSecretsStore");
        this.objectMapper = new ObjectMapper();
        this.baseClientProperties = map;
        this.metrics = metrics;
        this.secretsMap = new ConcurrentHashMap<>();
        this.secrets = new MultiTenantSaslSecrets(this.secretsMap);
        this.state = new AtomicReference<>(State.NOT_RUNNING);
        this.lastSequenceId = new HashMap();
        this.apiKeyCountMetricName = metrics.metricName("active-api-key-count", METRICS_GROUP, "The number of active API keys.");
        metrics.addMetric(this.apiKeyCountMetricName, (metricConfig, j) -> {
            return this.secrets.entries().size();
        });
        this.apiKeyCreationSensor = metrics.sensor("ApiKeyCreation");
        this.apiKeyCreationSensor.add(metrics.metricName("api-key-creation-rate", METRICS_GROUP, "The rate of new API key creation."), new Rate());
        this.apiKeyDeletionSensor = metrics.sensor("ApiKeyDeletion");
        this.apiKeyDeletionSensor.add(metrics.metricName("api-key-deletion-rate", METRICS_GROUP, "The rate of API key deletion."), new Rate());
        this.topicLoadTimeSensor = metrics.sensor("ApiKeyTopicLoadTime");
        this.topicLoadTimeSensor.add(metrics.metricName("api-key-topic-load-time", METRICS_GROUP, "The loading time for the api key topic."), new Max());
    }

    public void configure(Map<String, ?> map) {
        this.sessionUuid = getSessionUuid(map);
        this.multitenantListenerNames = ConfluentConfigs.multitenantListenerNames(map, (ListenerName) null);
        this.topicName = getTopicName(map);
        this.topicLoadTimeoutMs = getTopicLoadTimeout(map);
        this.secretsLog = createKafkaBasedLog(map);
        String simpleName = getClass().getSimpleName();
        synchronized (INSTANCES) {
            BaseMultiTenantSaslSecretsStore baseMultiTenantSaslSecretsStore = INSTANCES.get(this.sessionUuid);
            if (baseMultiTenantSaslSecretsStore == null) {
                INSTANCES.put(this.sessionUuid, this);
                LOG.info("Configured {} instance (broker session {})", simpleName, this.sessionUuid);
            } else {
                if (this != baseMultiTenantSaslSecretsStore) {
                    throw new UnsupportedOperationException(simpleName + " instance already exists for broker session " + this.sessionUuid);
                }
                LOG.info("Skipping configuring this instance (session {}): Already configured.", this.sessionUuid);
            }
        }
    }

    protected abstract String getSessionUuid(Map<String, ?> map);

    private KafkaBasedLog<String, String> createKafkaBasedLog(Map<String, ?> map) {
        Map<String, Object> consumerConfig = getConsumerConfig(map);
        consumerConfig.put("allow.auto.create.topics", false);
        consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
        consumerConfig.put("value.deserializer", StringDeserializer.class.getName());
        consumerConfig.put("default.api.timeout.ms", Integer.valueOf((int) Math.min(this.topicLoadTimeoutMs.longValue(), 2147483647L)));
        return new KafkaBasedLog<>(this.topicName, (Map) null, consumerConfig, new ConsumeCallback(), Time.SYSTEM, (Runnable) null, this.topicLoadTimeoutMs.longValue());
    }

    public Map<Endpoint, CompletableFuture<Void>> start(Collection<Endpoint> collection) {
        if (!this.state.compareAndSet(State.NOT_RUNNING, State.STARTING)) {
            throw new IllegalStateException("Trying to start a MultiTenantSaslSecretsStore which was already started!");
        }
        try {
            CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
                startLog();
            });
            return (Map) collection.stream().collect(Collectors.toMap(Function.identity(), endpoint -> {
                Optional listenerName = endpoint.listenerName();
                List<String> list = this.multitenantListenerNames;
                Objects.requireNonNull(list);
                return ((Boolean) listenerName.map((v1) -> {
                    return r1.contains(v1);
                }).orElse(false)).booleanValue() ? runAsync : CompletableFuture.completedFuture(null);
            }));
        } catch (Exception e) {
            this.state.set(State.FAILED_TO_START);
            throw new IllegalStateException("Unable to create a future for startLog()", e);
        }
    }

    public Map<String, Long> getLastSequenceId() {
        return Collections.unmodifiableMap(this.lastSequenceId);
    }

    public void close() {
        LOG.info("Closing Multi-tenant Sasl Secrets Store from topic: {}", this.topicName);
        if (this.sessionUuid != null) {
            close(this.sessionUuid);
        }
        this.metrics.removeMetric(this.apiKeyCountMetricName);
        this.metrics.removeSensor(this.apiKeyCreationSensor.name());
        this.metrics.removeSensor(this.apiKeyDeletionSensor.name());
        this.metrics.removeSensor(this.topicLoadTimeSensor.name());
    }

    private void close(String str) {
        boolean z = false;
        synchronized (INSTANCES) {
            BaseMultiTenantSaslSecretsStore baseMultiTenantSaslSecretsStore = INSTANCES.get(str);
            if (baseMultiTenantSaslSecretsStore == this) {
                INSTANCES.remove(str);
                z = true;
                LOG.info("Removed {} instance for broker session {}", getClass().getName(), str);
            } else if (baseMultiTenantSaslSecretsStore != null) {
                LOG.info("Closing instance that doesn't match the instance in the static map with the same broker session {} will not remove that instance from the map.", str);
            }
        }
        if (z && this.state.compareAndSet(State.RUNNING, State.SHUTTING_DOWN)) {
            try {
                this.secretsLog.stop();
                this.state.set(State.SHUTDOWN_COMPLETE);
            } catch (Throwable th) {
                this.state.set(State.SHUTDOWN_COMPLETE);
                throw th;
            }
        }
    }

    public MultiTenantSaslSecrets load() {
        if (this.state.get() == State.RUNNING) {
            return this.secrets;
        }
        return null;
    }

    protected abstract void createSensors(Map<String, MultiTenantSaslConfigEntry> map);

    private void startLog() {
        try {
            long nanoTime = System.nanoTime();
            this.secretsLog.start();
            this.state.set(State.RUNNING);
            long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime);
            this.topicLoadTimeSensor.record(millis);
            LOG.info("Started MultiTenantSaslSecretsStore from topic: {} in {} ms", this.topicName, Long.valueOf(millis));
            CompletableFuture.runAsync(() -> {
                createSensors(this.secrets.entries());
            });
        } catch (Exception e) {
            this.state.set(State.FAILED_TO_START);
            throw new SecretsLogFailedException("Unable to start the consumer for MultiTenantSaslSecretsStore", e);
        }
    }

    protected void read(ConsumerRecord<String, String> consumerRecord) {
        String str = (String) consumerRecord.key();
        if (str == null) {
            LOG.error("Record is missing a key (which is a must!). Ignoring this record.");
            return;
        }
        Long tryParseEventsSequenceId = AuthUtils.tryParseEventsSequenceId(consumerRecord);
        if (tryParseEventsSequenceId == null) {
            LOG.error("Sequence ID is missing from the headers of record with key {}. Ignoring this record.", str);
        } else {
            updateSecrets(consumerRecord, tryParseEventsSequenceId.longValue());
            LOG.trace("Finished reading record with sequence id: {}", tryParseEventsSequenceId);
        }
    }

    boolean validateEntries(String str, Map<String, MultiTenantSaslConfigEntry> map) {
        return map.size() == 1 && map.keySet().stream().allMatch(str2 -> {
            return str2.equals(str);
        });
    }

    public synchronized void updateSecrets(ConsumerRecord<String, String> consumerRecord, long j) {
        String str = (String) consumerRecord.key();
        String str2 = (String) consumerRecord.value();
        Long l = this.lastSequenceId.get(str);
        if (l != null && l.longValue() >= j) {
            LOG.warn("Ignoring older message for key {} with sequence id: {} (last seen id is {})", new Object[]{str, Long.valueOf(j), l});
            return;
        }
        try {
            try {
                if (str2 != null) {
                    maybeLogApiKeyUpdate(str, j, true);
                    MultiTenantSaslSecrets multiTenantSaslSecrets = (MultiTenantSaslSecrets) this.objectMapper.readValue(str2, MultiTenantSaslSecrets.class);
                    if (!validateEntries(str, multiTenantSaslSecrets.entries())) {
                        throw new IllegalStateException("Invalid secrets message for " + str);
                    }
                    this.secretsMap.putAll(multiTenantSaslSecrets.entries());
                    this.apiKeyCreationSensor.record();
                    if (this.state.get() == State.RUNNING) {
                        createSensors(multiTenantSaslSecrets.entries());
                    }
                } else {
                    maybeLogApiKeyUpdate(str, j, false);
                    this.secretsMap.remove(str);
                    deleteCredential(str);
                    this.apiKeyDeletionSensor.record();
                }
                this.lastSequenceId.put(str, Long.valueOf(j));
            } catch (Exception e) {
                LOG.error("Error handling message for api key: {}, sequence id: {}, partition: {}, timestamp: {}", new Object[]{str, Long.valueOf(j), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.timestamp()), e});
                this.lastSequenceId.put(str, Long.valueOf(j));
            }
        } catch (Throwable th) {
            this.lastSequenceId.put(str, Long.valueOf(j));
            throw th;
        }
    }

    private void maybeLogApiKeyUpdate(String str, long j, boolean z) {
        String str2 = z ? "Updating api keys for {} from topic (sequence id: {})" : "Read null value for key {}, deleting from key store (sequence id: {})";
        if (State.RUNNING.equals(this.state.get())) {
            LOG.info(str2, str, Long.valueOf(j));
        } else {
            LOG.debug(str2, str, Long.valueOf(j));
        }
    }

    protected abstract void deleteCredential(String str);

    public static BaseMultiTenantSaslSecretsStore getInstance(String str) {
        BaseMultiTenantSaslSecretsStore baseMultiTenantSaslSecretsStore;
        synchronized (INSTANCES) {
            baseMultiTenantSaslSecretsStore = INSTANCES.get(str);
        }
        return baseMultiTenantSaslSecretsStore;
    }

    protected abstract Map<String, Object> getConsumerConfig(Map<String, ?> map);

    private String getTopicName(Map<String, ?> map) {
        String str = (String) map.get("confluent.cdc.api.keys.topic");
        if (str == null || str.isEmpty()) {
            throw new ConfigException("Missing value of config: confluent.cdc.api.keys.topic");
        }
        return str;
    }

    private Long getTopicLoadTimeout(Map<String, ?> map) {
        Long l = (Long) map.get("confluent.cdc.api.keys.topic.load.timeout.ms");
        if (l == null || l.longValue() <= 0) {
            throw new ConfigException("Missing value of config: confluent.cdc.api.keys.topic.load.timeout.ms");
        }
        return l;
    }
}
