/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.server.plugins.auth;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import io.confluent.kafka.multitenant.utils.AuthUtils;
import io.confluent.kafka.server.plugins.auth.UserMetaDataKey;
import io.confluent.kafka.server.plugins.auth.UserMetaDataValue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import kafka.common.DeletionEventHandler;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.acl.AclState;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.Authorizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Deletion event handler that consumes a user metadata topic and, when a tombstone
 * (null value) arrives for a previously seen user, deletes the ACLs bound to that
 * user's principals through the configured {@link Authorizer}.
 *
 * <p>One instance is registered per broker session UUID in a static map. The consumer
 * is only started when {@link #start(Boolean)} is called with {@code true}, or when
 * {@link #onControllerChange(LeaderAndEpoch)} reports this node as leader after
 * {@code start} has been called at least once.
 */
public class UserDeletionEventHandler implements DeletionEventHandler {
    private static final String METRICS_GROUP = "user-deletion-handler-metrics";
    public static final String USER_DELETION_HANDLER_EVENT_FAILURE_RATE = "user-deletion-event-failure-rate";
    private static final String USER_DELETION_HANDLER_EVENT_FAILURE_SENSOR = "user-deletion-event-failure";
    public static final String USER_DELETION_HANDLER_EVENT_COUNT = "user-deletion-event-count";
    private static final String USER_DELETION_HANDLER_COUNT_SENSOR = "user-deletion-event-count";
    private static final Logger LOG = LoggerFactory.getLogger(UserDeletionEventHandler.class);

    /** Registry of handlers keyed by broker session UUID; every access synchronizes on the map itself. */
    protected static final Map<String, UserDeletionEventHandler> INSTANCES = new HashMap<>();

    private final Time time;
    protected String sessionUuid;
    private KafkaBasedLog<String, String> userMetadataLog;
    private final Map<String, ?> interBrokerClientConfigs;
    /** Highest sequence id handled so far for each raw (JSON) record key. */
    private final Map<String, Long> lastSequenceId;
    public final Optional<Authorizer> authorizer;
    private final ObjectMapper objectMapper;
    private final int controllerNodeID;
    /** Last non-null metadata value seen for each user key; consulted when a tombstone arrives. */
    public final Map<UserMetaDataKey, UserMetaDataValue> userMetaDataKeyUserMetaDataValueMap;
    private final Metrics metrics;
    // Not final: the sensors are removed on close and obtained again on the next start.
    private Sensor userDeletionEventFailureSensor;
    private Sensor userDeletionEventCountSensor;
    /** Set once {@link #start(Boolean)} has been invoked; controller changes are ignored before that. */
    private volatile boolean afterStartup = false;
    final AtomicReference<State> state = new AtomicReference<>(State.NOT_STARTED);

    /**
     * Creates a handler that uses the system clock.
     *
     * @param interBrokerClientConfigs client configs used as the base of the consumer configuration
     * @param authorizer authorizer used to look up and delete ACLs; the handler does not start without one
     * @param metrics registry in which the handler's sensors are registered
     * @param controllerNodeID id compared against the leader id on controller changes
     */
    public UserDeletionEventHandler(Map<String, ?> interBrokerClientConfigs, Optional<Authorizer> authorizer, Metrics metrics, int controllerNodeID) {
        this(interBrokerClientConfigs, authorizer, metrics, controllerNodeID, Time.SYSTEM);
    }

    /**
     * Creates a handler with an explicit time source.
     *
     * @param interBrokerClientConfigs client configs used as the base of the consumer configuration
     * @param authorizer authorizer used to look up and delete ACLs; the handler does not start without one
     * @param metrics registry in which the handler's sensors are registered
     * @param controllerNodeID id compared against the leader id on controller changes
     * @param time time source used to measure the initial load and passed to the log
     */
    public UserDeletionEventHandler(Map<String, ?> interBrokerClientConfigs, Optional<Authorizer> authorizer, Metrics metrics, int controllerNodeID, Time time) {
        this.time = time;
        this.interBrokerClientConfigs = interBrokerClientConfigs;
        this.controllerNodeID = controllerNodeID;
        this.lastSequenceId = new HashMap<>();
        this.authorizer = authorizer;
        this.objectMapper = new ObjectMapper();
        this.userMetaDataKeyUserMetaDataValueMap = new HashMap<>();
        this.metrics = metrics;
        this.registerSensors();
    }

    /**
     * Returns the simple class name of this handler.
     *
     * @return the simple name of the runtime class
     */
    public String name() {
        return this.getClass().getSimpleName();
    }

    /** Intentionally a no-op: this handler does not react to metadata image updates. */
    public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
    }

    /**
     * Starts the handler when this node becomes leader (and the handler is not started yet),
     * and closes it when another node, or no node, is leader. Ignored until
     * {@link #start(Boolean)} has been called once. Failures are logged and counted on the
     * failure sensor rather than propagated.
     *
     * @param newLeaderAndEpoch the new leader and epoch
     */
    public synchronized void onControllerChange(LeaderAndEpoch newLeaderAndEpoch) {
        if (!this.afterStartup) {
            return;
        }
        try {
            LOG.info("Controller changing to : {}", newLeaderAndEpoch);
            boolean isLeader = newLeaderAndEpoch.leaderId().equals(OptionalInt.of(this.controllerNodeID));
            if (!isLeader) {
                this.close();
            } else if (this.state.get() == State.NOT_STARTED) {
                this.start(true);
            }
        } catch (Exception e) {
            // Pass the exception to the logger so the cause and stack trace are not lost.
            LOG.error("Error during onControllerChange in {}", this.getClass().getSimpleName(), e);
            this.userDeletionEventFailureSensor.record();
        }
    }

    /**
     * Starts consuming the user metadata topic on a background task.
     *
     * @param isActiveController whether this node is currently the active controller
     * @return an already completed future when nothing is started (not the active controller,
     *         or no authorizer present); otherwise a future for the asynchronous log start
     * @throws IllegalStateException if the handler is not in {@link State#NOT_STARTED}, or if
     *         initialization or scheduling of the start task fails
     */
    public synchronized CompletableFuture<Void> start(Boolean isActiveController) {
        this.afterStartup = true;
        if (!isActiveController) {
            LOG.info("Not starting the handler as it is not the active controller");
            return CompletableFuture.completedFuture(null);
        }
        if (!this.authorizer.isPresent()) {
            LOG.error("Authorizer is null, cannot start the handler");
            return CompletableFuture.completedFuture(null);
        }
        if (!this.state.compareAndSet(State.NOT_STARTED, State.STARTING)) {
            throw new IllegalStateException("Trying to start a log from a state it can't be started in");
        }
        try {
            LOG.info("Starting {}", this.getClass().getSimpleName());
            this.initialize();
            return CompletableFuture.runAsync(this::startLog);
        } catch (Exception e) {
            this.state.set(State.FAILED_TO_START);
            throw new IllegalStateException("Unable to create a future for startLog()", e);
        }
    }

    /** Registers this instance for its session and obtains the sensors again (they are removed on close). */
    private void initialize() {
        this.addInstance(this.sessionUuid);
        this.registerSensors();
    }

    /** Obtains the failure-rate and event-count sensors from the registry and attaches their metrics. */
    private void registerSensors() {
        this.userDeletionEventFailureSensor = this.metrics.sensor(USER_DELETION_HANDLER_EVENT_FAILURE_SENSOR);
        this.userDeletionEventFailureSensor.add(
            this.metrics.metricName(USER_DELETION_HANDLER_EVENT_FAILURE_RATE, METRICS_GROUP, "The failure event rate for user deletion event handler."),
            new Rate());
        this.userDeletionEventCountSensor = this.metrics.sensor(USER_DELETION_HANDLER_COUNT_SENSOR);
        this.userDeletionEventCountSensor.add(
            this.metrics.metricName(USER_DELETION_HANDLER_EVENT_COUNT, METRICS_GROUP, "The count for user deletion events"),
            new CumulativeSum());
    }

    /**
     * Starts the underlying log and moves the state to {@link State#RUNNING}.
     * Does nothing unless the state is {@link State#STARTING}.
     *
     * @throws IllegalStateException if the log fails to start; the state becomes
     *         {@link State#FAILED_TO_START} and the failure sensor is recorded
     */
    private void startLog() {
        if (this.state.get() != State.STARTING) {
            LOG.info("Trying to start consumer from a non-starting state");
            return;
        }
        try {
            long startMs = this.time.milliseconds();
            LOG.info("Starting consumer from {}", this.getClass().getSimpleName());
            this.userMetadataLog.start();
            this.state.set(State.RUNNING);
            long loadTimeMs = this.time.milliseconds() - startMs;
            LOG.info("Consumed initial set of user metadata from topic took {} milliseconds", loadTimeMs);
        } catch (Exception e) {
            this.state.set(State.FAILED_TO_START);
            this.userDeletionEventFailureSensor.record();
            throw new IllegalStateException("Unable to start consuming user metadata from topic", e);
        }
    }

    /**
     * Resolves the broker session UUID from {@code configs}, registers this instance for it and
     * builds the consumer log. If this same instance is already registered for the session,
     * configuration is skipped.
     *
     * @param configs broker configuration
     * @throws IllegalStateException if a different instance is registered for the session, or the
     *         handler is not in {@link State#NOT_STARTED}
     * @throws ConfigException if the load timeout config is missing or not positive
     */
    public void configure(Map<String, ?> configs) {
        LOG.info("Configuring {}", this.getClass().getSimpleName());
        this.sessionUuid = AuthUtils.getBrokerSessionUuid(configs);
        if (!this.addInstance(this.sessionUuid)) {
            LOG.info("Skipping configuring {} instance is already configured for broker session {}", this.getClass().getSimpleName(), this.sessionUuid);
            return;
        }
        this.userMetadataLog = this.configureConsumer(configs);
    }

    /**
     * Unregisters this instance, removes its sensors, stops the consumer and resets the state to
     * {@link State#NOT_STARTED}. Does nothing if the handler has no session UUID or was never started.
     */
    public void close() {
        if (this.sessionUuid == null || this.state.get() == State.NOT_STARTED) {
            return;
        }
        LOG.info("Closing consumer for session {}", this.sessionUuid);
        this.close(this.sessionUuid);
    }

    /**
     * Performs the close for the given session, but only if this instance is the one registered
     * for it; otherwise logs an error and leaves this instance, its sensors and the map untouched.
     *
     * @param sessionUuid broker session whose registration is released
     */
    private void close(String sessionUuid) {
        synchronized (INSTANCES) {
            if (INSTANCES.get(sessionUuid) != this) {
                LOG.error(UserDeletionEventHandler.class.getSimpleName() + " closing instance that doesn't match the instance in the static map with the same broker session {}. Will not close this instance or remove it from the map", sessionUuid);
                return;
            }
            INSTANCES.remove(sessionUuid);
            LOG.info("Removed instance for broker session {}", sessionUuid);
        }
        // Sensors are removed only after the ownership check, so a refused close keeps its metrics.
        this.metrics.removeSensor(this.userDeletionEventFailureSensor.name());
        this.metrics.removeSensor(this.userDeletionEventCountSensor.name());
        if (this.userMetadataLog != null) {
            try {
                this.userMetadataLog.stop();
                LOG.info("Successfully closed the consumer");
            } catch (Exception e) {
                // Best effort: a failing stop must not prevent the state from being reset.
                LOG.error("Error when shutting down the consumer", e);
            }
        }
        this.state.set(State.NOT_STARTED);
    }

    /**
     * Builds the {@link KafkaBasedLog} that reads the user metadata topic. Consumer properties
     * start from the inter-broker client configs restricted to known consumer config names
     * (minus {@code metric.reporters}), then handler-specific overrides are applied.
     *
     * @param configs broker configuration providing the topic name, load timeout and broker id
     * @return the configured, not yet started, log
     * @throws IllegalStateException if the handler is not in {@link State#NOT_STARTED}
     * @throws ConfigException if the load timeout is missing or not positive
     */
    private KafkaBasedLog<String, String> configureConsumer(Map<String, ?> configs) {
        State current = this.state.get();
        if (current != State.NOT_STARTED) {
            throw new IllegalStateException(this.getClass().getSimpleName() + " configureConsumer called in a state it can't start in: " + current);
        }
        String topic = (String) configs.get("confluent.cdc.user.metadata.topic");
        Long timeoutMs = (Long) configs.get("confluent.cdc.api.keys.topic.load.timeout.ms");
        if (timeoutMs == null || timeoutMs <= 0L) {
            throw new ConfigException("Value for config confluent.cdc.api.keys.topic.load.timeout.ms must be positive integer when using user metadata topic");
        }
        String clientId = String.format("%s-%s-%s-deletion-handler", topic, ConfluentConfigs.ClientType.CONSUMER, configs.get("broker.id"));

        HashSet<String> consumerConfigNames = new HashSet<>(ConsumerConfig.configNames());
        consumerConfigNames.remove("metric.reporters");

        Map<String, Object> consumerProps = new HashMap<>(this.interBrokerClientConfigs);
        consumerProps.keySet().retainAll(consumerConfigNames);
        consumerProps.put("client.id", clientId);
        consumerProps.put("allow.auto.create.topics", false);
        consumerProps.put("bootstrap.servers", this.interBrokerClientConfigs.get("bootstrap.servers"));
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", StringDeserializer.class.getName());
        // The consumer config is an int; clamp the (long) load timeout instead of overflowing.
        consumerProps.put("default.api.timeout.ms", (int) Math.min(timeoutMs.longValue(), (long) Integer.MAX_VALUE));
        consumerProps.put("enable.metrics.push", false);
        return new KafkaBasedLog<>(topic, null, consumerProps, () -> null, new ConsumeCallback(), this.time, null, timeoutMs.longValue());
    }

    /**
     * Validates a consumed record and hands it to the soft-delete logic. Records without a key
     * or without a parsable sequence id are logged, counted as failures and dropped.
     *
     * @param record record consumed from the user metadata topic
     */
    protected void read(ConsumerRecord<String, String> record) {
        if (record.key() == null) {
            this.userDeletionEventFailureSensor.record();
            LOG.error("Missing key in user metadata message! (partition = {}, offset = {}, timestamp = {})", record.partition(), record.offset(), record.timestamp());
            return;
        }
        Long sequenceId = AuthUtils.tryParseEventsSequenceId(record);
        if (sequenceId == null) {
            this.userDeletionEventFailureSensor.record();
            LOG.error("Missing sequence ID in userId metadata message! (partition = {}, offset = {}, timestamp = {})", record.partition(), record.offset(), record.timestamp());
            return;
        }
        this.maybeSoftDelete(record, sequenceId);
    }

    /**
     * Applies one record: a non-null value is cached for its key; a null value (tombstone) for a
     * cached key triggers ACL deletion for that user and evicts the cache entry. Records whose
     * sequence id is not greater than the last one seen for the same key are ignored. Any failure
     * is logged and counted; the sequence id is recorded for the key regardless of the outcome.
     *
     * @param record record with a non-null key
     * @param currSeqId sequence id parsed from the record
     */
    private synchronized void maybeSoftDelete(ConsumerRecord<String, String> record, Long currSeqId) {
        String jsonKey = record.key();
        String jsonValue = record.value();
        Long prevSeqId = this.lastSequenceId.get(jsonKey);
        if (prevSeqId != null && prevSeqId >= currSeqId) {
            return;
        }
        try {
            UserMetaDataKey userMetaDataKey = this.objectMapper.readValue(jsonKey, UserMetaDataKey.class);
            if (jsonValue != null) {
                UserMetaDataValue userMetaDataValue = this.objectMapper.readValue(jsonValue, UserMetaDataValue.class);
                this.userMetaDataKeyUserMetaDataValueMap.put(userMetaDataKey, userMetaDataValue);
            } else if (this.userMetaDataKeyUserMetaDataValueMap.containsKey(userMetaDataKey)) {
                this.deleteAclsForUser(userMetaDataKey, this.userMetaDataKeyUserMetaDataValueMap.get(userMetaDataKey));
                // Only reached when deletion did not throw, so a failed deletion keeps the cache entry.
                this.userMetaDataKeyUserMetaDataValueMap.remove(userMetaDataKey);
            }
        } catch (Exception e) {
            this.userDeletionEventFailureSensor.record();
            LOG.error("Error handling message for user metadata key in UserDeletionEventHandler: {}, value: {}, sequence id: {}, partition: {}, timestamp: {}", jsonKey, jsonValue, currSeqId, record.partition(), record.timestamp(), e);
        } finally {
            this.lastSequenceId.put(jsonKey, currSeqId);
        }
    }

    /**
     * Issues an ACL deletion for every filter of the user that currently matches at least one ACL,
     * and records one event on the count sensor if any deletion was issued.
     *
     * @throws RuntimeException if issuing a deletion fails
     */
    private void deleteAclsForUser(UserMetaDataKey userMetaDataKey, UserMetaDataValue userMetaDataValue) {
        Authorizer auth = this.authorizer.get();
        boolean deletionIssued = false;
        for (AclBindingFilter aclBindingFilter : this.filtersForUser(userMetaDataKey, userMetaDataValue)) {
            // Only emptiness matters here, so avoid copying the whole result.
            if (!auth.acls(aclBindingFilter).iterator().hasNext()) {
                continue;
            }
            deletionIssued = true;
            AuthorizableRequestContext context = this.getContext();
            try {
                LOG.info("Processing soft deletion of ACLs for user : " + userMetaDataKey.userResourceId + " and aclBindingFilter : " + aclBindingFilter);
                // NOTE(review): the element returned by get(0) is discarded; if it is an asynchronous
                // stage (as in the public Authorizer API), its completion is not awaited - confirm intended.
                auth.deleteAcls(context, Collections.singletonList(aclBindingFilter), Optional.empty(), AclState.ACTIVE).get(0);
            } catch (Exception e) {
                throw new RuntimeException("Handler: Unable to delete the ACLs associated with user ", e);
            }
        }
        if (deletionIssued) {
            this.userDeletionEventCountSensor.record(1.0);
        }
    }

    /**
     * Builds the synthetic request context passed to the authorizer for ACL deletion: a
     * {@code DELETE_ACLS} header with empty client and connection ids and no principal or listener.
     *
     * @return a new request context
     */
    public AuthorizableRequestContext getContext() {
        // Explicit short cast: an int literal is not implicitly narrowed in a constructor call.
        RequestHeader header = new RequestHeader(ApiKeys.DELETE_ACLS, (short) 3, "", 2828);
        return new RequestContext(header, "", null, null, null, null, ClientInformation.EMPTY, null, false);
    }

    /**
     * Builds two filters per logical cluster id of the user: one for the user id principal and
     * one for the user resource id principal.
     */
    private List<AclBindingFilter> filtersForUser(UserMetaDataKey userMetaDataKey, UserMetaDataValue userMetaDataValue) {
        String resourceId = userMetaDataKey.userResourceId();
        String userId = userMetaDataValue.userId();
        List<AclBindingFilter> filters = new ArrayList<>();
        for (String lkcId : userMetaDataValue.lkcIds()) {
            filters.add(this.filterForId(userId, lkcId));
            filters.add(this.filterForId(resourceId, lkcId));
        }
        return filters;
    }

    /** Builds a filter matching any resource and any operation for principal {@code TenantUser:<lkcId>_<id>}. */
    private AclBindingFilter filterForId(String id, String lkcId) {
        KafkaPrincipal principal = new KafkaPrincipal("TenantUser", lkcId + "_" + id);
        AccessControlEntryFilter entryFilter = new AccessControlEntryFilter(principal.toString(), "*", AclOperation.ANY, AclPermissionType.ANY, Collections.singletonList(Uuid.ZERO_UUID));
        return new AclBindingFilter(ResourcePatternFilter.ANY, entryFilter);
    }

    /**
     * Registers this instance for the given broker session.
     *
     * @param sessionUuid broker session UUID
     * @return {@code true} if newly registered, {@code false} if this instance was already registered
     * @throws IllegalStateException if a different instance is registered for the session
     */
    private boolean addInstance(String sessionUuid) {
        synchronized (INSTANCES) {
            UserDeletionEventHandler existing = INSTANCES.get(sessionUuid);
            if (existing == null) {
                INSTANCES.put(sessionUuid, this);
                return true;
            }
            if (existing != this) {
                throw new IllegalStateException(this.getClass().getSimpleName() + " instance already exists for broker session " + sessionUuid);
            }
            return false;
        }
    }

    /**
     * Looks up the handler registered for a broker session.
     *
     * @param brokerSessionUuid broker session UUID
     * @return the registered handler, or {@code null} if none is registered
     */
    public static UserDeletionEventHandler getInstance(String brokerSessionUuid) {
        synchronized (INSTANCES) {
            return INSTANCES.get(brokerSessionUuid);
        }
    }

    /**
     * Returns the metrics registry this handler was constructed with.
     *
     * @return the metrics registry
     */
    public Metrics metrics() {
        return this.metrics;
    }

    /**
     * Package-private configuration that injects the log and session UUID directly instead of
     * deriving them from broker configs, then registers this instance for the session.
     *
     * @throws IllegalStateException if a different instance is registered for the session
     */
    void configure(KafkaBasedLog<String, String> userMetadataLog, String sessionUuid) {
        this.userMetadataLog = userMetadataLog;
        this.sessionUuid = sessionUuid;
        this.addInstance(sessionUuid);
    }

    /** Lifecycle states of the handler. */
    public enum State {
        // Explicit byte casts: int literals are not implicitly narrowed in constructor calls.
        NOT_STARTED((byte) 0),
        STARTING((byte) 1),
        RUNNING((byte) 2),
        FAILED_TO_START((byte) 3);

        private final byte value;

        State(byte value) {
            this.value = value;
        }
    }

    /** Callback handed to the log: records and logs consumer errors, otherwise forwards the record to {@code read}. */
    private class ConsumeCallback implements Callback<ConsumerRecord<String, String>> {
        private ConsumeCallback() {
        }

        public void onCompletion(Throwable error, ConsumerRecord<String, String> record) {
            if (error != null) {
                UserDeletionEventHandler.this.userDeletionEventFailureSensor.record();
                LOG.error("Unexpected error in consumer callback for UserDeletionEventHandler: ", error);
                return;
            }
            UserDeletionEventHandler.this.read(record);
        }
    }
}

