/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.cruisecontrol.metricsreporter;

import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.BrokerMetric;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.PartitionMetric;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.TopicMetric;
import io.confluent.cruisecontrol.metricsreporter.ConfluentMetricsSamplerBase;
import io.confluent.cruisecontrol.metricsreporter.OpenCensusConverter;
import io.confluent.cruisecontrol.metricsreporter.OpenTelemetryConverter;
import io.confluent.cruisecontrol.metricsreporter.TelemetryConverter;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

/**
 * Metrics sampler that turns telemetry records into {@link CruiseControlMetric}s.
 *
 * <p>Each record's payload is decoded by either {@link OpenCensusConverter} or
 * {@link OpenTelemetryConverter}, chosen by the record's version header, and every decoded
 * data point or summary whose metric name is known to this class is mapped to a broker,
 * topic or partition metric. Metric names that are not listed here are ignored.
 */
public class ConfluentTelemetryReporterSampler
extends ConfluentMetricsSamplerBase {
    static final String TOPIC_KEY = "topic";
    static final String PARTITION_KEY = "partition";
    static final String BROKER_KEY = "kafka.broker.id";
    static final String REQUEST_TYPE_KEY = "request";
    static final String PRODUCE_REQUEST_TYPE = "Produce";
    static final String CONSUMER_FETCH_REQUEST_TYPE = "FetchConsumer";
    static final String FOLLOWER_FETCH_REQUEST_TYPE = "FetchFollower";
    static final String BYTES_IN_PER_SEC = "io.confluent.kafka.server/broker_topic/bytes_in/rate/1_min";
    static final String BYTES_OUT_PER_SEC = "io.confluent.kafka.server/broker_topic/bytes_out/rate/1_min";
    static final String REPLICATION_BYTES_IN_PER_SEC = "io.confluent.kafka.server/broker_topic/replication_bytes_in/rate/1_min";
    static final String REPLICATION_BYTES_OUT_PER_SEC = "io.confluent.kafka.server/broker_topic/replication_bytes_out/rate/1_min";
    static final String TOTAL_FETCH_REQUEST_PER_SEC = "io.confluent.kafka.server/broker_topic/total_fetch_requests/rate/1_min";
    static final String TOTAL_FOLLOWER_FETCH_REQUEST_PER_SEC = "io.confluent.kafka.server/broker_topic/total_follower_fetch_requests/rate/1_min";
    static final String TOTAL_PRODUCE_REQUEST_PER_SEC = "io.confluent.kafka.server/broker_topic/total_produce_requests/rate/1_min";
    static final String MESSAGES_IN_PER_SEC = "io.confluent.kafka.server/broker_topic/messages_in/rate/1_min";
    static final String REQUESTS_PER_SEC = "io.confluent.kafka.server/request/requests/rate/1_min";
    static final String REQUEST_QUEUE_SIZE = "io.confluent.kafka.server/request_channel/request_queue_size";
    static final String RESPONSE_QUEUE_SIZE = "io.confluent.kafka.server/request_channel/response_queue_size";
    static final String REQUEST_QUEUE_TIME_MS = "io.confluent.kafka.server/request/request_queue_time_ms";
    static final String LOCAL_TIME_MS = "io.confluent.kafka.server/request/local_time_ms";
    static final String TOTAL_TIME_MS = "io.confluent.kafka.server/request/total_time_ms";
    static final String SIZE = "io.confluent.kafka.server/log/size";
    static final String LOG_FLUSH_RATE = "io.confluent.kafka.server/log_flush/log_flush_rate_and_time_ms/rate/1_min";
    static final String LOG_FLUSH_TIME_MS = "io.confluent.kafka.server/log_flush/log_flush_rate_and_time_ms";
    static final String LINUX_CPU_USAGE = "io.confluent.kafka.server/server/linux_system_cpu_utilization";
    static final String JVM_OS_JMX_BEAN_CPU_USAGE = "io.confluent.system/jvm/os/process_cpu_load";
    static final String DISK_TOPIC_PLACEHOLDER_UNUSED = null;
    static final String DISK_TOTAL_BYTES = "io.confluent.system/volume/disk_total_bytes";

    /** Name of the record header that carries the telemetry message version. */
    private static final String VERSION_HEADER_KEY = "v";
    /** Version decoded with {@link #OPENCENSUS_CONVERTER}; also assumed when the header is absent. */
    private static final int OPENCENSUS_MESSAGE_VERSION = 0;
    /** Version decoded with {@link #OPENTELEMETRY_CONVERTER}. */
    private static final int OPENTELEMETRY_MESSAGE_VERSION = 1;
    /** Sentinel for a version header whose value cannot be read as a 4-byte integer. */
    private static final int UNSUPPORTED_MESSAGE_VERSION = -1;

    private static final Map<String, TopicAndAllTopicMetricTypes> TOPIC_METRIC_MAP = ConfluentTelemetryReporterSampler.buildTopicMetricMap();
    private static final Map<String, TopicAndAllTopicMetricTypes> TOPIC_METRIC_MAP_FOR_FFF = ConfluentTelemetryReporterSampler.buildTopicMetricMapForFetchFromFollower();
    private static final Map<String, Map<String, TimerMetricTypes>> REQUEST_TIMER_METRIC_MAP = ConfluentTelemetryReporterSampler.buildRequestTimerMetricMap();
    private static final TimerMetricTypes LOG_FLUSH_TIMER_METRIC_TYPES = new TimerMetricTypes(RawMetricType.BROKER_LOG_FLUSH_TIME_MS_50TH, RawMetricType.BROKER_LOG_FLUSH_TIME_MS_999TH, RawMetricType.BROKER_LOG_FLUSH_TIME_MS_MAX, RawMetricType.BROKER_LOG_FLUSH_TIME_MS_MEAN);
    private static final OpenCensusConverter OPENCENSUS_CONVERTER = new OpenCensusConverter();
    private static final OpenTelemetryConverter OPENTELEMETRY_CONVERTER = new OpenTelemetryConverter();

    /**
     * Reads the telemetry message version from the record's last {@code "v"} header.
     *
     * @param record the consumed telemetry record
     * @return {@code 0} when the header is absent, the header value decoded as a little-endian
     *     32-bit integer when present, or {@link #UNSUPPORTED_MESSAGE_VERSION} when the header
     *     value is {@code null} or shorter than four bytes
     */
    private static int telemetryMessageVersion(ConsumerRecord<byte[], byte[]> record) {
        Header versionHeader = record.headers().lastHeader(VERSION_HEADER_KEY);
        if (versionHeader == null) {
            return OPENCENSUS_MESSAGE_VERSION;
        }
        byte[] rawVersion = versionHeader.value();
        // A null or truncated value cannot be decoded; report it as unsupported instead of
        // letting ByteBuffer throw while the record is being converted.
        if (rawVersion == null || rawVersion.length < Integer.BYTES) {
            return UNSUPPORTED_MESSAGE_VERSION;
        }
        return ByteBuffer.wrap(rawVersion).order(ByteOrder.LITTLE_ENDIAN).getInt();
    }

    /**
     * Converts one telemetry record into the Cruise Control metrics it contains.
     *
     * @param record the consumed telemetry record
     * @return the converted metrics; an empty list when the record's message version is
     *     neither {@code 0} nor {@code 1}
     */
    @Override
    protected List<CruiseControlMetric> convertMetricRecord(ConsumerRecord<byte[], byte[]> record) {
        final TelemetryConverter telemetryConverter;
        switch (ConfluentTelemetryReporterSampler.telemetryMessageVersion(record)) {
            case OPENCENSUS_MESSAGE_VERSION: {
                telemetryConverter = OPENCENSUS_CONVERTER;
                break;
            }
            case OPENTELEMETRY_MESSAGE_VERSION: {
                telemetryConverter = OPENTELEMETRY_CONVERTER;
                break;
            }
            default: {
                return Collections.emptyList();
            }
        }
        final List<CruiseControlMetric> ccMetrics = new ArrayList<>();
        telemetryConverter.convert(record.value(), new TelemetryConverter.MetricConsumer<Integer>() {

            /**
             * Extracts the broker id from the resource labels.
             *
             * @return the parsed broker id, or empty when the broker label is missing or blank-length
             * @throws NumberFormatException if the broker label is present but not an integer
             */
            @Override
            public Optional<Integer> mapResourceLabels(Map<String, String> resourceLabels) {
                String brokerLabel = resourceLabels.getOrDefault(BROKER_KEY, "");
                if (brokerLabel.isEmpty()) {
                    return Optional.empty();
                }
                return Optional.of(Integer.parseInt(brokerLabel));
            }

            @Override
            public void consume(String metricName, Integer brokerId, TelemetryConverter.DataPoint dataPoint) {
                ConfluentTelemetryReporterSampler.createCruiseControlPointMetrics(metricName, brokerId, dataPoint, ccMetrics);
            }

            @Override
            public void consume(String metricName, Integer brokerId, TelemetryConverter.Summary summary) {
                ConfluentTelemetryReporterSampler.createCruiseControlSummaryMetrics(metricName, brokerId, summary, ccMetrics);
            }
        });
        return ccMetrics;
    }

    /**
     * Maps a single data point to at most one Cruise Control metric and appends it to
     * {@code ccMetrics}. Data points with an unrecognised metric name are ignored.
     *
     * @param name the telemetry metric name
     * @param brokerId the id of the broker the data point belongs to
     * @param point the data point to convert
     * @param ccMetrics the list that receives the converted metric
     */
    private static void createCruiseControlPointMetrics(String name, int brokerId, TelemetryConverter.DataPoint point, List<CruiseControlMetric> ccMetrics) {
        String topic = point.labels().get(TOPIC_KEY);
        long timestamp = point.timestamp();
        switch (name) {
            case BYTES_OUT_PER_SEC:
            case TOTAL_FETCH_REQUEST_PER_SEC:
            case BYTES_IN_PER_SEC:
            case REPLICATION_BYTES_IN_PER_SEC:
            case REPLICATION_BYTES_OUT_PER_SEC:
            case TOTAL_FOLLOWER_FETCH_REQUEST_PER_SEC:
            case TOTAL_PRODUCE_REQUEST_PER_SEC:
            case MESSAGES_IN_PER_SEC: {
                ccMetrics.add(ConfluentTelemetryReporterSampler.buildTopicOrAllTopicMetric(name, topic, timestamp, brokerId, point.asDouble()));
                break;
            }
            case REQUESTS_PER_SEC: {
                ConfluentTelemetryReporterSampler.addRequestRateMetric(brokerId, timestamp, point, ccMetrics);
                break;
            }
            case REQUEST_QUEUE_SIZE: {
                ccMetrics.add(new BrokerMetric(RawMetricType.BROKER_REQUEST_QUEUE_SIZE, timestamp, brokerId, point.asInt()));
                break;
            }
            case RESPONSE_QUEUE_SIZE: {
                ccMetrics.add(new BrokerMetric(RawMetricType.BROKER_RESPONSE_QUEUE_SIZE, timestamp, brokerId, point.asInt()));
                break;
            }
            case SIZE: {
                String partitionLabel = point.labels().get(PARTITION_KEY);
                // Without a partition label there is no partition to attribute the size to;
                // skip the point rather than fail on Integer.parseInt(null).
                if (partitionLabel == null) {
                    break;
                }
                int partition = Integer.parseInt(partitionLabel);
                ccMetrics.add(new PartitionMetric(RawMetricType.PARTITION_SIZE, timestamp, brokerId, topic, partition, point.asInt()));
                break;
            }
            case LOG_FLUSH_RATE: {
                ccMetrics.add(new BrokerMetric(RawMetricType.BROKER_LOG_FLUSH_RATE, timestamp, brokerId, point.asDouble()));
                break;
            }
            case LINUX_CPU_USAGE: {
                ccMetrics.add(new BrokerMetric(RawMetricType.BROKER_CPU_UTIL, timestamp, brokerId, point.asDouble()));
                break;
            }
            case JVM_OS_JMX_BEAN_CPU_USAGE: {
                // Reported under the same metric type as LINUX_CPU_USAGE, but scaled by 100 first.
                ccMetrics.add(new BrokerMetric(RawMetricType.BROKER_CPU_UTIL, timestamp, brokerId, point.asDouble() * 100.0));
                break;
            }
            case DISK_TOTAL_BYTES: {
                ccMetrics.add(new BrokerMetric(RawMetricType.BROKER_DISK_CAPACITY, timestamp, brokerId, point.asInt()));
                break;
            }
            default: {
                // Not a metric this sampler reports.
                break;
            }
        }
    }

    /**
     * Appends the broker request-rate metric matching the data point's request-type label.
     * Points whose request type is missing or is not Produce, FetchConsumer or FetchFollower
     * are ignored.
     */
    private static void addRequestRateMetric(int brokerId, long timestamp, TelemetryConverter.DataPoint point, List<CruiseControlMetric> ccMetrics) {
        String requestType = point.labels().get(REQUEST_TYPE_KEY);
        // Switching on a null String throws NullPointerException, so treat a missing label
        // like any other unrecognised request type.
        if (requestType == null) {
            return;
        }
        switch (requestType) {
            case PRODUCE_REQUEST_TYPE: {
                ccMetrics.add(new BrokerMetric(RawMetricType.BROKER_PRODUCE_REQUEST_RATE, timestamp, brokerId, point.asDouble()));
                break;
            }
            case CONSUMER_FETCH_REQUEST_TYPE: {
                ccMetrics.add(new BrokerMetric(RawMetricType.BROKER_CONSUMER_FETCH_REQUEST_RATE, timestamp, brokerId, point.asDouble()));
                break;
            }
            case FOLLOWER_FETCH_REQUEST_TYPE: {
                ccMetrics.add(new BrokerMetric(RawMetricType.BROKER_FOLLOWER_FETCH_REQUEST_RATE, timestamp, brokerId, point.asDouble()));
                break;
            }
            default: {
                break;
            }
        }
    }

    /**
     * Maps a summary (timer) to its percentile/max/mean broker metrics and appends them to
     * {@code ccMetrics}. Summaries with an unrecognised metric name, or request timers with
     * an unrecognised or missing request type, are ignored.
     *
     * @param name the telemetry metric name
     * @param brokerId the id of the broker the summary belongs to
     * @param summary the summary to convert
     * @param ccMetrics the list that receives the converted metrics
     */
    static void createCruiseControlSummaryMetrics(String name, int brokerId, TelemetryConverter.Summary summary, List<CruiseControlMetric> ccMetrics) {
        switch (name) {
            case REQUEST_QUEUE_TIME_MS:
            case LOCAL_TIME_MS:
            case TOTAL_TIME_MS: {
                String requestType = summary.labels().get(REQUEST_TYPE_KEY);
                // The map holds no null values, so a null result means "request type not tracked".
                TimerMetricTypes timerTypes = REQUEST_TIMER_METRIC_MAP.get(name).get(requestType);
                if (timerTypes != null) {
                    ccMetrics.addAll(ConfluentTelemetryReporterSampler.buildTimerMetrics(timerTypes, summary, summary.timestamp(), brokerId));
                }
                break;
            }
            case LOG_FLUSH_TIME_MS: {
                ccMetrics.addAll(ConfluentTelemetryReporterSampler.buildTimerMetrics(LOG_FLUSH_TIMER_METRIC_TYPES, summary, summary.timestamp(), brokerId));
                break;
            }
            default: {
                break;
            }
        }
    }

    /**
     * Builds the broker metrics for one timer summary: one metric for each of the 0.5, 0.999
     * and 1.0 quantiles present in the summary, followed by the mean ({@code sum / count}).
     *
     * @param metricTypes the metric types to use for the 50th, 99.9th, max and mean values
     * @param summary the summary supplying quantiles, sum and count
     * @param timestamp the timestamp to stamp on every produced metric
     * @param brokerId the id of the broker the summary belongs to
     * @return the produced metrics; always contains the mean metric
     */
    private static List<CruiseControlMetric> buildTimerMetrics(TimerMetricTypes metricTypes, TelemetryConverter.Summary summary, long timestamp, int brokerId) {
        List<CruiseControlMetric> ccMetrics = new ArrayList<>();
        summary.quantiles().forEach(quantileValue -> {
            // Quantiles are matched in thousandths, truncated toward zero; others are dropped.
            switch ((int)(quantileValue.quantile() * 1000.0)) {
                case 500: {
                    ccMetrics.add(new BrokerMetric(metricTypes.percentile500Type, timestamp, brokerId, quantileValue.value()));
                    break;
                }
                case 999: {
                    ccMetrics.add(new BrokerMetric(metricTypes.percentile999Type, timestamp, brokerId, quantileValue.value()));
                    break;
                }
                case 1000: {
                    ccMetrics.add(new BrokerMetric(metricTypes.maxType, timestamp, brokerId, quantileValue.value()));
                    break;
                }
                default: {
                    break;
                }
            }
        });
        // NOTE(review): floating-point division, so a zero count yields NaN or infinity rather
        // than an exception - confirm consumers of the mean metric tolerate that.
        ccMetrics.add(new BrokerMetric(metricTypes.meanType, timestamp, brokerId, summary.sum() / (double)summary.count()));
        return ccMetrics;
    }

    /**
     * Builds a topic-level or broker-level metric for {@code name} using {@link #TOPIC_METRIC_MAP}.
     * {@code name} must be a key of that map.
     */
    private static CruiseControlMetric buildTopicOrAllTopicMetric(String name, String topic, long timestamp, int brokerId, double value) {
        return ConfluentTelemetryReporterSampler.buildTopicOrAllTopicMetric(TOPIC_METRIC_MAP.get(name), topic, timestamp, brokerId, value);
    }

    /**
     * Same as {@link #buildTopicOrAllTopicMetric(String, String, long, int, double)} but resolves
     * the metric types through {@link #TOPIC_METRIC_MAP_FOR_FFF}. {@code name} must be a key of
     * that map. Not called from within this class.
     */
    private static CruiseControlMetric buildTopicOrAllTopicMetricForFetchFromFollower(String name, String topic, long timestamp, int brokerId, double value) {
        return ConfluentTelemetryReporterSampler.buildTopicOrAllTopicMetric(TOPIC_METRIC_MAP_FOR_FFF.get(name), topic, timestamp, brokerId, value);
    }

    /**
     * Creates a {@link TopicMetric} when {@code topic} is non-null, otherwise a
     * {@link BrokerMetric} using the all-topic metric type.
     */
    private static CruiseControlMetric buildTopicOrAllTopicMetric(TopicAndAllTopicMetricTypes type, String topic, long timestamp, int brokerId, double value) {
        if (topic != null) {
            return new TopicMetric(type.topicMetricType, timestamp, brokerId, topic, value);
        }
        return new BrokerMetric(type.allTopicMetricType, timestamp, brokerId, value);
    }

    /** Builds the read-only lookup from broker-topic metric name to its topic/all-topic metric types. */
    private static Map<String, TopicAndAllTopicMetricTypes> buildTopicMetricMap() {
        Map<String, TopicAndAllTopicMetricTypes> map = new HashMap<>();
        map.put(BYTES_IN_PER_SEC, new TopicAndAllTopicMetricTypes(RawMetricType.TOPIC_BYTES_IN, RawMetricType.ALL_TOPIC_BYTES_IN));
        map.put(BYTES_OUT_PER_SEC, new TopicAndAllTopicMetricTypes(RawMetricType.TOPIC_BYTES_OUT, RawMetricType.ALL_TOPIC_BYTES_OUT));
        map.put(REPLICATION_BYTES_IN_PER_SEC, new TopicAndAllTopicMetricTypes(RawMetricType.TOPIC_REPLICATION_BYTES_IN, RawMetricType.ALL_TOPIC_REPLICATION_BYTES_IN));
        map.put(REPLICATION_BYTES_OUT_PER_SEC, new TopicAndAllTopicMetricTypes(RawMetricType.TOPIC_REPLICATION_BYTES_OUT, RawMetricType.ALL_TOPIC_REPLICATION_BYTES_OUT));
        map.put(TOTAL_FETCH_REQUEST_PER_SEC, new TopicAndAllTopicMetricTypes(RawMetricType.TOPIC_FETCH_REQUEST_RATE, RawMetricType.ALL_TOPIC_FETCH_REQUEST_RATE));
        map.put(TOTAL_FOLLOWER_FETCH_REQUEST_PER_SEC, new TopicAndAllTopicMetricTypes(RawMetricType.TOPIC_FOLLOWER_FETCH_REQUEST_RATE, RawMetricType.ALL_TOPIC_FOLLOWER_FETCH_REQUEST_RATE));
        map.put(TOTAL_PRODUCE_REQUEST_PER_SEC, new TopicAndAllTopicMetricTypes(RawMetricType.TOPIC_PRODUCE_REQUEST_RATE, RawMetricType.ALL_TOPIC_PRODUCE_REQUEST_RATE));
        map.put(MESSAGES_IN_PER_SEC, new TopicAndAllTopicMetricTypes(RawMetricType.TOPIC_MESSAGES_IN_PER_SEC, RawMetricType.ALL_TOPIC_MESSAGES_IN_PER_SEC));
        return Collections.unmodifiableMap(map);
    }

    /** Builds the read-only lookup from metric name to its fetch-from-follower metric types. */
    private static Map<String, TopicAndAllTopicMetricTypes> buildTopicMetricMapForFetchFromFollower() {
        Map<String, TopicAndAllTopicMetricTypes> map = new HashMap<>();
        map.put(BYTES_OUT_PER_SEC, new TopicAndAllTopicMetricTypes(RawMetricType.TOPIC_FETCH_FROM_FOLLOWER_BYTES_OUT, RawMetricType.ALL_TOPIC_FETCH_FROM_FOLLOWER_BYTES_OUT));
        map.put(TOTAL_FETCH_REQUEST_PER_SEC, new TopicAndAllTopicMetricTypes(RawMetricType.TOPIC_FETCH_FROM_FOLLOWER_REQUEST_RATE, RawMetricType.ALL_TOPIC_FETCH_FROM_FOLLOWER_REQUEST_RATE));
        return Collections.unmodifiableMap(map);
    }

    /**
     * Builds the read-only two-level lookup: request timer metric name, then request type,
     * to the metric types used for that timer's 50th, 99.9th, max and mean values.
     */
    private static Map<String, Map<String, TimerMetricTypes>> buildRequestTimerMetricMap() {
        Map<String, TimerMetricTypes> requestQueueTimeMap = new HashMap<>();
        requestQueueTimeMap.put(PRODUCE_REQUEST_TYPE, new TimerMetricTypes(RawMetricType.BROKER_PRODUCE_REQUEST_QUEUE_TIME_MS_50TH, RawMetricType.BROKER_PRODUCE_REQUEST_QUEUE_TIME_MS_999TH, RawMetricType.BROKER_PRODUCE_REQUEST_QUEUE_TIME_MS_MAX, RawMetricType.BROKER_PRODUCE_REQUEST_QUEUE_TIME_MS_MEAN));
        requestQueueTimeMap.put(CONSUMER_FETCH_REQUEST_TYPE, new TimerMetricTypes(RawMetricType.BROKER_CONSUMER_FETCH_REQUEST_QUEUE_TIME_MS_50TH, RawMetricType.BROKER_CONSUMER_FETCH_REQUEST_QUEUE_TIME_MS_999TH, RawMetricType.BROKER_CONSUMER_FETCH_REQUEST_QUEUE_TIME_MS_MAX, RawMetricType.BROKER_CONSUMER_FETCH_REQUEST_QUEUE_TIME_MS_MEAN));
        requestQueueTimeMap.put(FOLLOWER_FETCH_REQUEST_TYPE, new TimerMetricTypes(RawMetricType.BROKER_FOLLOWER_FETCH_REQUEST_QUEUE_TIME_MS_50TH, RawMetricType.BROKER_FOLLOWER_FETCH_REQUEST_QUEUE_TIME_MS_999TH, RawMetricType.BROKER_FOLLOWER_FETCH_REQUEST_QUEUE_TIME_MS_MAX, RawMetricType.BROKER_FOLLOWER_FETCH_REQUEST_QUEUE_TIME_MS_MEAN));

        Map<String, TimerMetricTypes> localTimeMap = new HashMap<>();
        localTimeMap.put(PRODUCE_REQUEST_TYPE, new TimerMetricTypes(RawMetricType.BROKER_PRODUCE_LOCAL_TIME_MS_50TH, RawMetricType.BROKER_PRODUCE_LOCAL_TIME_MS_999TH, RawMetricType.BROKER_PRODUCE_LOCAL_TIME_MS_MAX, RawMetricType.BROKER_PRODUCE_LOCAL_TIME_MS_MEAN));
        localTimeMap.put(CONSUMER_FETCH_REQUEST_TYPE, new TimerMetricTypes(RawMetricType.BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_50TH, RawMetricType.BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_999TH, RawMetricType.BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_MAX, RawMetricType.BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_MEAN));
        localTimeMap.put(FOLLOWER_FETCH_REQUEST_TYPE, new TimerMetricTypes(RawMetricType.BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_50TH, RawMetricType.BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_999TH, RawMetricType.BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_MAX, RawMetricType.BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_MEAN));

        Map<String, TimerMetricTypes> totalTimeMap = new HashMap<>();
        totalTimeMap.put(PRODUCE_REQUEST_TYPE, new TimerMetricTypes(RawMetricType.BROKER_PRODUCE_TOTAL_TIME_MS_50TH, RawMetricType.BROKER_PRODUCE_TOTAL_TIME_MS_999TH, RawMetricType.BROKER_PRODUCE_TOTAL_TIME_MS_MAX, RawMetricType.BROKER_PRODUCE_TOTAL_TIME_MS_MEAN));
        totalTimeMap.put(CONSUMER_FETCH_REQUEST_TYPE, new TimerMetricTypes(RawMetricType.BROKER_CONSUMER_FETCH_TOTAL_TIME_MS_50TH, RawMetricType.BROKER_CONSUMER_FETCH_TOTAL_TIME_MS_999TH, RawMetricType.BROKER_CONSUMER_FETCH_TOTAL_TIME_MS_MAX, RawMetricType.BROKER_CONSUMER_FETCH_TOTAL_TIME_MS_MEAN));
        totalTimeMap.put(FOLLOWER_FETCH_REQUEST_TYPE, new TimerMetricTypes(RawMetricType.BROKER_FOLLOWER_FETCH_TOTAL_TIME_MS_50TH, RawMetricType.BROKER_FOLLOWER_FETCH_TOTAL_TIME_MS_999TH, RawMetricType.BROKER_FOLLOWER_FETCH_TOTAL_TIME_MS_MAX, RawMetricType.BROKER_FOLLOWER_FETCH_TOTAL_TIME_MS_MEAN));

        Map<String, Map<String, TimerMetricTypes>> map = new HashMap<>();
        map.put(REQUEST_QUEUE_TIME_MS, Collections.unmodifiableMap(requestQueueTimeMap));
        map.put(LOCAL_TIME_MS, Collections.unmodifiableMap(localTimeMap));
        map.put(TOTAL_TIME_MS, Collections.unmodifiableMap(totalTimeMap));
        return Collections.unmodifiableMap(map);
    }

    /** Immutable holder for the four metric types produced from one timer summary. */
    private static final class TimerMetricTypes {
        final RawMetricType percentile500Type;
        final RawMetricType percentile999Type;
        final RawMetricType maxType;
        final RawMetricType meanType;

        TimerMetricTypes(RawMetricType percentile500Type, RawMetricType percentile999Type, RawMetricType maxType, RawMetricType meanType) {
            this.percentile500Type = percentile500Type;
            this.percentile999Type = percentile999Type;
            this.maxType = maxType;
            this.meanType = meanType;
        }
    }

    /** Immutable holder pairing a per-topic metric type with its all-topic (broker-level) counterpart. */
    private static final class TopicAndAllTopicMetricTypes {
        final RawMetricType topicMetricType;
        final RawMetricType allTopicMetricType;

        TopicAndAllTopicMetricTypes(RawMetricType topicMetricType, RawMetricType allTopicMetricType) {
            this.topicMetricType = topicMetricType;
            this.allTopicMetricType = allTopicMetricType;
        }
    }
}

