/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.server.plugins.auth;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.multitenant.utils.AuthUtils;
import io.confluent.kafka.server.plugins.auth.UserMetaDataKey;
import io.confluent.kafka.server.plugins.auth.UserMetaDataValue;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.apache.kafka.server.multitenant.UserMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultUserMetaDataStore
implements UserMetadataStore {
    // Metric group under which every metric created by the constructor is registered.
    private static final String METRICS_GROUP = "user-metadata-metrics";
    // Public metric names (paired with METRICS_GROUP when the metrics are created).
    public static final String USER_METADATA_EVENT_RATE = "user-metadata-event-rate";
    public static final String USER_METADATA_EVENT_FAILURE_RATE = "user-metadata-event-failure-rate";
    public static final String USER_METADATA_COUNT = "user-metadata-count";
    public static final String USER_METADATA_TOPIC_LOAD_TIME = "user-metadata-topic-load-time";
    // Sensor names. The last two are not referenced by name in the constructor as shown,
    // which uses string literals with the same values instead.
    private static final String USER_METADATA_EVENT_SENSOR = "user-metadata-event";
    private static final String USER_METADATA_EVENT_FAILURE_SENSOR = "user-metadata-event-failure";
    private static final String USER_METADATA_COUNT_SENSOR = "user-metadata-count";
    private static final String USER_METADATA_TOPIC_LOAD_TIME_SENSOR = "user-metadata-topic-load-time";
    // Registry of configured stores keyed by broker session UUID. It is a plain HashMap;
    // every access in this class happens inside synchronized (INSTANCES).
    protected static final Map<String, DefaultUserMetaDataStore> INSTANCES = new HashMap<String, DefaultUserMetaDataStore>();
    private static final Logger LOG = LoggerFactory.getLogger(DefaultUserMetaDataStore.class);
    // Clock used to measure the topic load duration and handed to the KafkaBasedLog.
    private final Time time;
    // Broker session UUID; stays null until one of the configure(...) methods runs
    // (close() uses that to detect a missing configure()).
    protected String sessionUuid;
    // NOTE(review): never assigned in this class as shown; presumably for subclasses - confirm.
    protected String topicName;
    // Listener names checked against each endpoint's normalised listener name in start();
    // matching endpoints wait for the log to finish starting.
    protected List<String> multitenantListenerNames = Collections.emptyList();
    // Log that reads the user metadata topic; null until configured (close(String) null-checks it).
    private KafkaBasedLog<String, String> userMetadataLog;
    // Source configs from which the consumer properties are derived in configureConsumer().
    private final Map<String, ?> interBrokerClientConfigs;
    // Parses the JSON record key and value.
    private final ObjectMapper objectMapper;
    private final Metrics metrics;
    // Recorded once per record handed to read().
    private final Sensor userMetaDataEventSensor;
    // Recorded for consumer errors, records missing a key or sequence id, and handling failures.
    private final Sensor userMetaDataEventFailureSensor;
    // Cumulative sum: +1.0 when an org is added for a user, -1.0 when one is removed.
    private final Sensor userMetaDataCountSensor;
    // Records how long userMetadataLog.start() took, in nanoseconds.
    private final Sensor topicLoadTimeSensor;
    // Last sequence id processed per raw (JSON) record key. Plain HashMap; it is read and
    // written inside the synchronized updateUserMetadata() and exposed via lastSequenceId().
    private final Map<String, Long> lastSequenceId;
    // user resource id -> user id plus the org resource ids seen for that user.
    private final Map<String, UserOrgData> userResourceIdToUserId;
    // user id -> user resource id (reverse lookup).
    private final Map<String, String> userIdToUserResourceId;
    // Lifecycle state; package-private (no access modifier).
    final AtomicReference<State> state = new AtomicReference<State>(State.NOT_STARTED);

    public DefaultUserMetaDataStore(Map<String, ?> interBrokerClientConfig, Metrics metrics) {
        this(interBrokerClientConfig, metrics, Time.SYSTEM);
    }

    public DefaultUserMetaDataStore(Map<String, ?> interBrokerClientConfigs, Metrics metrics, Time time) {
        this.time = time;
        this.interBrokerClientConfigs = interBrokerClientConfigs;
        this.metrics = metrics;
        this.objectMapper = new ObjectMapper();
        this.lastSequenceId = new HashMap<String, Long>();
        this.userIdToUserResourceId = new ConcurrentHashMap<String, String>();
        this.userResourceIdToUserId = new ConcurrentHashMap<String, UserOrgData>();
        this.userMetaDataEventSensor = metrics.sensor(USER_METADATA_EVENT_SENSOR);
        this.userMetaDataEventSensor.add(metrics.metricName(USER_METADATA_EVENT_RATE, METRICS_GROUP, "The event rate for user metadata topic"), (MeasurableStat)new Rate());
        this.userMetaDataEventFailureSensor = metrics.sensor(USER_METADATA_EVENT_FAILURE_SENSOR);
        this.userMetaDataEventFailureSensor.add(metrics.metricName(USER_METADATA_EVENT_FAILURE_RATE, METRICS_GROUP, "The failure event rate for user metadata topic"), (MeasurableStat)new Rate());
        this.userMetaDataCountSensor = metrics.sensor("user-metadata-count");
        this.userMetaDataCountSensor.add(metrics.metricName("user-metadata-count", METRICS_GROUP, "The number unique keys in user metadata topic."), (MeasurableStat)new CumulativeSum());
        this.topicLoadTimeSensor = metrics.sensor("user-metadata-topic-load-time");
        this.topicLoadTimeSensor.add(metrics.metricName("user-metadata-topic-load-time", METRICS_GROUP, "The loading time for the user metadata topic."), (MeasurableStat)new Max());
    }

    /**
     * Moves the store from {@code NOT_STARTED} to {@code STARTING}, kicks off
     * {@link #startLog()} asynchronously and returns one future per server endpoint.
     *
     * <p>An endpoint whose normalised listener name is one of the multitenant listener
     * names, or one of the server's early-start listeners, is mapped to the future that
     * completes when the log has started. Every other endpoint is mapped to an already
     * completed future.
     *
     * @param serverInfo supplies the endpoints and early-start listener names
     * @return a future per endpoint
     * @throws IllegalStateException if the store is not in {@code NOT_STARTED}, or the
     *                               asynchronous start could not be scheduled
     */
    public Map<Endpoint, CompletableFuture<Void>> start(AuthorizerServerInfo serverInfo) {
        if (!this.state.compareAndSet(State.NOT_STARTED, State.STARTING)) {
            throw new IllegalStateException("Trying to start a log from a state it can't be started in");
        }

        CompletableFuture<Void> logStarted;
        try {
            LOG.info("Starting " + this.getClass().getSimpleName());
            logStarted = CompletableFuture.runAsync(this::startLog);
        } catch (Exception e) {
            this.state.set(State.FAILED_TO_START);
            throw new IllegalStateException("Unable to create a future for startLog()", e);
        }

        Map<Endpoint, CompletableFuture<Void>> futures = new HashMap<>(serverInfo.endpoints().size());
        List<String> earlyStartNames = serverInfo.earlyStartListeners().stream()
            .map(name -> Optional.of(name).map(ListenerName::normalised).map(ListenerName::value).orElse(""))
            .collect(Collectors.toList());

        String storeName = DefaultUserMetaDataStore.class.getSimpleName();
        for (Endpoint endpoint : serverInfo.endpoints()) {
            Optional<String> listener = endpoint.listenerName()
                .map(ListenerName::normalised)
                .map(ListenerName::value);
            // Multitenant membership is checked first; early-start only if that fails.
            boolean mustWait = listener.map(this.multitenantListenerNames::contains).orElse(false)
                || listener.map(earlyStartNames::contains).orElse(false);
            if (mustWait) {
                futures.put(endpoint, logStarted);
                LOG.info("Wait for {} future to complete before enabling endpoint: {}", storeName, endpoint);
            } else {
                futures.put(endpoint, CompletableFuture.completedFuture(null));
                LOG.info("Immediately resolve {} future for endpoint: {}", storeName, endpoint);
            }
        }
        return futures;
    }

    /**
     * Starts the underlying log, marks the store {@code RUNNING} and records how long
     * the start took (in nanoseconds) on the topic-load-time sensor.
     *
     * @throws IllegalStateException if the store is not in {@code STARTING}, or if
     *                               anything inside the start sequence throws (the state
     *                               is then set to {@code FAILED_TO_START})
     */
    private void startLog() {
        if (this.state.get() != State.STARTING) {
            throw new IllegalStateException("Trying to start log from a non starting state");
        }
        try {
            final long beganAtNs = this.time.nanoseconds();
            this.userMetadataLog.start();
            this.state.set(State.RUNNING);
            final long elapsedNs = this.time.nanoseconds() - beganAtNs;
            this.topicLoadTimeSensor.record((double) elapsedNs);
            LOG.info("Consumed initial set of user metadata from topic took {} nanoseconds", elapsedNs);
        } catch (Exception e) {
            this.state.set(State.FAILED_TO_START);
            throw new IllegalStateException("Unable to start consuming user metadata from topic", e);
        }
    }

    /**
     * Removes this store's sensors from the metrics registry and closes the store for
     * its configured broker session. Does nothing except log a warning when no session
     * UUID has been set, i.e. configure was never called.
     */
    public void close() {
        final String session = this.sessionUuid;
        if (session == null) {
            LOG.warn("close() called without configure() being called first");
            return;
        }
        final Sensor[] ownedSensors = {
            this.userMetaDataEventSensor,
            this.userMetaDataEventFailureSensor,
            this.userMetaDataCountSensor,
            this.topicLoadTimeSensor
        };
        for (Sensor sensor : ownedSensors) {
            this.metrics.removeSensor(sensor.name());
        }
        LOG.info("Closing consumer for session {}", session);
        this.close(session);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Unregisters this store for {@code sessionUuid}, stops the log if there is one and
     * sets the state to {@code CLOSED}.
     *
     * <p>If the registry entry for the session is not this instance, an error is logged
     * and nothing else happens: the log is not stopped and the state is left untouched.
     * A failure while stopping the log is logged and otherwise ignored.
     *
     * @param sessionUuid broker session whose registry entry should be removed
     */
    private void close(String sessionUuid) {
        synchronized (INSTANCES) {
            if (INSTANCES.get(sessionUuid) != this) {
                LOG.error(DefaultUserMetaDataStore.class.getSimpleName() + " closing instance that doesn't match the instance in the static map with the same broker session {}. Will not close this instance or remove it from the map", sessionUuid);
                return;
            }
            INSTANCES.remove(sessionUuid);
            LOG.info("Removed instance for broker session {}", sessionUuid);
        }

        final KafkaBasedLog<String, String> log = this.userMetadataLog;
        if (log != null) {
            try {
                log.stop();
                LOG.info("Successfully closed the consumer");
            } catch (Exception e) {
                LOG.error("Error when shutting down the consumer", e);
            }
        }
        this.state.set(State.CLOSED);
    }

    /**
     * Reads the broker session UUID and multitenant listener names from {@code configs},
     * registers this instance for the session and, if the registration is new, builds
     * the log that consumes the user metadata topic.
     *
     * <p>When this same instance is already registered for the session, the consumer is
     * not rebuilt and only an informational message is logged.
     *
     * @param configs broker configuration
     * @throws IllegalStateException if a different instance is already registered for
     *                               the session
     */
    public void configure(Map<String, ?> configs) {
        LOG.info("Configuring " + this.getClass().getSimpleName());
        this.sessionUuid = AuthUtils.getBrokerSessionUuid(configs);
        this.multitenantListenerNames = ConfluentConfigs.multitenantListenerNames(configs, null);

        if (this.addInstance(this.sessionUuid)) {
            this.userMetadataLog = this.configureConsumer(configs);
        } else {
            LOG.info("Skipping configuring {} instance is already configured for broker session {}",
                this.getClass().getSimpleName(), this.sessionUuid);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers this store for {@code sessionUuid} in the static instance registry.
     *
     * @param sessionUuid broker session to register under
     * @return {@code true} if this call added the registration, {@code false} if this
     *         same instance was already registered
     * @throws IllegalStateException if another instance holds the registration
     */
    private boolean addInstance(String sessionUuid) {
        synchronized (INSTANCES) {
            final DefaultUserMetaDataStore registered = INSTANCES.get(sessionUuid);
            if (registered == this) {
                return false;
            }
            if (registered != null) {
                throw new IllegalStateException(this.getClass().getSimpleName() + " instance already exists for broker session " + sessionUuid);
            }
            INSTANCES.put(sessionUuid, this);
            return true;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Looks up the store registered for a broker session.
     *
     * @param brokerSessionUuid broker session UUID used as the registry key
     * @return the registered store, or {@code null} if none is registered
     */
    public static DefaultUserMetaDataStore getInstance(String brokerSessionUuid) {
        synchronized (INSTANCES) {
            return INSTANCES.get(brokerSessionUuid);
        }
    }

    /**
     * Builds the log that reads the user metadata topic.
     *
     * <p>Consumer properties start from the inter-broker client configs, restricted to
     * known consumer config names (minus {@code metric.reporters}), and are then
     * overridden with the client id, deserializers, timeout and other fixed settings.
     *
     * @param configs broker configuration; supplies the topic name, load timeout and broker id
     * @return the configured, not yet started, log
     * @throws IllegalStateException if the store is not in {@code NOT_STARTED}
     * @throws ConfigException       if the load timeout is missing or not positive
     */
    private KafkaBasedLog<String, String> configureConsumer(Map<String, ?> configs) {
        final State current = this.state.get();
        if (current != State.NOT_STARTED) {
            throw new IllegalStateException(this.getClass().getSimpleName() + " configureConsumer called in a state it can't start in: " + current);
        }

        final String topic = (String) configs.get("confluent.cdc.user.metadata.topic");
        final Long timeoutMs = (Long) configs.get("confluent.cdc.api.keys.topic.load.timeout.ms");
        if (timeoutMs == null || timeoutMs <= 0L) {
            throw new ConfigException("Value for config confluent.cdc.api.keys.topic.load.timeout.ms must be positive integer when using user metadata topic");
        }
        final String clientId = String.format("%s-%s-%s", topic, ConfluentConfigs.ClientType.CONSUMER, configs.get("broker.id"));

        // Only keys that are real consumer configs survive, and metric reporters are dropped.
        final Set<String> allowedNames = new HashSet<>(ConsumerConfig.configNames());
        allowedNames.remove("metric.reporters");

        final Map<String, Object> consumerProps = new HashMap<>(this.interBrokerClientConfigs);
        consumerProps.keySet().retainAll(allowedNames);
        consumerProps.put("client.id", clientId);
        consumerProps.put("allow.auto.create.topics", false);
        consumerProps.put("bootstrap.servers", this.interBrokerClientConfigs.get("bootstrap.servers"));
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", StringDeserializer.class.getName());
        // The API timeout is an int config, so the long timeout is clamped to Integer.MAX_VALUE.
        consumerProps.put("default.api.timeout.ms", (int) Math.min(timeoutMs, Integer.MAX_VALUE));
        consumerProps.put("enable.metrics.push", false);

        // The constructor is invoked through the raw type, exactly as before.
        @SuppressWarnings({"unchecked", "rawtypes"})
        final KafkaBasedLog<String, String> log = new KafkaBasedLog(topic, null, consumerProps, () -> null, (Callback) new ConsumeCallback(), this.time, null, timeoutMs.longValue());
        return log;
    }

    /**
     * Handles one record from the user metadata topic.
     *
     * <p>Every record counts towards the event sensor. A record without a key, or
     * without a parseable sequence id, is counted as a failure, logged and dropped.
     * Anything else is passed on to {@code updateUserMetadata}.
     *
     * @param record record consumed from the topic
     */
    protected void read(ConsumerRecord<String, String> record) {
        this.userMetaDataEventSensor.record();

        if (record.key() == null) {
            this.rejectRecord("Missing key in user metadata message! (partition = {}, offset = {}, timestamp = {})", record);
            return;
        }

        final Long sequenceId = AuthUtils.tryParseEventsSequenceId(record);
        if (sequenceId == null) {
            this.rejectRecord("Missing sequence ID in userId metadata message! (partition = {}, offset = {}, timestamp = {})", record);
            return;
        }

        this.updateUserMetadata(record, sequenceId);
    }

    /** Counts a failure and logs {@code message} with the record's partition, offset and timestamp. */
    private void rejectRecord(String message, ConsumerRecord<String, String> record) {
        this.userMetaDataEventFailureSensor.record();
        LOG.error(message, record.partition(), record.offset(), record.timestamp());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Applies one user metadata record to the in-memory mappings.
     *
     * <p>A record whose sequence id is not greater than the last one seen for the same
     * raw key is ignored. Otherwise the key is parsed as a {@link UserMetaDataKey}; a
     * non-null value is parsed as a {@link UserMetaDataValue} and stored, while a null
     * value (tombstone) removes the key's org from the user's entry, if the user is known.
     *
     * <p>Any exception while parsing or applying the record is counted on the failure
     * sensor and logged, not propagated. The sequence id is remembered for the key in
     * every case except the "older message" early return, including after a failure.
     *
     * @param record    record to apply; its key must be non-null
     * @param currSeqId sequence id parsed from the record; must be non-null
     */
    private synchronized void updateUserMetadata(ConsumerRecord<String, String> record, Long currSeqId) {
        String jsonKey = record.key();
        String jsonValue = record.value();
        Long prevSeqId = this.lastSequenceId.get(jsonKey);
        if (prevSeqId != null && prevSeqId >= currSeqId) {
            LOG.warn("Ignoring older message for key {} with sequence id: {} (last seen id is {})", new Object[]{jsonKey, currSeqId, prevSeqId});
            return;
        }
        try {
            UserMetaDataKey userMetaDataKey = this.objectMapper.readValue(jsonKey, UserMetaDataKey.class);
            if (jsonValue != null) {
                UserMetaDataValue userMetaDataValue = this.objectMapper.readValue(jsonValue, UserMetaDataValue.class);
                this.updateUserResourceIdMap(userMetaDataKey, userMetaDataValue);
                this.maybeLogUserMetadataUpdate(jsonKey, currSeqId, true);
            } else {
                String userResourceId = userMetaDataKey.userResourceId();
                String orgResourceId = userMetaDataKey.orgResourceId();
                Optional<String> userId = this.userResourceIdToUserId(userResourceId);
                if (userId.isPresent()) {
                    this.removeFromUserResourceIdMap(userResourceId, orgResourceId, userId.get());
                }
                // Tombstone: log the "deleting" message. This branch used to pass true,
                // which reported deletions as updates and left the delete message unused.
                this.maybeLogUserMetadataUpdate(jsonKey, currSeqId, false);
            }
        } catch (Exception e) {
            this.userMetaDataEventFailureSensor.record();
            // The trailing throwable has no placeholder, so SLF4J logs it as the exception.
            LOG.error("Error handling message for user metadata key: {}, value: {}, sequence id: {}, partition: {}, timestamp: {}", new Object[]{jsonKey, jsonValue, currSeqId, record.partition(), record.timestamp(), e});
        } finally {
            this.lastSequenceId.put(jsonKey, currSeqId);
        }
    }

    /**
     * Records that the user identified by the key belongs to the key's org and
     * refreshes the reverse (user id to user resource id) mapping.
     *
     * <p>The count sensor is incremented only when the org was not already present
     * for that user.
     *
     * @param userMetaDataKey   parsed record key (user resource id and org resource id)
     * @param userMetaDataValue parsed record value (user id)
     */
    private void updateUserResourceIdMap(UserMetaDataKey userMetaDataKey, UserMetaDataValue userMetaDataValue) {
        final String userResourceId = userMetaDataKey.userResourceId();
        // An entry is created only on first sight of this user resource id.
        final UserOrgData orgData = this.userResourceIdToUserId.computeIfAbsent(
            userResourceId, id -> new UserOrgData(userMetaDataValue.userId()));
        if (orgData.addOrg(userMetaDataKey.orgResourceId())) {
            this.userMetaDataCountSensor.record(1.0);
        }
        this.userIdToUserResourceId.put(userMetaDataValue.userId(), userResourceId);
    }

    /**
     * Removes one org from a user's entry and drops the user from both maps once no
     * orgs remain.
     *
     * <p>The count sensor is decremented only when the org was actually present.
     * The caller must ensure an entry exists for {@code userResourceId}.
     *
     * @param userResourceId user resource id whose entry is updated
     * @param orgResourceId  org to remove from that entry
     * @param userId         user id to drop from the reverse mapping when the entry empties
     */
    private void removeFromUserResourceIdMap(String userResourceId, String orgResourceId, String userId) {
        final UserOrgData orgData = this.userResourceIdToUserId.get(userResourceId);
        if (orgData.removeOrg(orgResourceId)) {
            this.userMetaDataCountSensor.record(-1.0);
        }
        if (!orgData.orgsIsEmpty()) {
            return;
        }
        this.userIdToUserResourceId.remove(userId);
        this.userResourceIdToUserId.remove(userResourceId);
    }

    /**
     * Looks up the user resource id mapped to a user id.
     *
     * @param userId user id to look up
     * @return the user resource id, or empty if the user id is unknown
     */
    public Optional<String> userIdToUserResourceId(String userId) {
        final String userResourceId = this.userIdToUserResourceId.get(userId);
        if (userResourceId == null) {
            return Optional.empty();
        }
        return Optional.of(userResourceId);
    }

    /**
     * Looks up the user id mapped to a user resource id.
     *
     * <p>The map is read exactly once. The previous {@code containsKey} followed by
     * {@code get} could see the entry removed in between by the thread applying topic
     * records, which made the {@code get} return {@code null} and the call fail with a
     * {@link NullPointerException}. An entry whose stored user id is {@code null} now
     * yields an empty result instead of throwing.
     *
     * @param userResourceId user resource id to look up
     * @return the user id, or empty if the user resource id is unknown
     */
    public Optional<String> userResourceIdToUserId(String userResourceId) {
        return Optional.ofNullable(this.userResourceIdToUserId.get(userResourceId))
            .map(UserOrgData::userId);
    }

    /**
     * Logs that a key was updated or deleted: at INFO once the store is
     * {@code RUNNING}, at DEBUG in any other state.
     *
     * @param key        raw record key
     * @param sequenceId sequence id of the record
     * @param isUpdate   {@code true} for the update message, {@code false} for the delete message
     */
    private void maybeLogUserMetadataUpdate(String key, Long sequenceId, boolean isUpdate) {
        final String message;
        if (isUpdate) {
            message = "Updating userId metadata for {} from topic (sequence id: {})";
        } else {
            message = "Read null value for key {}, deleting from user metadata store (sequence id: {})";
        }

        if (this.state.get() == State.RUNNING) {
            LOG.info(message, key, sequenceId);
        } else {
            LOG.debug(message, key, sequenceId);
        }
    }

    /**
     * Package-private accessor for the per-key sequence ids.
     *
     * @return the live internal map (not a copy) from raw record key to the last
     *         sequence id processed for it
     */
    Map<String, Long> lastSequenceId() {
        final Map<String, Long> sequenceIds = this.lastSequenceId;
        return sequenceIds;
    }

    /**
     * Package-private configure variant that takes a ready-made log instead of building
     * one from broker configs, then registers this instance for the session.
     *
     * @param userMetadataLog          log to use for reading the topic
     * @param sessionUuid              broker session UUID to register under
     * @param multitenantListenerNames listener names whose endpoints wait for the log to start
     * @throws IllegalStateException if a different instance is already registered for
     *                               the session
     */
    void configure(KafkaBasedLog<String, String> userMetadataLog, String sessionUuid, List<String> multitenantListenerNames) {
        this.sessionUuid = sessionUuid;
        this.userMetadataLog = userMetadataLog;
        this.multitenantListenerNames = multitenantListenerNames;
        // The result is deliberately unused: re-registering the same instance is fine here.
        this.addInstance(sessionUuid);
    }

    /**
     * Reports how many user ids currently have a mapping.
     *
     * @return the size of the user id to user resource id map
     */
    public int numOfMapping() {
        final int mappedUserIds = this.userIdToUserResourceId.size();
        return mappedUserIds;
    }

    /**
     * Returns the metrics registry this store was constructed with.
     *
     * @return the metrics registry holding this store's sensors
     */
    public Metrics metrics() {
        final Metrics registry = this.metrics;
        return registry;
    }

    /**
     * Lifecycle states of the store.
     *
     * <p>Each constant carries a byte code. The codes are passed with an explicit
     * {@code (byte)} cast because an {@code int} literal is not implicitly narrowed
     * when used as a constructor argument, so {@code NOT_STARTED(0)} does not compile
     * against a {@code State(byte)} constructor.
     */
    public enum State {
        /** Initial state; the only state from which the store can be configured and started. */
        NOT_STARTED((byte) 0),
        /** Set by {@code start(...)} before the log start is scheduled. */
        STARTING((byte) 1),
        /** Set once the log has started. */
        RUNNING((byte) 2),
        /** Declared, but not set anywhere in the enclosing class as shown. */
        SHUTTING_DOWN((byte) 3),
        /** Set when scheduling or running the log start fails. */
        FAILED_TO_START((byte) 4),
        /** Set after the store has been unregistered and its log stopped. */
        CLOSED((byte) 5);

        // Numeric code of the state; no accessor is exposed for it here.
        private final byte value;

        private State(byte value) {
            this.value = value;
        }
    }

    private class ConsumeCallback
    implements Callback<ConsumerRecord<String, String>> {
        private ConsumeCallback() {
        }

        public void onCompletion(Throwable error, ConsumerRecord<String, String> record) {
            if (error != null) {
                DefaultUserMetaDataStore.this.userMetaDataEventFailureSensor.record();
                LOG.error("Unexpected error in consumer callback for UserMetadataStore: ", error);
                return;
            }
            DefaultUserMetaDataStore.this.read(record);
        }
    }

    /**
     * A user id together with the set of org resource ids recorded for that user.
     * The org set is a plain {@link HashSet}.
     */
    private static class UserOrgData {
        private final String userId;
        private final Set<String> orgResourceIds = new HashSet<>();

        /** @param userId user id this entry belongs to */
        UserOrgData(String userId) {
            this.userId = userId;
        }

        /** @return {@code true} if the org was not already present */
        private boolean addOrg(String orgResourceId) {
            return orgResourceIds.add(orgResourceId);
        }

        /** @return {@code true} if the org was present and has been removed */
        private boolean removeOrg(String orgResourceId) {
            return orgResourceIds.remove(orgResourceId);
        }

        /** @return {@code true} when no orgs remain for this user */
        private boolean orgsIsEmpty() {
            return orgResourceIds.isEmpty();
        }

        /** @return the user id given at construction */
        private String userId() {
            return userId;
        }
    }
}

