package io.confluent.ksql.execution.streams.metrics;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.math.BigInteger;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/execution/streams/metrics/RocksDBMetricsCollector.class */
public class RocksDBMetricsCollector implements MetricsReporter {
    static final String KSQL_ROCKSDB_METRICS_GROUP = "ksql-rocksdb-aggregates";
    static final String NUMBER_OF_RUNNING_COMPACTIONS = "num-running-compactions";
    static final String BLOCK_CACHE_USAGE = "block-cache-usage";
    static final String BLOCK_CACHE_PINNED_USAGE = "block-cache-pinned-usage";
    static final String ESTIMATE_NUM_KEYS = "estimate-num-keys";
    static final String ESTIMATE_TABLE_READERS_MEM = "estimate-table-readers-mem";
    static final String NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE = "num-entries-active-mem-table";
    static final String NUMBER_OF_DELETES_ACTIVE_MEMTABLE = "num-deletes-active-mem-table";
    static final String NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES = "num-entries-imm-mem-tables";
    static final String NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES = "num-deletes-imm-mem-tables";
    static final String NUMBER_OF_IMMUTABLE_MEMTABLES = "num-immutable-mem-table";
    static final String CURRENT_SIZE_OF_ACTIVE_MEMTABLE = "cur-size-active-mem-table";
    static final String CURRENT_SIZE_OF_ALL_MEMTABLES = "cur-size-all-mem-tables";
    static final String SIZE_OF_ALL_MEMTABLES = "size-all-mem-tables";
    static final String MEMTABLE_FLUSH_PENDING = "mem-table-flush-pending";
    static final String NUMBER_OF_RUNNING_FLUSHES = "num-running-flushes";
    static final String COMPACTION_PENDING = "compaction-pending";
    static final String ESTIMATED_BYTES_OF_PENDING_COMPACTION = "estimate-pending-compaction-bytes";
    static final String TOTAL_SST_FILES_SIZE = "total-sst-files-size";
    static final String LIVE_SST_FILES_SIZE = "live-sst-files-size";
    private Metrics metrics;
    private static final Logger LOGGER = LoggerFactory.getLogger(RocksDBMetricsCollector.class);
    static final String UPDATE_INTERVAL_CONFIG = "ksql.rocksdb.metrics.update.interval.seconds";
    private static final int UPDATE_INTERVAL_DEFAULT = 15;
    private static final ConfigDef CONFIG_DEF = new ConfigDef().define(UPDATE_INTERVAL_CONFIG, ConfigDef.Type.INT, Integer.valueOf(UPDATE_INTERVAL_DEFAULT), ConfigDef.Importance.LOW, "minimum interval between computations of a metric value");
    private static final Object lock = new Object();
    private static Map<String, Collection<AggregatedMetric<?>>> registeredMetrics = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/ksql/execution/streams/metrics/RocksDBMetricsCollector$AggregatedMetric.class */
    public static final class AggregatedMetric<T> {
        private final Class<T> clazz;
        private final BinaryOperator<T> aggregator;
        private final T identity;
        private final Interval interval;
        private final Map<MetricName, KafkaMetric> metrics;
        private volatile T value;

        private AggregatedMetric(Class<T> cls, BinaryOperator<T> binaryOperator, T t, Interval interval) {
            this.metrics = new ConcurrentHashMap();
            this.clazz = (Class) Objects.requireNonNull(cls, "clazz");
            this.aggregator = (BinaryOperator) Objects.requireNonNull(binaryOperator, "aggregator");
            this.identity = (T) Objects.requireNonNull(t, "identity");
            this.value = t;
            this.interval = interval;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void add(KafkaMetric kafkaMetric) {
            this.metrics.put(kafkaMetric.metricName(), kafkaMetric);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void remove(MetricName metricName) {
            this.metrics.remove(metricName);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public T getValue() {
            if (this.interval.check()) {
                this.value = update();
            }
            return this.value;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* JADX WARN: Multi-variable type inference failed */
        public T update() {
            T t = this.identity;
            for (KafkaMetric kafkaMetric : this.metrics.values()) {
                Object metricValue = kafkaMetric.metricValue();
                if (!this.clazz.isInstance(metricValue)) {
                    RocksDBMetricsCollector.LOGGER.debug("Skipping metric update due to unexpected value type returned by {}", kafkaMetric.metricName().toString());
                    return this.identity;
                }
                t = this.aggregator.apply(t, this.clazz.cast(metricValue));
            }
            return t;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/ksql/execution/streams/metrics/RocksDBMetricsCollector$Interval.class */
    public static final class Interval {
        private final int intervalSeconds;
        private final AtomicReference<Instant> last;
        private final Supplier<Instant> clock;

        private Interval(int i) {
            this(i, (Supplier<Instant>) Instant::now);
        }

        Interval(int i, Supplier<Instant> supplier) {
            this.intervalSeconds = i;
            this.clock = (Supplier) Objects.requireNonNull(supplier, "clock");
            this.last = new AtomicReference<>(Instant.EPOCH);
        }

        boolean check() {
            return this.last.get().isAfter(this.last.getAndAccumulate(this.clock.get(), (instant, instant2) -> {
                return instant2.isAfter(instant.plusSeconds((long) this.intervalSeconds)) ? instant2 : instant;
            }));
        }
    }

    public void configure(Map<String, ?> map) {
        AbstractConfig abstractConfig = new AbstractConfig(CONFIG_DEF, map);
        this.metrics = (Metrics) Objects.requireNonNull((Metrics) map.get("ksql.internal.metrics"));
        configureShared(abstractConfig, this.metrics);
    }

    public Set<String> reconfigurableConfigs() {
        return Collections.emptySet();
    }

    public void init(List<KafkaMetric> list) {
        list.forEach(this::metricChange);
    }

    public void metricChange(KafkaMetric kafkaMetric) {
        if (kafkaMetric.metricName().group().equals("stream-state-metrics")) {
            metricRemoval(kafkaMetric);
            Collection<AggregatedMetric<?>> collection = registeredMetrics.get(kafkaMetric.metricName().name());
            if (collection == null) {
                return;
            }
            collection.forEach(aggregatedMetric -> {
                aggregatedMetric.add(kafkaMetric);
            });
        }
    }

    public void metricRemoval(KafkaMetric kafkaMetric) {
        Collection<AggregatedMetric<?>> collection;
        MetricName metricName = kafkaMetric.metricName();
        if (metricName.group().equals("stream-state-metrics") && (collection = registeredMetrics.get(metricName.name())) != null) {
            collection.forEach(aggregatedMetric -> {
                aggregatedMetric.remove(metricName);
            });
        }
    }

    @VisibleForTesting
    static void reset() {
        registeredMetrics = null;
    }

    public void close() {
    }

    public static void update() {
        registeredMetrics.values().stream().flatMap((v0) -> {
            return v0.stream();
        }).forEach(obj -> {
            ((AggregatedMetric) obj).update();
        });
    }

    private static void registerBigIntTotal(int i, Map<String, Collection<AggregatedMetric<?>>> map, String str, Metrics metrics) {
        map.putIfAbsent(str, new LinkedList());
        AggregatedMetric<?> aggregatedMetric = new AggregatedMetric<>(BigInteger.class, (v0, v1) -> {
            return v0.add(v1);
        }, BigInteger.ZERO, new Interval(i));
        map.get(str).add(aggregatedMetric);
        metrics.addMetric(metrics.metricName(str + "-total", KSQL_ROCKSDB_METRICS_GROUP), (metricConfig, j) -> {
            return (BigInteger) aggregatedMetric.getValue();
        });
    }

    private static void registerBigIntMax(int i, Map<String, Collection<AggregatedMetric<?>>> map, String str, Metrics metrics) {
        map.putIfAbsent(str, new LinkedList());
        AggregatedMetric<?> aggregatedMetric = new AggregatedMetric<>(BigInteger.class, (v0, v1) -> {
            return v0.max(v1);
        }, BigInteger.ZERO, new Interval(i));
        map.get(str).add(aggregatedMetric);
        metrics.addMetric(metrics.metricName(str + "-max", KSQL_ROCKSDB_METRICS_GROUP), (metricConfig, j) -> {
            return (BigInteger) aggregatedMetric.getValue();
        });
    }

    private static void configureShared(AbstractConfig abstractConfig, Metrics metrics) {
        synchronized (lock) {
            if (registeredMetrics != null) {
                return;
            }
            int intValue = abstractConfig.getInt(UPDATE_INTERVAL_CONFIG).intValue();
            HashMap hashMap = new HashMap();
            registerAll(intValue, hashMap, metrics);
            registeredMetrics = ImmutableMap.copyOf((Map) hashMap.entrySet().stream().collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, entry -> {
                return ImmutableList.copyOf((Collection) entry.getValue());
            })));
        }
    }

    private static void registerAll(int i, Map<String, Collection<AggregatedMetric<?>>> map, Metrics metrics) {
        registerBigIntTotal(i, map, NUMBER_OF_RUNNING_COMPACTIONS, metrics);
        registerBigIntTotal(i, map, BLOCK_CACHE_USAGE, metrics);
        registerBigIntMax(i, map, BLOCK_CACHE_USAGE, metrics);
        registerBigIntTotal(i, map, BLOCK_CACHE_PINNED_USAGE, metrics);
        registerBigIntMax(i, map, BLOCK_CACHE_PINNED_USAGE, metrics);
        registerBigIntTotal(i, map, ESTIMATE_NUM_KEYS, metrics);
        registerBigIntTotal(i, map, ESTIMATE_TABLE_READERS_MEM, metrics);
        registerBigIntTotal(i, map, NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE, metrics);
        registerBigIntTotal(i, map, NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES, metrics);
        registerBigIntTotal(i, map, NUMBER_OF_DELETES_ACTIVE_MEMTABLE, metrics);
        registerBigIntTotal(i, map, NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES, metrics);
        registerBigIntTotal(i, map, NUMBER_OF_IMMUTABLE_MEMTABLES, metrics);
        registerBigIntTotal(i, map, CURRENT_SIZE_OF_ACTIVE_MEMTABLE, metrics);
        registerBigIntTotal(i, map, CURRENT_SIZE_OF_ALL_MEMTABLES, metrics);
        registerBigIntTotal(i, map, SIZE_OF_ALL_MEMTABLES, metrics);
        registerBigIntTotal(i, map, MEMTABLE_FLUSH_PENDING, metrics);
        registerBigIntTotal(i, map, NUMBER_OF_RUNNING_FLUSHES, metrics);
        registerBigIntTotal(i, map, COMPACTION_PENDING, metrics);
        registerBigIntTotal(i, map, ESTIMATED_BYTES_OF_PENDING_COMPACTION, metrics);
        registerBigIntTotal(i, map, TOTAL_SST_FILES_SIZE, metrics);
        registerBigIntTotal(i, map, LIVE_SST_FILES_SIZE, metrics);
    }
}
