/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.controlcenter.streams;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.confluent.common.metrics.MeasurableStat;
import io.confluent.common.metrics.MetricName;
import io.confluent.common.metrics.Metrics;
import io.confluent.common.metrics.Sensor;
import io.confluent.common.metrics.stats.Count;
import io.confluent.common.metrics.stats.Max;
import io.confluent.common.metrics.stats.Min;
import io.confluent.common.metrics.stats.Rate;
import io.confluent.common.metrics.stats.SampledStat;
import io.confluent.controlcenter.record.Controlcenter;
import io.confluent.controlcenter.streams.StreamsModule;
import io.confluent.controlcenter.streams.TopicStoreMaster;
import io.confluent.controlcenter.streams.aggregation.MetricEvent;
import io.confluent.controlcenter.util.ClusterTopicPartition;
import io.confluent.controlcenter.util.MinMeasurableStat;
import io.confluent.metrics.record.ConfluentMetric;
import io.confluent.monitoring.record.Monitoring;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WindowExtractor
implements TimestampExtractor {
    private static final Logger log = LoggerFactory.getLogger(WindowExtractor.class);
    private static String monitoringTopicName = null;
    private static String metricsTopicName = null;
    private static LoadingCache<ClusterTopicPartition, Sensor> metrics = null;

    @Inject
    public static void setTopicNames(final @StreamsModule.C3StreamsMetrics Metrics newMetrics, TopicStoreMaster topicStoreMaster) {
        monitoringTopicName = topicStoreMaster.getMonitoringTopicName();
        metricsTopicName = topicStoreMaster.getMetricsTopicName();
        log.info("setting topic names {}", (Object)monitoringTopicName);
        metrics = CacheBuilder.newBuilder().build((CacheLoader)new CacheLoader<ClusterTopicPartition, Sensor>(){
            private final MinMeasurableStat monitoringInputTopicGauges = new MinMeasurableStat();
            private final MinMeasurableStat metricsInputTopicGauges = new MinMeasurableStat();

            public synchronized Sensor getOrCreateSensor(String sensorName, MinMeasurableStat minSet, Sensor ... parents) {
                Sensor sensor = newMetrics.getSensor(sensorName);
                if (sensor == null) {
                    sensor = newMetrics.sensor(sensorName, parents);
                    sensor.add(new MetricName(sensor.name() + ".count", "WindowExtractor"), (MeasurableStat)new Count());
                    sensor.add(new MetricName(sensor.name() + ".rate", "WindowExtractor"), (MeasurableStat)new Rate((SampledStat)new Count()));
                    Max max = new Max();
                    sensor.add(new MetricName(sensor.name() + ".timestamp.max", "WindowExtractor"), (MeasurableStat)max);
                    sensor.add(new MetricName(sensor.name() + ".timestamp.min", "WindowExtractor"), (MeasurableStat)new Min());
                    if (minSet != null) {
                        minSet.addGauge((MeasurableStat)max);
                    }
                }
                return sensor;
            }

            public synchronized Sensor getOrCreateSensor(String sensorName, Map<String, String> tags, MeasurableStat minSet) {
                Sensor sensor = newMetrics.getSensor(sensorName);
                if (sensor == null) {
                    sensor = newMetrics.sensor(sensorName);
                    sensor.add(new MetricName(sensor.name() + ".count", "WindowExtractor", tags), (MeasurableStat)new Count());
                    sensor.add(new MetricName(sensor.name() + ".rate", "WindowExtractor", tags), (MeasurableStat)new Rate((SampledStat)new Count()));
                    sensor.add(new MetricName(sensor.name() + ".timestamp", "WindowExtractor", tags), (MeasurableStat)new Min());
                    sensor.add(new MetricName(sensor.name() + ".min", "WindowExtractor", tags), minSet);
                }
                return sensor;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public Sensor load(ClusterTopicPartition key) throws Exception {
                Metrics metrics = newMetrics;
                synchronized (metrics) {
                    Sensor[] sensorArray;
                    String topic = key.topicPartition.topic();
                    Sensor clusterSensor = null;
                    MinMeasurableStat mms = null;
                    log.debug("making sensor for key={}", (Object)key);
                    if (key.cluster != null) {
                        if (monitoringTopicName != null && monitoringTopicName.equals(topic)) {
                            mms = this.monitoringInputTopicGauges;
                            clusterSensor = this.getOrCreateSensor("monitoring-input-topic-progress-" + key.cluster, (Map<String, String>)ImmutableMap.of((Object)"input", (Object)"monitoring", (Object)"progress", (Object)"input-topic", (Object)"cluster", (Object)key.cluster), mms);
                        } else if (metricsTopicName != null && metricsTopicName.equals(topic)) {
                            mms = this.metricsInputTopicGauges;
                            clusterSensor = this.getOrCreateSensor("metrics-input-topic-progress-" + key.cluster, (Map<String, String>)ImmutableMap.of((Object)"input", (Object)"metrics", (Object)"progress", (Object)"input-topic", (Object)"cluster", (Object)key.cluster), mms);
                        }
                    }
                    if (clusterSensor == null) {
                        sensorArray = null;
                    } else {
                        Sensor[] sensorArray2 = new Sensor[1];
                        sensorArray = sensorArray2;
                        sensorArray2[0] = clusterSensor;
                    }
                    Sensor topicSensor = this.getOrCreateSensor(topic, mms, sensorArray);
                    Sensor topicPartitionSensor = this.getOrCreateSensor(key.topicPartition.toString(), mms, topicSensor);
                    return topicPartitionSensor;
                }
            }
        });
    }

    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        long timestamp = 0L;
        String clusterId = null;
        try {
            if (record.value() instanceof Monitoring.MonitoringMessage) {
                timestamp = ((Monitoring.MonitoringMessage)record.value()).getWindow();
                clusterId = ((Monitoring.MonitoringMessage)record.value()).getClusterId();
            } else if (record.value() instanceof Controlcenter.ClientGroup) {
                timestamp = ((Controlcenter.ClientGroup)record.value()).getWindow();
            } else if (record.value() instanceof Controlcenter.WindowedGrouping) {
                timestamp = ((Controlcenter.WindowedGrouping)record.value()).getWindow();
            } else if (record.value() instanceof Controlcenter.VerifiableMonitoringMessage) {
                Controlcenter.VerifiableMonitoringMessage value = (Controlcenter.VerifiableMonitoringMessage)record.value();
                timestamp = value.getMonitoringMessage().getWindow();
                clusterId = value.getMonitoringMessage().getClusterId();
            } else if (record.value() instanceof Controlcenter.TriggerEvent) {
                timestamp = ((Controlcenter.TriggerEvent)record.value()).getWindow();
            } else if (record.value() instanceof ConfluentMetric.MetricsMessage) {
                timestamp = ((ConfluentMetric.MetricsMessage)record.value()).getTimestamp();
                clusterId = ((ConfluentMetric.MetricsMessage)record.value()).getClusterId();
            } else if (record.key() instanceof MetricEvent) {
                timestamp = record.timestamp();
            } else if (record.key() instanceof Controlcenter.WindowedClusterGroup) {
                timestamp = ((Controlcenter.WindowedClusterGroup)record.key()).getWindow();
            } else {
                log.warn("unable to extract message timestamp: unknown message type");
            }
        }
        catch (Exception e) {
            log.warn("unable to extract message timestamp: error extracting timestamp", (Throwable)e);
        }
        ClusterTopicPartition ctp = new ClusterTopicPartition(clusterId, new TopicPartition(record.topic(), record.partition()));
        if (metrics != null && timestamp > 0L) {
            Sensor tpSensor = (Sensor)metrics.getUnchecked((Object)ctp);
            tpSensor.record((double)timestamp);
        }
        if (timestamp <= 0L) {
            if (previousTimestamp >= 0L) {
                log.debug("Extracted timestamp {} <=0, will return previousTimestamp {}", (Object)timestamp, (Object)previousTimestamp);
                return previousTimestamp;
            }
            log.debug("Extracted timestamp {} <=0 and previousTimestamp < 0, will return 0", (Object)timestamp);
            return 0L;
        }
        return timestamp;
    }
}

