/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.requests;

import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestLogFilter;
import org.apache.kafka.common.utils.AdaptiveSampler;
import org.apache.kafka.common.utils.LogAction;
import org.apache.kafka.common.utils.NoOpSampler;
import org.apache.kafka.common.utils.SlowLogAction;
import org.apache.kafka.common.utils.TimeBasedSampler;

public class SamplingRequestLogFilter
implements RequestLogFilter {
    /** Config key: whether every admin API request is logged regardless of sampling. */
    public static final String ENABLE_ADMIN_API_LOGGING = "confluent.request.log.enable.admin.apis";
    /** Config key: default samples-per-minute used when no per-API override exists. */
    public static final String DEFAULT_SAMPLES_PER_MIN = "confluent.request.log.samples.per.min";
    /** Config key: per-API samples-per-minute overrides. */
    public static final String OVERRIDE_API_SAMPLES_PER_MIN = "confluent.request.log.api.samples.per.min";
    /** Config key: whether slow-request logging is enabled. */
    public static final String ENABLE_SLOW_LOGGING = "confluent.request.log.enable.slowlog";
    /** Config key: explicit slow-log threshold override. */
    public static final String SLOW_LOG_THRESHOLD_OVERRIDE = "confluent.request.slowlog.threshold.override";
    /** Config key: lower bound applied to the p99-derived slow-log threshold. */
    public static final String MIN_P99_SLOW_LOG_THRESHOLD = "confluent.request.slowlog.threshold.p99.min";

    /** APIs that are always logged while {@link #enableAllAdminApiLogging} is set (unless overridden per API). */
    private static final EnumSet<ApiKeys> ADMIN_APIS = EnumSet.of(
            ApiKeys.CREATE_TOPICS,
            ApiKeys.CREATE_PARTITIONS,
            ApiKeys.DELETE_TOPICS,
            ApiKeys.CREATE_ACLS,
            ApiKeys.DELETE_ACLS,
            ApiKeys.ALTER_CONFIGS,
            ApiKeys.INCREMENTAL_ALTER_CONFIGS,
            ApiKeys.ALTER_CLIENT_QUOTAS,
            ApiKeys.CREATE_CLUSTER_LINKS,
            ApiKeys.DELETE_CLUSTER_LINKS,
            ApiKeys.ALTER_MIRRORS,
            ApiKeys.INITIATE_SHUTDOWN,
            ApiKeys.REMOVE_BROKERS,
            ApiKeys.ALTER_PARTITION_REASSIGNMENTS,
            ApiKeys.ALTER_REPLICA_LOG_DIRS,
            ApiKeys.UPDATE_FEATURES,
            ApiKeys.DELETE_GROUPS,
            ApiKeys.ALTER_BROKER_HEALTH,
            ApiKeys.ALTER_BROKER_REPLICA_EXCLUSIONS,
            ApiKeys.TRIGGER_EVEN_CLUSTER_LOAD);
    /** APIs eligible for slow-log handling while {@link #enableSlowLogging} is set. */
    private static final EnumSet<ApiKeys> SLOW_LOG_APIS = EnumSet.of(ApiKeys.PRODUCE);
    /** Names of the configs reported as reconfigurable; populated in the static initializer. */
    private static final Set<String> RECONFIGURABLE_CONFIGS = new HashSet<String>();

    // The fields below are replaced wholesale by configure(); they are volatile so that
    // processRequest() observes the new values without additional locking.
    private volatile boolean enableAllAdminApiLogging = true;
    /** Sampler for APIs without an override; {@code null} means default sampling is off. */
    private volatile TimeBasedSampler defaultSampler = null;
    /** Per-API samplers; an entry here takes precedence over every other rule. */
    private volatile EnumMap<ApiKeys, TimeBasedSampler> apiSamplers = new EnumMap<>(ApiKeys.class);
    private volatile boolean enableSlowLogging = true;
    /** Handed to {@link SlowLogAction} together with the slow-log sampler. */
    private volatile Double slowLogThresholdOverride = 0.0;
    /** Handed to {@link SlowLogAction} together with the slow-log sampler. */
    private volatile Double minP99SlowLogThreshold = 0.0;
    /** Sampler handed to {@link SlowLogAction}; stays {@code null} until configure() runs. */
    private volatile TimeBasedSampler slowLogSampler = null;

    /**
     * Decides whether (and how) the given request should be logged.
     *
     * <p>Rules are evaluated in this order; the first one that applies wins:
     * <ol>
     *   <li>a per-API sampler, if one is configured for the request's API key;</li>
     *   <li>the default sampler, but only when it elects to log this request;</li>
     *   <li>unconditional logging for admin APIs, when enabled;</li>
     *   <li>a {@link SlowLogAction} for slow-log APIs, when enabled;</li>
     *   <li>otherwise {@link LogAction#NOT_LOGGED}.</li>
     * </ol>
     *
     * @param ctx              context of the request being evaluated; its header supplies the API key
     * @param requestTimeNanos timestamp forwarded to the samplers
     * @return the logging decision for this request
     */
    @Override
    public LogAction processRequest(RequestContext ctx, long requestTimeNanos) {
        final ApiKeys apiKey = ctx.header.apiKey();

        final TimeBasedSampler apiSampler = apiSamplers.get(apiKey);
        if (apiSampler != null) {
            return apiSampler.maybeSample(requestTimeNanos);
        }

        // Read the volatile field exactly once: configure() may swap it to null on another
        // thread between a null check and the subsequent call.
        final TimeBasedSampler sampler = defaultSampler;
        if (sampler != null) {
            final LogAction action = sampler.maybeSample(requestTimeNanos);
            if (action.shouldLog()) {
                return action;
            }
        }

        if (enableAllAdminApiLogging && ADMIN_APIS.contains(apiKey)) {
            return LogAction.LOGGED;
        }
        if (enableSlowLogging && SLOW_LOG_APIS.contains(apiKey)) {
            return new SlowLogAction(slowLogSampler, slowLogThresholdOverride, minP99SlowLogThreshold);
        }
        return LogAction.NOT_LOGGED;
    }

    /**
     * Returns the names of the configs this filter reports as reconfigurable.
     *
     * @return a read-only view of the reconfigurable config names
     */
    @Override
    public Set<String> reconfigurableConfigs() {
        // Hand out a read-only view so callers cannot alter the shared static set.
        return java.util.Collections.unmodifiableSet(RECONFIGURABLE_CONFIGS);
    }

    /**
     * Applies updated settings; reconfiguration is handled exactly like initial configuration.
     *
     * @param configs the new config values
     */
    @Override
    public void reconfigure(Map<String, ?> configs) {
        configure(configs);
    }

    /**
     * Parses {@code configs} and replaces every sampler and flag held by this filter.
     *
     * @param configs raw config values; parsed through {@link Config}
     */
    @Override
    public void configure(Map<String, ?> configs) {
        final Config parsed = new Config(configs);

        // Sampling rules.
        defaultSampler = buildDefaultSampler(parsed);
        apiSamplers = buildApiSamplers(parsed);
        enableAllAdminApiLogging = parsed.enableAdminApiLogging();

        // Slow-log rules.
        enableSlowLogging = parsed.enableSlowLogging();
        slowLogThresholdOverride = parsed.slowLogThresholdOverride();
        minP99SlowLogThreshold = parsed.minP99SlowLogThreshold();
        slowLogSampler = buildSlowLogSampler(parsed);
    }

    /**
     * Checks a proposed reconfiguration without applying it.
     *
     * @param configs the candidate config values
     * @throws ConfigException if constructing a {@link Config} from {@code configs} fails
     */
    @Override
    public void validateReconfiguration(Map<String, ?> configs) throws ConfigException {
        // The instance is discarded on purpose: only the constructor's checks matter here.
        new Config(configs);
    }

    /**
     * Creates one sampler per API listed in the per-API override config.
     *
     * @param config parsed filter configuration
     * @return a new map from API key to its dedicated (non-default) sampler
     */
    private EnumMap<ApiKeys, TimeBasedSampler> buildApiSamplers(Config config) {
        EnumMap<ApiKeys, TimeBasedSampler> samplersByApi = new EnumMap<ApiKeys, TimeBasedSampler>(ApiKeys.class);
        for (Map.Entry<ApiKeys, Long> override : config.overrideApiSamplesPerMin().entrySet()) {
            samplersByApi.put(override.getKey(), perMinuteSampler(false, override.getValue()));
        }
        return samplersByApi;
    }

    /**
     * Creates the default sampler from the configured default rate.
     *
     * @param config parsed filter configuration
     * @return the default sampler, or {@code null} when the default rate is zero
     */
    private TimeBasedSampler buildDefaultSampler(Config config) {
        final long rate = config.defaultSamplesPerMin();
        if (rate != 0L) {
            return perMinuteSampler(true, rate);
        }
        // A zero rate yields null rather than a no-op sampler; processRequest() checks for
        // null before consulting the default sampler.
        return null;
    }

    /**
     * Creates the sampler used for slow-log actions.
     *
     * @param config parsed filter configuration
     * @return a non-default sampler running at the configured default samples-per-minute rate
     */
    private static TimeBasedSampler buildSlowLogSampler(Config config) {
        // The slow log reuses the default samples-per-minute setting as its rate.
        return perMinuteSampler(false, config.defaultSamplesPerMin());
    }

    /**
     * Chooses a sampler implementation for the given per-minute rate.
     *
     * @param isDefaultSampler forwarded to {@link AdaptiveSampler}'s constructor
     * @param samplesPerMin    requested samples per minute
     * @return a {@link NoOpSampler} for a zero rate, otherwise an {@link AdaptiveSampler}
     */
    private static TimeBasedSampler perMinuteSampler(boolean isDefaultSampler, long samplesPerMin) {
        if (samplesPerMin != 0L) {
            return new AdaptiveSampler(isDefaultSampler, samplesPerMin);
        }
        return new NoOpSampler();
    }

    static {
        RECONFIGURABLE_CONFIGS.add(ENABLE_ADMIN_API_LOGGING);
        RECONFIGURABLE_CONFIGS.add(DEFAULT_SAMPLES_PER_MIN);
        RECONFIGURABLE_CONFIGS.add(OVERRIDE_API_SAMPLES_PER_MIN);
        RECONFIGURABLE_CONFIGS.add(ENABLE_SLOW_LOGGING);
        RECONFIGURABLE_CONFIGS.add(SLOW_LOG_THRESHOLD_OVERRIDE);
        RECONFIGURABLE_CONFIGS.add(MIN_P99_SLOW_LOG_THRESHOLD);
    }

    public static class Config
    extends AbstractConfig {
        public static final String ENABLE_ADMIN_API_LOGGING_CONFIG = "confluent.request.log.enable.admin.apis";
        public static final String ENABLE_ADMIN_API_LOGGING_DOC = "Whether to log all admin api requests (e.g. topic creation/deletion or ACL/config changes) without regard to sampling rate. These APIs typically have a low request rate which makes them unlikely to get sampled, but a high impact on the system behavior. This is enabled by default. Note that logging for a specific API can be overridden by `confluent.request.log.api.samples.per.min`. In particular, this can be used to disable logging for a specific admin API if it has gotten too noisy.";
        public static final String DEFAULT_SAMPLES_PER_MIN_CONFIG = "confluent.request.log.samples.per.min";
        public static final String DEFAULT_SAMPLES_PER_MIN_DOC = "The default sampling rate for all APIs which are not overridden by `confluent.request.log.api.samples.per.min` or `confluent.request.log.enable.admin.apis`. This rate is applied independently on each request handler thread, so you must take into Kafka `num.io.threads` for the overall sampling rate.";
        public static final String OVERRIDE_API_SAMPLES_PER_MIN_CONFIG = "confluent.request.log.api.samples.per.min";
        public static final String OVERRIDE_API_SAMPLES_PER_MIN_DOC = "The override sampling rate for select APIs. For example, using `Produce=15,Fetch=30` would result in the `Produce` API being sampled at a rate of 15 requests/sec while `Fetch` would be sampled at 30 requests/sec, both independent of the default sampling rate. This can be used to disable logging for specific APIs, including those enabled by `confluent.request.log.enable.admin.apis`.";
        public static final String SLOW_LOG_ENABLE_DOC = "Enable slow request logs. All produce and fetch requests, where the total time strictly exceeds the P99 total time for the request will be logged.";
        public static final String SLOW_LOG_OVERRIDE_THRESHOLD_DOC = "The override sampling threshold for slow log. The default value is set to be p99, can be overridden by the value user passed in.";
        public static final String MIN_P99_SLOW_LOG_THRESHOLD_DOC = "The minimum p99 for slow log threshold. When this value is defined, we take the maximum value of it and p99 for slow log sampling.";
        private static final ConfigDef CONFIG = new ConfigDef().define("confluent.request.log.enable.admin.apis", ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.LOW, "Whether to log all admin api requests (e.g. topic creation/deletion or ACL/config changes) without regard to sampling rate. These APIs typically have a low request rate which makes them unlikely to get sampled, but a high impact on the system behavior. This is enabled by default. Note that logging for a specific API can be overridden by `confluent.request.log.api.samples.per.min`. In particular, this can be used to disable logging for a specific admin API if it has gotten too noisy.").define("confluent.request.log.samples.per.min", ConfigDef.Type.LONG, (Object)0, ConfigDef.Range.atLeast(0), ConfigDef.Importance.LOW, "The default sampling rate for all APIs which are not overridden by `confluent.request.log.api.samples.per.min` or `confluent.request.log.enable.admin.apis`. This rate is applied independently on each request handler thread, so you must take into Kafka `num.io.threads` for the overall sampling rate.").define("confluent.request.log.api.samples.per.min", ConfigDef.Type.STRING, (Object)"", OverrideApiSamplingValidator.access$000(), ConfigDef.Importance.LOW, "The override sampling rate for select APIs. For example, using `Produce=15,Fetch=30` would result in the `Produce` API being sampled at a rate of 15 requests/sec while `Fetch` would be sampled at 30 requests/sec, both independent of the default sampling rate. This can be used to disable logging for specific APIs, including those enabled by `confluent.request.log.enable.admin.apis`.").define("confluent.request.log.enable.slowlog", ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, "Enable slow request logs. 
All produce and fetch requests, where the total time strictly exceeds the P99 total time for the request will be logged.").define("confluent.request.slowlog.threshold.override", ConfigDef.Type.DOUBLE, -1, ConfigDef.Importance.LOW, "The override sampling threshold for slow log. The default value is set to be p99, can be overridden by the value user passed in.").define("confluent.request.slowlog.threshold.p99.min", ConfigDef.Type.DOUBLE, -1, ConfigDef.Importance.LOW, "The minimum p99 for slow log threshold. When this value is defined, we take the maximum value of it and p99 for slow log sampling.");

        public Config(Map<?, ?> originals) {
            super(CONFIG, originals, true);
        }

        public boolean enableAdminApiLogging() {
            return this.getBoolean("confluent.request.log.enable.admin.apis");
        }

        public boolean enableSlowLogging() {
            return this.getBoolean(SamplingRequestLogFilter.ENABLE_SLOW_LOGGING);
        }

        public long defaultSamplesPerMin() {
            return this.getLong("confluent.request.log.samples.per.min");
        }

        public Double slowLogThresholdOverride() {
            return this.getDouble(SamplingRequestLogFilter.SLOW_LOG_THRESHOLD_OVERRIDE);
        }

        public Double minP99SlowLogThreshold() {
            return this.getDouble(SamplingRequestLogFilter.MIN_P99_SLOW_LOG_THRESHOLD);
        }

        public EnumMap<ApiKeys, Long> overrideApiSamplesPerMin() {
            return Config.parseOverrideApiSamplesPerMin(this.getString("confluent.request.log.api.samples.per.min"));
        }

        private static EnumMap<ApiKeys, Long> parseOverrideApiSamplesPerMin(String configValue) {
            EnumMap<ApiKeys, Long> apiOverrides = new EnumMap<ApiKeys, Long>(ApiKeys.class);
            if (configValue == null || configValue.isEmpty()) {
                return apiOverrides;
            }
            for (String apiSamplePairString : configValue.split(",")) {
                String[] apiSamplePair = apiSamplePairString.split("=");
                if (apiSamplePair.length != 2) {
                    throw new ConfigException("Invalid value `" + apiSamplePairString + "` found in " + "confluent.request.log.api.samples.per.min" + "=`" + configValue + "`");
                }
                ApiKeys apiKey = ApiKeys.findByName(apiSamplePair[0]);
                if (apiKey == null) {
                    throw new ConfigException("Invalid value `" + apiSamplePair[0] + "` found in " + "confluent.request.log.api.samples.per.min" + "=`" + configValue + "`");
                }
                try {
                    long samplesPerMin = Long.parseLong(apiSamplePair[1]);
                    if (samplesPerMin < 0L) {
                        throw new ConfigException("Invalid negative value `" + samplesPerMin + "` found in " + "confluent.request.log.api.samples.per.min" + "=`" + configValue + "`");
                    }
                    apiOverrides.put(apiKey, samplesPerMin);
                }
                catch (NumberFormatException e) {
                    throw new ConfigException("Invalid value `" + apiSamplePair[1] + "` found in " + "confluent.request.log.api.samples.per.min" + "=`" + configValue + "`");
                }
            }
            return apiOverrides;
        }

        private static class OverrideApiSamplingValidator
        implements ConfigDef.Validator {
            private static final OverrideApiSamplingValidator INSTANCE = new OverrideApiSamplingValidator();

            private OverrideApiSamplingValidator() {
            }

            @Override
            public void ensureValid(String name, Object value) {
                if (!(value instanceof String)) {
                    throw new ConfigException("Invalid value `" + value + "` found for " + name + " (should be a string)");
                }
                Config.parseOverrideApiSamplesPerMin((String)value);
            }

            static /* synthetic */ OverrideApiSamplingValidator access$000() {
                return INSTANCE;
            }
        }
    }
}

