package org.apache.kafka.server.metrics;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.server.quota.SensorAccess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/server/metrics/PlatformClientTopicMetricsManager.class */
/**
 * {@link ClientTopicMetricsManager} that keeps one pair of sensors (bytes and records) per
 * client-id / topic / {@link Role} combination, registered under the
 * {@code ClientBrokerTopicMetrics} metric group.
 *
 * <p>Sensor pairs are cached in a concurrent map keyed by {@link #buildKey(String, String, Role)}.
 * The number of cached pairs is capped by the configured maximum metric count, and a background
 * task started by the constructor periodically evicts entries whose sensors report
 * {@link Sensor#hasExpired()}.
 *
 * <p>Recording is best-effort: failures are logged at debug level and never propagated to the
 * caller.
 */
public class PlatformClientTopicMetricsManager implements ClientTopicMetricsManager {
    private static final String CLIENT_ID_TAG = "client-id";
    private static final String GROUP = "ClientBrokerTopicMetrics";
    private static final String KEY_DELIMITER = ":";
    private static final String TOPIC_TAG = "topic";
    public static final String CLIENT_BYTES_IN_METRIC_NAME = "ClientBytesIn";
    public static final String CLIENT_BYTES_OUT_METRIC_NAME = "ClientBytesOut";
    public static final String CLIENT_RECORDS_IN_METRIC_NAME = "ClientRecordsIn";
    public static final String CLIENT_RECORDS_OUT_METRIC_NAME = "ClientRecordsOut";
    private static final Logger LOG = LoggerFactory.getLogger(PlatformClientTopicMetricsManager.class);

    /** Initial delay and period, in seconds, of the expired-sensor cleanup task. */
    private static final long CLEANUP_INTERVAL_SEC = 30L;
    /** How long {@link #close()} waits for the cleanup scheduler before forcing it down. */
    private static final long SCHEDULER_SHUTDOWN_TIMEOUT_SEC = 5L;

    private SensorAccess sensorAccess;
    private final Map<String, SensorGroup> clientTopicSensors = new ConcurrentHashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private Metrics metrics = new Metrics();
    private long maxClientTopicMetrics = ConfluentConfigs.CLIENT_TOPIC_MAX_METRICS_COUNT_DEFAULT.longValue();
    private long sensorExpiryTimeSec = ConfluentConfigs.CLIENT_TOPIC_METRICS_EXPIRY_SEC_DEFAULT.longValue();
    private final ScheduledThreadPoolExecutor expiredSensorCleanupTaskScheduler = new ScheduledThreadPoolExecutor(1);

    /**
     * Periodic task that drops cached {@link SensorGroup}s once either of their sensors has
     * expired, and logs the names of the sensors it dropped.
     */
    class ExpiredSensorCleanupTask implements Runnable {
        ExpiredSensorCleanupTask() {
        }

        @Override
        public void run() {
            // Everything is caught here so that a failure is only logged and never escapes
            // into the scheduler that runs this task.
            try {
                final String prefix = "Expired sensors found: ";
                StringBuilder expiredNames = new StringBuilder(prefix);
                clientTopicSensors.entrySet().removeIf(entry -> {
                    SensorGroup group = entry.getValue();
                    boolean expired = group.bytesSensor.hasExpired() || group.recordsSensor.hasExpired();
                    if (expired) {
                        expiredNames.append(
                            String.format("[%s, %s], ", group.bytesSensor.name(), group.recordsSensor.name()));
                    }
                    return expired;
                });
                // Only log when at least one entry was actually evicted.
                if (expiredNames.length() > prefix.length()) {
                    LOG.info(expiredNames.toString());
                }
            } catch (Exception e) {
                LOG.debug("ExpiredSensorCleanupTask encountered an error: ", e);
            }
        }
    }

    /** Direction of traffic a sensor group accounts for. */
    public enum Role {
        CONSUMER,
        PRODUCER
    }

    /** Pair of sensors (bytes and records) tracked for one client-id / topic / role. */
    public static class SensorGroup {
        Sensor bytesSensor;
        Sensor recordsSensor;

        public SensorGroup(Sensor sensor, Sensor sensor2) {
            this.bytesSensor = sensor;
            this.recordsSensor = sensor2;
        }
    }

    /**
     * Creates a manager backed by a private default {@link Metrics} registry and default limits,
     * and starts the daemon thread that runs {@link ExpiredSensorCleanupTask} every
     * {@value #CLEANUP_INTERVAL_SEC} seconds. Call {@link #configure(Map)} to attach the real
     * registry and limits.
     */
    public PlatformClientTopicMetricsManager() {
        this.expiredSensorCleanupTaskScheduler.setThreadFactory(
            runnable -> KafkaThread.daemon("ClientTopicMetricsManager-ExpiredSensorCleanupTaskThread", runnable));
        this.expiredSensorCleanupTaskScheduler.scheduleAtFixedRate(
            new ExpiredSensorCleanupTask(), CLEANUP_INTERVAL_SEC, CLEANUP_INTERVAL_SEC, TimeUnit.SECONDS);
        // Labels match the fields being logged: a metric-count cap and an expiry in seconds.
        LOG.info("Initialized BrokerClientTopicMetricsManager with maxClientTopicMetrics:{}, sensorExpiryTimeSec:{}",
            this.maxClientTopicMetrics, this.sensorExpiryTimeSec);
        this.sensorAccess = new SensorAccess(this.lock, this.metrics);
    }

    /**
     * Applies runtime configuration.
     *
     * @param map configuration map; must contain a {@link Metrics} instance under {@code "metrics"}.
     *            The maximum metric count and sensor expiry are optional and are only applied when
     *            present as {@link Long} values; otherwise the current values are kept.
     * @throws IllegalArgumentException if {@code map} is null or has no {@link Metrics} under
     *                                  {@code "metrics"}
     */
    @Override
    public void configure(Map<String, ?> map) {
        if (map == null) {
            throw new IllegalArgumentException("Configs map cannot be null");
        }
        Object configuredMetrics = map.get("metrics");
        // instanceof is false for null, so this also covers a missing key.
        if (!(configuredMetrics instanceof Metrics)) {
            throw new IllegalArgumentException("Invalid or missing Metrics configuration");
        }
        this.metrics = (Metrics) configuredMetrics;

        Object maxCount = map.get(ClientTopicMetricsManager.CLIENT_TOPIC_METRICS_MAX_COUNT);
        if (maxCount instanceof Long) {
            this.maxClientTopicMetrics = (Long) maxCount;
        }
        Object expirySec = map.get(ClientTopicMetricsManager.CLIENT_TOPIC_METRICS_EXPIRY_SEC);
        if (expirySec instanceof Long) {
            this.sensorExpiryTimeSec = (Long) expirySec;
        }
        // Rebind sensor access to the newly supplied registry.
        this.sensorAccess = new SensorAccess(this.lock, this.metrics);
    }

    /**
     * Records produced bytes and records for a client/topic pair. Does nothing when either
     * identifier is null.
     *
     * @param str  client id
     * @param str2 topic name
     * @param j    bytes received
     * @param j2   records received
     */
    @Override
    public void recordMetricsIn(String str, String str2, long j, long j2) {
        if (str == null || str2 == null) {
            return;
        }
        recordMetrics(str, str2, j, 0L, j2, 0L, Role.PRODUCER);
    }

    /**
     * Records fetched bytes and records for a client/topic pair. Does nothing when either
     * identifier is null.
     *
     * <p>A null supplier, a supplier that returns null, or a supplier that throws is treated as
     * zero records so that metrics recording cannot fail the caller; the byte count is still
     * recorded in that case.
     *
     * @param str      client id
     * @param str2     topic name
     * @param j        bytes sent
     * @param supplier provider of the number of records sent; may be null
     */
    @Override
    public void recordMetricsOut(String str, String str2, long j, Supplier<Long> supplier) {
        if (str == null || str2 == null) {
            return;
        }
        recordMetrics(str, str2, 0L, j, 0L, resolveRecordCount(str, str2, supplier), Role.CONSUMER);
    }

    /** Evaluates the record-count supplier, falling back to 0 on null or failure. */
    private static long resolveRecordCount(String clientId, String topic, Supplier<Long> supplier) {
        if (supplier == null) {
            return 0L;
        }
        try {
            Long count = supplier.get();
            return count == null ? 0L : count.longValue();
        } catch (Exception e) {
            LOG.debug("Exception raised while trying to resolve record count for {}:{}", clientId, topic, e);
            return 0L;
        }
    }

    /**
     * Looks up (or lazily creates, subject to the metric-count cap) the sensor group for the
     * given client/topic/role and records every strictly positive value on it.
     */
    private void recordMetrics(String clientId, String topic, long bytesIn, long bytesOut,
                               long recordsIn, long recordsOut, Role role) {
        try {
            SensorGroup group = this.clientTopicSensors.computeIfAbsent(buildKey(clientId, topic, role), key -> {
                if (this.clientTopicSensors.size() < this.maxClientTopicMetrics) {
                    return getSensorGroup(clientId, topic, role);
                }
                LOG.debug("Quota violation detected while handling metrics for {}:{}", clientId, topic);
                // Returning null leaves the map unchanged, so nothing is cached for this key.
                return null;
            });
            if (group == null) {
                LOG.debug("Could not find sensor group for {}:{}", clientId, topic);
                return;
            }
            // A group is role-specific, so only the "in" or the "out" values are non-zero;
            // recordIfValid ignores the zero ones.
            recordIfValid(group.bytesSensor, bytesIn, "bytesIn", clientId, topic);
            recordIfValid(group.bytesSensor, bytesOut, "bytesOut", clientId, topic);
            recordIfValid(group.recordsSensor, recordsIn, "messagesIn", clientId, topic);
            recordIfValid(group.recordsSensor, recordsOut, "messagesOut", clientId, topic);
        } catch (Exception e) {
            LOG.debug("Exception raised while trying to record value for {}:{}", clientId, topic, e);
        }
    }

    /** A sensor is usable when it exists and has not expired. */
    private static boolean isValidSensor(Sensor sensor) {
        return sensor != null && !sensor.hasExpired();
    }

    /** Builds the bytes/records sensor pair for a client/topic, named according to the role. */
    private SensorGroup getSensorGroup(String clientId, String topic, Role role) {
        Map<String, String> tags = getTags(clientId, topic);
        boolean producer = role == Role.PRODUCER;
        Sensor bytes = getSensor(
            producer ? CLIENT_BYTES_IN_METRIC_NAME : CLIENT_BYTES_OUT_METRIC_NAME,
            tags, this.sensorExpiryTimeSec, "Count client bytes");
        Sensor records = getSensor(
            producer ? CLIENT_RECORDS_IN_METRIC_NAME : CLIENT_RECORDS_OUT_METRIC_NAME,
            tags, this.sensorExpiryTimeSec, "Count client records");
        return new SensorGroup(bytes, records);
    }

    /** Gets or creates a sensor carrying a single {@link CumulativeSum} metric. */
    private Sensor getSensor(String metric, Map<String, String> tags, long expirySec, String description) {
        String sensorName = getSensorName(metric, tags);
        MetricName metricName = this.metrics.metricName(metric, GROUP, description, tags);
        return this.sensorAccess.getOrCreate(sensorName, expirySec, sensor -> {
            sensor.add(metricName, new CumulativeSum());
        });
    }

    /** Builds the metric tags identifying a client/topic pair. */
    private Map<String, String> getTags(String clientId, String topic) {
        Map<String, String> tags = new HashMap<>();
        tags.put(CLIENT_ID_TAG, clientId);
        tags.put(TOPIC_TAG, topic);
        return tags;
    }

    /**
     * Derives a sensor name from a metric name and its tags.
     *
     * @param str metric name
     * @param map tags; may be null or empty, in which case no tag suffix is added
     * @return {@code <metric>Sensor} followed by {@code :<key>-<value>} for each tag, with tags
     *         ordered by key
     */
    public String getSensorName(String str, Map<String, String> map) {
        StringBuilder name = new StringBuilder(str).append("Sensor");
        if (map != null && !map.isEmpty()) {
            // Sort by key so the same tag set always yields the same name.
            map.entrySet().stream()
                .sorted(Map.Entry.comparingByKey())
                .forEach(entry -> name.append(KEY_DELIMITER)
                    .append(entry.getKey())
                    .append("-")
                    .append(entry.getValue()));
        }
        return name.toString();
    }

    /**
     * Stops the cleanup scheduler (waiting up to {@value #SCHEDULER_SHUTDOWN_TIMEOUT_SEC} seconds
     * before forcing it down), removes every cached sensor from the metrics registry and clears
     * the cache. If the wait is interrupted, the scheduler is forced down and the interrupt flag
     * is restored before the sensors are removed.
     */
    @Override
    public void close() {
        this.expiredSensorCleanupTaskScheduler.shutdown();
        try {
            if (!this.expiredSensorCleanupTaskScheduler.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT_SEC, TimeUnit.SECONDS)) {
                this.expiredSensorCleanupTaskScheduler.shutdownNow();
            }
            LOG.info("Shutting down scheduler");
        } catch (InterruptedException e) {
            this.expiredSensorCleanupTaskScheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
        LOG.info("Shutting down BrokerClientTopicMetrics sensor...");
        for (SensorGroup group : this.clientTopicSensors.values()) {
            removeSensor(group.bytesSensor);
            removeSensor(group.recordsSensor);
        }
        this.clientTopicSensors.clear();
        LOG.info("All sensors and metrics have been removed successfully.");
    }

    /** Removes a sensor from the metrics registry, ignoring null. */
    private void removeSensor(Sensor sensor) {
        if (sensor != null) {
            this.metrics.removeSensor(sensor.name());
            LOG.info("Removed sensor {}", sensor.name());
        }
    }

    /**
     * @return the live (not copied) map of cached sensor groups, keyed as produced by
     *         {@link #buildKey(String, String, Role)}
     */
    public Map<String, SensorGroup> getSensorMap() {
        return this.clientTopicSensors;
    }

    /** @return whether the cleanup scheduler has been shut down */
    public boolean isSchedulerShutdown() {
        return this.expiredSensorCleanupTaskScheduler.isShutdown();
    }

    /**
     * Builds the cache key for a client/topic/role combination.
     *
     * @return {@code <clientId>:<topic>:<ROLE>}
     */
    public static String buildKey(String str, String str2, Role role) {
        return String.join(KEY_DELIMITER, str, str2, role.name());
    }

    /**
     * Records {@code value} on {@code sensor} when the sensor is usable and the value is strictly
     * positive; logs at debug level when the sensor is missing or expired.
     */
    private static void recordIfValid(Sensor sensor, double value, String what, String clientId, String topic) {
        if (!isValidSensor(sensor)) {
            LOG.debug("Could not record {} since sensor is not valid. {}:{}", what, clientId, topic);
        } else if (value > 0.0d) {
            sensor.record(value);
        }
    }
}
