/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.requests;

import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestLogFilter;
import org.apache.kafka.common.utils.LogAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link RequestLogFilter} that decides, per request, whether a detailed audit log entry
 * should be produced.
 *
 * <p>Two groups of APIs are considered:
 * <ul>
 *   <li>management operations ({@link #SUPPORTED_APIS_MGMT_OPERATIONS}): reported as
 *       {@link LogAction#LOGGED} whenever detailed audit logging is enabled;</li>
 *   <li>produce/fetch ({@link #SUPPORTED_APIS_PRODUCE_CONSUME}): reported as
 *       {@link LogAction#LOGGED} only when produce/consume audit logging is enabled and the
 *       request names at least one topic partition that the request context's tracker has
 *       not recorded yet.</li>
 * </ul>
 * An API listed in the disabled-APIs config is never logged by this filter.
 *
 * <p>The enable flags and the disabled-API set live in {@code volatile} fields that are
 * replaced wholesale by {@link #configure(Map)}.
 */
public class DetailedRequestAuditLogFilter implements RequestLogFilter {
    private static final Logger LOG = LoggerFactory.getLogger(DetailedRequestAuditLogFilter.class);

    /** Config key: enables audit logging of the management APIs. */
    public static final String ENABLE_DETAILED_AUDIT_LOGGING = "confluent.security.event.logger.enable.detailed.audit.logs";
    /** Config key: enables audit logging of produce/fetch requests. */
    public static final String ENABLE_PRODUCE_CONSUME_AUDIT_LOGGING = "confluent.security.event.logger.enable.produce.consume.audit.logs";
    /** Config key: enables the produce/consume audit event rate metric. */
    public static final String ENABLE_PRODUCE_CONSUME_METRIC_LOGGING = "confluent.security.event.logger.enable.produce.consume.metrics";
    /** Config key: comma-separated API names that this filter must not log. */
    public static final String DISABLED_APIS_FOR_AUDIT_LOGS = "confluent.security.event.logger.detailed.audit.logs.disabled.apis";

    private static final String PRODUCE_CONSUME_AUDIT_EVENT_RATE_SENSOR = "produce-consume-audit-event-rate-sensor";
    private static final String PRODUCE_CONSUME_AUDIT_EVENT_RATE = "produce-consume-audit-event-rate";
    private static final String GROUP_NAME = "confluent-audit-metrics";

    /** Management APIs that are audited when detailed audit logging is enabled. */
    public static final EnumSet<ApiKeys> SUPPORTED_APIS_MGMT_OPERATIONS = EnumSet.of(
            ApiKeys.CREATE_TOPICS,
            ApiKeys.DELETE_TOPICS,
            ApiKeys.CREATE_PARTITIONS,
            ApiKeys.CREATE_ACLS,
            ApiKeys.DELETE_ACLS,
            ApiKeys.CREATE_CLUSTER_LINKS,
            ApiKeys.DELETE_CLUSTER_LINKS,
            ApiKeys.DELETE_GROUPS,
            ApiKeys.ALTER_CONFIGS,
            ApiKeys.INCREMENTAL_ALTER_CONFIGS,
            ApiKeys.ALTER_MIRRORS);

    /** Data-plane APIs that are audited per newly seen topic partition. */
    public static final EnumSet<ApiKeys> SUPPORTED_APIS_PRODUCE_CONSUME = EnumSet.of(ApiKeys.PRODUCE, ApiKeys.FETCH);

    // Unmodifiable so that callers of reconfigurableConfigs() cannot alter the shared static set.
    // Only these two keys are advertised as reconfigurable; reconfigure(Map) nevertheless
    // re-reads all four settings because it delegates to configure(Map).
    private static final Set<String> RECONFIGURABLE_CONFIGS = Collections.unmodifiableSet(
            new HashSet<>(Arrays.asList(ENABLE_DETAILED_AUDIT_LOGGING, DISABLED_APIS_FOR_AUDIT_LOGS)));

    private Metrics kafkaMetrics;
    // Stays null until setKafkaMetrics(Metrics) has been called.
    private Sensor produceConsumeAuditEventRateSensor = null;

    // The flags start out as true; configure(Map) overwrites them with the configured values
    // (whose ConfigDef defaults are false).
    private volatile boolean enableDetailedAuditLogs = true;
    private volatile boolean enableProduceConsumeAuditLogs = true;
    private volatile boolean enableProduceConsumeMetric = true;
    private volatile Set<ApiKeys> disabledAPIs = EnumSet.noneOf(ApiKeys.class);

    /**
     * Decides whether the given request should be audit-logged.
     *
     * <p>Any exception raised while deciding is logged and results in
     * {@link LogAction#NOT_LOGGED}; it is never propagated to the caller.
     *
     * @param ctx              request context; supplies the API key and the produce/consume tracker
     * @param abstractRequest  the request; cast to {@link ProduceRequest} or {@link FetchRequest}
     *                         according to the API key
     * @param requestTimeNanos not used by this filter
     * @return {@link LogAction#LOGGED} if an audit entry should be written, otherwise
     *         {@link LogAction#NOT_LOGGED}
     */
    @Override
    public LogAction processRequest(RequestContext ctx, AbstractRequest abstractRequest, long requestTimeNanos) {
        try {
            // Read the request key and the volatile settings once so the whole decision is
            // taken against a single, consistent view even if a reconfiguration happens meanwhile.
            ApiKeys apiKey = ctx.apiKey();
            if (this.disabledAPIs.contains(apiKey)) {
                return LogAction.NOT_LOGGED;
            }
            if (this.enableDetailedAuditLogs && SUPPORTED_APIS_MGMT_OPERATIONS.contains(apiKey)) {
                return LogAction.LOGGED;
            }

            boolean recordMetric = this.enableProduceConsumeMetric;
            boolean auditProduceConsume = this.enableProduceConsumeAuditLogs;
            if ((recordMetric || auditProduceConsume) && SUPPORTED_APIS_PRODUCE_CONSUME.contains(apiKey)) {
                long newTopicPartitions = countNewTopicPartitions(ctx, apiKey, abstractRequest);

                // The sensor only exists once setKafkaMetrics() has run. Without this guard a
                // missing sensor raised an NPE that the catch below swallowed, so the request
                // was reported as NOT_LOGGED although the tracker had already been updated.
                Sensor rateSensor = this.produceConsumeAuditEventRateSensor;
                if (recordMetric && rateSensor != null) {
                    rateSensor.record(newTopicPartitions);
                }
                if (auditProduceConsume && newTopicPartitions > 0L) {
                    return LogAction.LOGGED;
                }
            }
        } catch (Exception e) {
            LOG.error("Error occurred while deciding log action", e);
        }
        return LogAction.NOT_LOGGED;
    }

    /**
     * Registers every topic partition named by a produce or fetch request with the context's
     * tracker and counts those the tracker had not recorded before.
     *
     * <p>Produce partitions are keyed with {@link Uuid#ZERO_UUID} as topic id; fetch partitions
     * use the topic id carried by the request.
     *
     * @return the number of topic partitions newly added to the tracker; {@code 0} for any
     *         API key other than PRODUCE or FETCH
     */
    private static long countNewTopicPartitions(RequestContext ctx, ApiKeys apiKey, AbstractRequest abstractRequest) {
        // Single-element array: a counter that the lambdas below are allowed to mutate.
        long[] newlySeen = new long[]{0L};
        switch (apiKey) {
            case PRODUCE: {
                ProduceRequest produceRequest = (ProduceRequest) abstractRequest;
                produceRequest.data().topicData().forEach(topicProduceData -> {
                    String topicName = topicProduceData.name();
                    topicProduceData.partitionData().forEach(partitionProduceData -> {
                        TopicIdPartition topicIdPartition =
                                new TopicIdPartition(Uuid.ZERO_UUID, partitionProduceData.index(), topicName);
                        if (!ctx.produceConsumeAuditLogTracker.hasProduceTopicIdPartition(topicIdPartition)) {
                            ctx.produceConsumeAuditLogTracker.addProduceTopicIdPartition(topicIdPartition);
                            newlySeen[0]++;
                        }
                    });
                });
                break;
            }
            case FETCH: {
                FetchRequest fetchRequest = (FetchRequest) abstractRequest;
                fetchRequest.data().topics().forEach(fetchTopic -> {
                    String topicName = fetchTopic.topic();
                    Uuid topicId = fetchTopic.topicId();
                    fetchTopic.partitions().forEach(fetchPartition -> {
                        TopicIdPartition topicIdPartition =
                                new TopicIdPartition(topicId, fetchPartition.partition(), topicName);
                        if (!ctx.produceConsumeAuditLogTracker.hasConsumeTopicIdPartition(topicIdPartition)) {
                            ctx.produceConsumeAuditLogTracker.addConsumeTopicIdPartition(topicIdPartition);
                            newlySeen[0]++;
                        }
                    });
                });
                break;
            }
            default:
                break;
        }
        return newlySeen[0];
    }

    /**
     * @return the unmodifiable set of config keys this filter advertises as reconfigurable
     */
    @Override
    public Set<String> reconfigurableConfigs() {
        return RECONFIGURABLE_CONFIGS;
    }

    /** Applies a new configuration by delegating to {@link #configure(Map)}. */
    @Override
    public void reconfigure(Map<String, ?> configs) {
        this.configure(configs);
    }

    /**
     * Parses {@code configs} and replaces all four settings of this filter.
     *
     * @throws ConfigException if the configuration is invalid, e.g. the disabled-APIs value
     *                         contains a name that does not resolve to an API key
     */
    @Override
    public void configure(Map<String, ?> configs) {
        Config config = new Config(configs);
        this.enableDetailedAuditLogs = config.enableDetailedAuditLogging();
        this.enableProduceConsumeAuditLogs = config.enableProduceConsumeAuditLogging();
        this.enableProduceConsumeMetric = config.enableProduceConsumeMetrics();
        this.disabledAPIs = config.disabledAPIs();
    }

    /**
     * Validates {@code configs} by parsing them; nothing is applied.
     *
     * @throws ConfigException if the configuration is invalid
     */
    @Override
    public void validateReconfiguration(Map<String, ?> configs) throws ConfigException {
        new Config(configs);
    }

    /**
     * Stores the metrics registry and registers the produce/consume audit event rate sensor.
     *
     * @param metrics registry in which the sensor and its rate metric are registered
     */
    public void setKafkaMetrics(Metrics metrics) {
        this.kafkaMetrics = metrics;
        Sensor rateSensor = metrics.sensor(PRODUCE_CONSUME_AUDIT_EVENT_RATE_SENSOR);
        // NOTE(review): the description says "per minute"; confirm that this matches the time
        // unit used by the no-arg Rate constructor.
        rateSensor.add(
                metrics.metricName(PRODUCE_CONSUME_AUDIT_EVENT_RATE, GROUP_NAME, "The number of produce consume audit log per minute"),
                new Rate());
        // Assign the field last so processRequest never picks up a sensor without its metric.
        this.produceConsumeAuditEventRateSensor = rateSensor;
    }

    /**
     * Typed view of the filter's configuration. All three boolean settings default to
     * {@code false} and the disabled-APIs list defaults to the empty string.
     */
    public static class Config extends AbstractConfig {
        public static final String ENABLE_DETAILED_AUDIT_LOGGING_CONFIG = "confluent.security.event.logger.enable.detailed.audit.logs";
        public static final String ENABLE_DETAILED_AUDIT_LOGGING_DOC = "config to enable detailed audit logs.This is disabled by default. Note that logging for a specific API can be disabled by `confluent.security.event.logger.detailed.audit.logs.disabled.apis` config.";
        public static final String ENABLE_PRODUCE_CONSUME_AUDIT_LOGGING_CONFIG = "confluent.security.event.logger.enable.produce.consume.audit.logs";
        public static final String ENABLE_PRODUCE_CONSUME_AUDIT_LOGGING_DOC = "config to enable produce/consume audit logs.This is disabled by default. Note that logging for a specific API can be disabled by `confluent.security.event.logger.detailed.audit.logs.disabled.apis` config.";
        public static final String ENABLE_PRODUCE_CONSUME_METRICS_CONFIG = "confluent.security.event.logger.enable.produce.consume.metrics";
        public static final String ENABLE_PRODUCE_CONSUME_METRICS_DOC = "config to enable produce/consume metrics.This is disabled by default. This will also be disable for apis that are present in `confluent.security.event.logger.detailed.audit.logs.disabled.apis` config.";
        public static final String DISABLED_APIS_FOR_AUDIT_LOGS_CONFIG = "confluent.security.event.logger.detailed.audit.logs.disabled.apis";
        public static final String DISABLED_APIS_FOR_AUDIT_LOGS_DOC = "config to disable selected APIs. For example, using `CreateTopics,DeleteTopics` will disable audit events for `CreateTopics` and DeleteTopics` APIs.";

        private static final ConfigDef CONFIG = new ConfigDef()
                .define(ENABLE_DETAILED_AUDIT_LOGGING_CONFIG,
                        ConfigDef.Type.BOOLEAN,
                        false,
                        ConfigDef.Importance.LOW,
                        ENABLE_DETAILED_AUDIT_LOGGING_DOC)
                .define(ENABLE_PRODUCE_CONSUME_AUDIT_LOGGING_CONFIG,
                        ConfigDef.Type.BOOLEAN,
                        false,
                        ConfigDef.Importance.LOW,
                        ENABLE_PRODUCE_CONSUME_AUDIT_LOGGING_DOC)
                .define(ENABLE_PRODUCE_CONSUME_METRICS_CONFIG,
                        ConfigDef.Type.BOOLEAN,
                        false,
                        ConfigDef.Importance.LOW,
                        ENABLE_PRODUCE_CONSUME_METRICS_DOC)
                .define(DISABLED_APIS_FOR_AUDIT_LOGS_CONFIG,
                        ConfigDef.Type.STRING,
                        "",
                        DisabledAPIsConfigValidator.INSTANCE,
                        ConfigDef.Importance.LOW,
                        DISABLED_APIS_FOR_AUDIT_LOGS_DOC);

        /**
         * @param originals raw configuration map to parse against this class's definition
         */
        public Config(Map<?, ?> originals) {
            super(CONFIG, originals, true);
        }

        /** @return whether audit logging of the management APIs is enabled */
        public boolean enableDetailedAuditLogging() {
            return this.getBoolean(ENABLE_DETAILED_AUDIT_LOGGING_CONFIG);
        }

        /** @return whether audit logging of produce/fetch requests is enabled */
        public boolean enableProduceConsumeAuditLogging() {
            return this.getBoolean(ENABLE_PRODUCE_CONSUME_AUDIT_LOGGING_CONFIG);
        }

        /** @return whether the produce/consume audit event rate metric is enabled */
        public boolean enableProduceConsumeMetrics() {
            return this.getBoolean(ENABLE_PRODUCE_CONSUME_METRICS_CONFIG);
        }

        /**
         * @return a freshly parsed set of the APIs named by the disabled-APIs config
         * @throws ConfigException if an entry does not resolve to an API key
         */
        public Set<ApiKeys> disabledAPIs() {
            return Config.parseDisabledAPIsConfig(this.getString(DISABLED_APIS_FOR_AUDIT_LOGS_CONFIG));
        }

        /**
         * Parses a comma-separated list of API names. Whitespace around each entry is ignored,
         * so {@code "CreateTopics, DeleteTopics"} is accepted.
         *
         * @param configValue the raw config value; {@code null} or empty yields an empty set
         * @return a new mutable set holding the resolved API keys
         * @throws ConfigException if an entry does not resolve to an API key
         */
        private static Set<ApiKeys> parseDisabledAPIsConfig(String configValue) {
            Set<ApiKeys> disabledAPIs = EnumSet.noneOf(ApiKeys.class);
            if (configValue == null || configValue.isEmpty()) {
                return disabledAPIs;
            }
            for (String rawApiKeyName : configValue.split(",")) {
                ApiKeys apiKey = ApiKeys.findByName(rawApiKeyName.trim());
                if (apiKey == null) {
                    // Report the entry exactly as it was written in the config value.
                    throw new ConfigException("Invalid value `" + rawApiKeyName + "` found in " + DISABLED_APIS_FOR_AUDIT_LOGS_CONFIG + "=`" + configValue + "`");
                }
                disabledAPIs.add(apiKey);
            }
            return disabledAPIs;
        }

        /** Validator that accepts only strings parseable by {@code parseDisabledAPIsConfig}. */
        private static class DisabledAPIsConfigValidator implements ConfigDef.Validator {
            private static final DisabledAPIsConfigValidator INSTANCE = new DisabledAPIsConfigValidator();

            private DisabledAPIsConfigValidator() {
            }

            /**
             * @throws ConfigException if {@code value} is not a string or names an unknown API
             */
            @Override
            public void ensureValid(String name, Object value) {
                if (!(value instanceof String)) {
                    throw new ConfigException("Invalid value `" + value + "` found for " + name + " (should be a string)");
                }
                Config.parseDisabledAPIsConfig((String) value);
            }
        }
    }
}

