/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.server.metrics;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.regex.Pattern;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.server.metrics.ClientTopicMetricsManager;
import org.apache.kafka.server.quota.SensorAccess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PlatformClientTopicMetricsManager
implements ClientTopicMetricsManager {
    private static final String CLIENT_ID_TAG = "client-id";
    private static final String GROUP = "ClientBrokerTopicMetrics";
    private static final String KEY_DELIMITER = ":";
    private static final String TOPIC_TAG = "topic";
    private static final Duration LAST_QUOTA_VIOLATION_INFO_LOG_DURATION = Duration.ofHours(1L);
    public static final String CLIENT_BYTES_IN_METRIC_NAME = "ClientBytesIn";
    public static final String CLIENT_BYTES_OUT_METRIC_NAME = "ClientBytesOut";
    public static final String CLIENT_RECORDS_IN_METRIC_NAME = "ClientRecordsIn";
    public static final String CLIENT_RECORDS_OUT_METRIC_NAME = "ClientRecordsOut";
    private volatile LocalDateTime lastQuotaViolationInfoLogTime = LocalDateTime.MIN;
    private final Map<String, SensorGroup> clientTopicSensors = new ConcurrentHashMap<String, SensorGroup>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private static final Logger LOG = LoggerFactory.getLogger(PlatformClientTopicMetricsManager.class);
    private long maxClientTopicMetrics;
    private Metrics metrics = new Metrics();
    private SensorAccess sensorAccess;
    private long sensorExpiryTimeSec;
    private Optional<Pattern> topicPatternToIgnore;
    private Optional<Pattern> clientIdPatternToIgnore;
    private static final LongSupplier ZERO_SUPPLIER = () -> 0L;

    public PlatformClientTopicMetricsManager() {
        this.maxClientTopicMetrics = ConfluentConfigs.CLIENT_TOPIC_MAX_METRICS_COUNT_DEFAULT;
        this.sensorExpiryTimeSec = ConfluentConfigs.CLIENT_TOPIC_METRICS_EXPIRY_SEC_DEFAULT;
        this.sensorAccess = new SensorAccess((ReadWriteLock)this.lock, this.metrics);
        this.topicPatternToIgnore = this.getPatternToIgnore("_.*", TOPIC_TAG);
        this.clientIdPatternToIgnore = this.getPatternToIgnore("(?:link-.*-)?broker-\\d+-fetcher-\\d+(?:-pool-.*)?", "clientId");
        LOG.info("Initialized PlatformClientTopicMetricsManager with defaults maxClientTopicConnections:{}, sensorExpiryTimeMs:{}", (Object)this.maxClientTopicMetrics, (Object)this.sensorExpiryTimeSec);
    }

    private Optional<Pattern> getPatternToIgnore(String ignorePattern, String logTag) {
        Optional<Pattern> patternToIgnore;
        if (ignorePattern == null || ignorePattern.isEmpty()) {
            LOG.info("No pattern to ignore for {}.", (Object)logTag);
            patternToIgnore = Optional.empty();
        } else {
            LOG.info("Ignoring pattern '{}' for {}", (Object)ignorePattern, (Object)logTag);
            patternToIgnore = Optional.of(Pattern.compile(ignorePattern));
        }
        return patternToIgnore;
    }

    private <T> Optional<T> getTypeCheckedConfig(Map<String, ?> configs, String key, Class<T> clazz) {
        Object value;
        if (configs.containsKey(key) && clazz.isInstance(value = configs.get(key))) {
            return Optional.of(clazz.cast(value));
        }
        return Optional.empty();
    }

    @Override
    public void configure(Map<String, ?> configs) {
        LOG.info("Configuring PlatformClientTopicMetricsManager with configs {}", configs);
        if (configs == null) {
            throw new IllegalArgumentException("Configs map cannot be null");
        }
        this.metrics = this.getTypeCheckedConfig(configs, "metrics", Metrics.class).orElseThrow(() -> new IllegalArgumentException("Invalid or missing Metrics configuration"));
        this.getTypeCheckedConfig(configs, "client-topic-metrics-max-count", Long.class).ifPresent(value -> {
            this.maxClientTopicMetrics = value;
        });
        this.getTypeCheckedConfig(configs, "client-topic-metrics-expiry-sec", Long.class).ifPresent(value -> {
            this.sensorExpiryTimeSec = value;
        });
        this.getTypeCheckedConfig(configs, "client-topic-metrics-ignore-topic-pattern", String.class).ifPresent(value -> {
            this.topicPatternToIgnore = this.getPatternToIgnore((String)value, TOPIC_TAG);
        });
        this.getTypeCheckedConfig(configs, "client-topic-metrics-ignore-client-id-pattern", String.class).ifPresent(value -> {
            this.clientIdPatternToIgnore = this.getPatternToIgnore((String)value, "clientId");
        });
        this.sensorAccess = new SensorAccess((ReadWriteLock)this.lock, this.metrics);
        LOG.info("Configured PlatformClientTopicMetricsManager with maxClientTopicMetrics: {}, sensorExpiryTimeSec: {}", (Object)this.maxClientTopicMetrics, (Object)this.sensorExpiryTimeSec);
    }

    @Override
    public void recordMetricsIn(String clientId, String topic, long bytesIn, long messagesIn) {
        if (clientId != null && topic != null) {
            this.recordMetrics(clientId, topic, bytesIn, 0L, messagesIn, ZERO_SUPPLIER, Role.PRODUCER);
        }
    }

    @Override
    public void recordMetricsOut(String clientId, String topic, long bytesOut, LongSupplier messagesOut) {
        if (clientId != null && topic != null) {
            this.recordMetrics(clientId, topic, 0L, bytesOut, 0L, messagesOut, Role.CONSUMER);
        }
    }

    private boolean checkPatternOpt(Optional<Pattern> patternOpt, String value) {
        return patternOpt.map(pattern -> pattern.matcher(value).matches()).orElse(false);
    }

    private boolean isIgnoredTopicOrClientId(String clientId, String topic) {
        if (this.checkPatternOpt(this.topicPatternToIgnore, topic)) {
            LOG.debug("Ignoring topic {} for client metrics", (Object)topic);
            return true;
        }
        if (this.checkPatternOpt(this.clientIdPatternToIgnore, clientId)) {
            LOG.debug("Ignoring clientId {} for client metrics", (Object)clientId);
            return true;
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void logQuotaViolationIfTime(String clientId, String topic) {
        LocalDateTime now = LocalDateTime.now();
        if (now.isAfter(this.lastQuotaViolationInfoLogTime.plus(LAST_QUOTA_VIOLATION_INFO_LOG_DURATION))) {
            PlatformClientTopicMetricsManager platformClientTopicMetricsManager = this;
            synchronized (platformClientTopicMetricsManager) {
                if (now.isAfter(this.lastQuotaViolationInfoLogTime.plus(LAST_QUOTA_VIOLATION_INFO_LOG_DURATION))) {
                    LOG.info("Quota violation detected for client {} on topic {}. Max client-topic metrics: {}. Future violation logs suppressed for {} seconds", new Object[]{clientId, topic, this.maxClientTopicMetrics, LAST_QUOTA_VIOLATION_INFO_LOG_DURATION.getSeconds()});
                    this.lastQuotaViolationInfoLogTime = now;
                }
            }
        }
    }

    private void recordMetrics(String clientId, String topic, long bytesIn, long bytesOut, long messagesIn, LongSupplier messagesOut, Role role) {
        if (this.isIgnoredTopicOrClientId(clientId, topic)) {
            return;
        }
        String key = PlatformClientTopicMetricsManager.buildKey(clientId, topic, role);
        try {
            SensorGroup sensorGroup = this.clientTopicSensors.computeIfAbsent(key, k -> {
                if ((long)this.clientTopicSensors.size() >= this.maxClientTopicMetrics) {
                    LOG.debug("Quota violation detected while handling metrics for {}:{}", (Object)clientId, (Object)topic);
                    this.logQuotaViolationIfTime(clientId, topic);
                    return null;
                }
                Consumer<Sensor> cleanUpExpiredSensorGroup = __ -> this.cleanupExpiredSensorGroup(key);
                return this.createSensorGroup(clientId, topic, role, cleanUpExpiredSensorGroup);
            });
            if (sensorGroup == null) {
                LOG.debug("Could not find sensor group for {}:{}", (Object)clientId, (Object)topic);
                return;
            }
            PlatformClientTopicMetricsManager.recordIfValid(sensorGroup.bytesSensor, bytesIn, "bytesIn", clientId, topic);
            PlatformClientTopicMetricsManager.recordIfValid(sensorGroup.bytesSensor, bytesOut, "bytesOut", clientId, topic);
            PlatformClientTopicMetricsManager.recordIfValid(sensorGroup.recordsSensor, messagesIn, "messagesIn", clientId, topic);
            PlatformClientTopicMetricsManager.recordIfValid(sensorGroup.recordsSensor, messagesOut.getAsLong(), "messagesOut", clientId, topic);
        }
        catch (Exception e) {
            LOG.debug("Exception raised while trying to record value for {}:{}", new Object[]{clientId, topic, e});
        }
    }

    private static boolean isValidSensor(Sensor sensor) {
        return sensor != null && !sensor.hasExpired();
    }

    private SensorGroup createSensorGroup(String clientId, String topic, Role role, Consumer<Sensor> onExpiry) {
        Map<String, String> tags = this.getTags(clientId, topic);
        String byteMetricName = role == Role.PRODUCER ? CLIENT_BYTES_IN_METRIC_NAME : CLIENT_BYTES_OUT_METRIC_NAME;
        String recordMetricName = role == Role.PRODUCER ? CLIENT_RECORDS_IN_METRIC_NAME : CLIENT_RECORDS_OUT_METRIC_NAME;
        Sensor clientBytesSensor = this.getOrCreateSensor(byteMetricName, tags, this.sensorExpiryTimeSec, "Count client bytes", onExpiry);
        Sensor clientRecordsSensor = this.getOrCreateSensor(recordMetricName, tags, this.sensorExpiryTimeSec, "Count client records", onExpiry);
        return new SensorGroup(clientBytesSensor, clientRecordsSensor);
    }

    private Sensor getOrCreateSensor(String name, Map<String, String> tags, Long expiryTime, String description, Consumer<Sensor> onExpiry) {
        String sensorName = this.getSensorName(name, tags);
        MetricName metricName = this.metrics.metricName(name, GROUP, description, tags);
        return this.sensorAccess.getOrCreate(sensorName, expiryTime.longValue(), sensor -> sensor.add(metricName, (MeasurableStat)new CumulativeSum()), onExpiry);
    }

    private Map<String, String> getTags(String clientId, String topic) {
        HashMap<String, String> tags = new HashMap<String, String>();
        tags.put(CLIENT_ID_TAG, clientId);
        tags.put(TOPIC_TAG, topic);
        return tags;
    }

    public String getSensorName(String metricName, Map<String, String> metricTags) {
        StringBuilder builder = new StringBuilder(metricName).append("Sensor");
        if (metricTags != null && !metricTags.isEmpty()) {
            metricTags.entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> builder.append(KEY_DELIMITER).append((String)entry.getKey()).append("-").append((String)entry.getValue()));
        }
        return builder.toString();
    }

    @Override
    public void close() {
        LOG.info("Shutting down BrokerClientTopicMetrics sensor...");
        this.lock.writeLock().lock();
        ArrayList<SensorGroup> sensorGroups = new ArrayList<SensorGroup>(this.clientTopicSensors.values());
        try {
            sensorGroups.forEach(s -> s.removeAllSensorsFromMetrics(this.metrics));
            this.clientTopicSensors.clear();
        }
        finally {
            this.lock.writeLock().unlock();
        }
        LOG.info("All sensors and metrics have been removed successfully.");
    }

    public Map<String, SensorGroup> getSensorMap() {
        return this.clientTopicSensors;
    }

    public static String buildKey(String clientId, String topic, Role role) {
        return String.join((CharSequence)KEY_DELIMITER, clientId, topic, role.name());
    }

    private static void recordIfValid(Sensor sensor, double value, String metricType, String clientId, String topic) {
        if (!PlatformClientTopicMetricsManager.isValidSensor(sensor)) {
            LOG.debug("Could not record {} since sensor is not valid. {}:{}", new Object[]{metricType, clientId, topic});
        } else if (value > 0.0) {
            sensor.record(value);
        }
    }

    public void cleanupExpiredSensorGroup(String key) {
        this.lock.writeLock().lock();
        try {
            SensorGroup sensorGroup = this.clientTopicSensors.get(key);
            if (sensorGroup != null && sensorGroup.hasAnyExpired()) {
                this.clientTopicSensors.remove(key);
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    public static enum Role {
        CONSUMER,
        PRODUCER;

    }

    public static class SensorGroup {
        Sensor bytesSensor;
        Sensor recordsSensor;

        public SensorGroup(Sensor bytesSensor, Sensor recordsSensor) {
            this.bytesSensor = bytesSensor;
            this.recordsSensor = recordsSensor;
        }

        private boolean nonNullExpired(Sensor sensor) {
            return sensor != null && sensor.hasExpired();
        }

        public boolean hasAnyExpired() {
            return this.nonNullExpired(this.bytesSensor) || this.nonNullExpired(this.recordsSensor);
        }

        private void removeSensorFromMetrics(Metrics metrics, Sensor sensor) {
            if (sensor != null) {
                metrics.removeSensor(sensor.name());
                LOG.info("Removed sensor {}", (Object)sensor.name());
            }
        }

        public void removeAllSensorsFromMetrics(Metrics metrics) {
            this.removeSensorFromMetrics(metrics, this.bytesSensor);
            this.removeSensorFromMetrics(metrics, this.recordsSensor);
        }
    }
}

