/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.controlcenter.streams;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.inject.AbstractModule;
import com.google.inject.BindingAnnotation;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import io.confluent.command.cluster_metadata.CommandClusterMetadata;
import io.confluent.command.record.Command;
import io.confluent.common.metrics.JmxReporter;
import io.confluent.common.metrics.MetricConfig;
import io.confluent.common.metrics.Metrics;
import io.confluent.common.utils.SystemTime;
import io.confluent.common.utils.Time;
import io.confluent.controlcenter.ControlCenterConfig;
import io.confluent.controlcenter.Rollup;
import io.confluent.controlcenter.alert.record.Alert;
import io.confluent.controlcenter.command.CommandModule;
import io.confluent.controlcenter.record.Controlcenter;
import io.confluent.controlcenter.serialization.SerializationModule;
import io.confluent.controlcenter.streams.C3Streams;
import io.confluent.controlcenter.streams.KafkaStreamsManager;
import io.confluent.controlcenter.streams.StreamBuilderProvider;
import io.confluent.controlcenter.streams.TopicStoreMaster;
import io.confluent.controlcenter.streams.TopicStoreModule;
import io.confluent.controlcenter.streams.WindowExtractor;
import io.confluent.controlcenter.streams.aggregation.BufferMetricEvent;
import io.confluent.controlcenter.streams.aggregation.GroupingSets;
import io.confluent.controlcenter.streams.aggregation.MetricHolder;
import io.confluent.controlcenter.streams.aggregation.MetricValues;
import io.confluent.controlcenter.streams.aggregation.MetricsAggregation;
import io.confluent.controlcenter.streams.alert.AlertHistoryProcessorSupplier;
import io.confluent.controlcenter.streams.alert.AlertTransformerSupplier;
import io.confluent.controlcenter.streams.alert.MetricToTriggerMeasurementTransformerSupplier;
import io.confluent.controlcenter.streams.alert.MonitoringTriggerTransformerSupplier;
import io.confluent.controlcenter.streams.verify.MonitoringHeartbeatSender;
import io.confluent.controlcenter.streams.verify.MonitoringVerifier;
import io.confluent.controlcenter.streams.verify.VerifyTransformerSupplier;
import io.confluent.controlcenter.util.LruSet;
import io.confluent.controlcenter.util.StreamProgressReporter;
import io.confluent.monitoring.common.Clock;
import io.confluent.monitoring.common.MonitoringMessageUtil;
import io.confluent.monitoring.common.MonitoringProducerDefaults;
import io.confluent.monitoring.common.SystemClock;
import io.confluent.monitoring.common.TimeBucket;
import io.confluent.monitoring.record.Monitoring;
import io.confluent.serializers.OrderedKeyUberSerde;
import io.confluent.serializers.UberSerde;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamsModule
extends AbstractModule {
    private static final Logger log = LoggerFactory.getLogger(StreamsModule.class);

    /**
     * Registers the bindings this module declares directly (as opposed to through
     * {@code @Provides} methods): {@link MetricsAggregation} and the
     * {@link C3Streams.Builder} (obtained from {@link StreamBuilderProvider}) are both
     * bound as eager singletons, and static injection is requested for
     * {@link WindowExtractor}.
     */
    protected void configure() {
        bind(MetricsAggregation.class).asEagerSingleton();
        bind(C3Streams.Builder.class)
            .toProvider(StreamBuilderProvider.class)
            .asEagerSingleton();
        requestStaticInjection(WindowExtractor.class);
    }

    /**
     * Supplies a fresh {@link LruSet} of Kafka cluster metadata.
     *
     * <p>The method carries no scope annotation, so each injection point receives its own
     * instance.
     *
     * @return a new, empty set constructed with the argument {@code 128}
     */
    @Provides
    protected LruSet<CommandClusterMetadata.KafkaClusterMetadata> provideKcmLru() {
        // NOTE(review): presumably the maximum number of retained entries - confirm against LruSet.
        final int lruSetSize = 128;
        return new LruSet<CommandClusterMetadata.KafkaClusterMetadata>(lruSetSize);
    }

    /**
     * Builds the singleton cache bound under {@link BrokerCache}: string keys mapped to
     * sorted sets of integers.
     *
     * <p>Entries expire a fixed time after being written, using the value of
     * {@code confluent.metrics.topic.retention.ms} as the lifetime, and the cache is capped
     * at 10000 entries. A key that is not yet cached is loaded as a new, empty
     * {@link ConcurrentSkipListSet}.
     *
     * @param config source of the retention setting
     * @return the configured loading cache
     */
    @Provides
    @Singleton
    @BrokerCache
    protected LoadingCache<String, SortedSet<Integer>> brokerCache(ControlCenterConfig config) {
        long retentionMs = config.getLong("confluent.metrics.topic.retention.ms");

        // Loader for cache misses: every unseen key starts out with an empty concurrent sorted set.
        CacheLoader<String, SortedSet<Integer>> emptySetLoader = new CacheLoader<String, SortedSet<Integer>>() {
            public SortedSet<Integer> load(String key) {
                return new ConcurrentSkipListSet<Integer>();
            }
        };

        return CacheBuilder.newBuilder()
            .expireAfterWrite(retentionMs, TimeUnit.MILLISECONDS)
            .maximumSize(10000L)
            .build(emptySetLoader);
    }

    /**
     * Builds the singleton cache bound under {@link TopicCache}: string keys mapped to
     * sorted sets of strings.
     *
     * <p>Entries expire a fixed time after being written, using the value of
     * {@code confluent.metrics.topic.retention.ms} as the lifetime, and the cache is capped
     * at 10000 entries. A key that is not yet cached is loaded as a new, empty
     * {@link ConcurrentSkipListSet}.
     *
     * @param config source of the retention setting
     * @return the configured loading cache
     */
    @Provides
    @Singleton
    @TopicCache
    protected LoadingCache<String, SortedSet<String>> topicCache(ControlCenterConfig config) {
        long retentionMs = config.getLong("confluent.metrics.topic.retention.ms");

        // Loader for cache misses: every unseen key starts out with an empty concurrent sorted set.
        CacheLoader<String, SortedSet<String>> emptySetLoader = new CacheLoader<String, SortedSet<String>>() {
            public SortedSet<String> load(String key) {
                return new ConcurrentSkipListSet<String>();
            }
        };

        return CacheBuilder.newBuilder()
            .expireAfterWrite(retentionMs, TimeUnit.MILLISECONDS)
            .maximumSize(10000L)
            .build(emptySetLoader);
    }

    /**
     * Creates a {@link KafkaStreams} instance from whatever the injected
     * {@link C3Streams.Builder} builds, combined with the injected streams configuration.
     *
     * <p>The method carries no scope annotation, so every injection of {@link KafkaStreams}
     * invokes {@code builder.build()} and constructs a new instance. The returned instance
     * is not started here.
     *
     * @param streamsConfig configuration handed to the {@link KafkaStreams} constructor
     * @param builder       builder whose {@code build()} result is handed to the constructor
     * @return a newly constructed {@link KafkaStreams}
     */
    @Provides
    @Inject
    protected KafkaStreams streamProvider(StreamsConfig streamsConfig, C3Streams.Builder builder) {
        return new KafkaStreams(builder.build(), streamsConfig);
    }

    /**
     * Provides, under the {@link TopicStoreModule.GroupStore} qualifier, one read-only
     * window store per {@link Rollup} for the injected group store definition.
     *
     * @param groupStore          store definition whose name is the base of the per-rollup store names
     * @param kafkaStreamsManager gives access to the streams instance the stores are looked up on
     * @return an immutable map from rollup to window store, as built by
     *         {@link #getRollupStores(TopicStoreMaster.Store, KafkaStreamsManager)}
     */
    @Provides
    @TopicStoreModule.GroupStore
    @Inject
    protected Map<Rollup, ReadOnlyWindowStore<Bytes, Controlcenter.WindowedGrouping>> getGroupStore(@TopicStoreModule.GroupStore TopicStoreMaster.Store<Bytes, Controlcenter.WindowedGrouping, Controlcenter.WindowedGrouping> groupStore, KafkaStreamsManager kafkaStreamsManager) {
        Map<Rollup, ReadOnlyWindowStore<Bytes, Controlcenter.WindowedGrouping>> storesByRollup =
            getRollupStores(groupStore, kafkaStreamsManager);
        return storesByRollup;
    }

    /**
     * Provides, under the {@link TopicStoreModule.MonitoringStreamStore} qualifier, one
     * read-only window store per {@link Rollup} for the injected monitoring stream store
     * definition.
     *
     * @param monitoringStreamStore store definition whose name is the base of the per-rollup store names
     * @param kafkaStreamsManager   gives access to the streams instance the stores are looked up on
     * @return an immutable map from rollup to window store, as built by
     *         {@link #getRollupStores(TopicStoreMaster.Store, KafkaStreamsManager)}
     */
    @Provides
    @TopicStoreModule.MonitoringStreamStore
    @Inject
    protected Map<Rollup, ReadOnlyWindowStore<Bytes, Monitoring.MonitoringMessage>> getMonitoringStreamStore(@TopicStoreModule.MonitoringStreamStore TopicStoreMaster.Store<Bytes, Monitoring.MonitoringMessage, Monitoring.MonitoringMessage> monitoringStreamStore, KafkaStreamsManager kafkaStreamsManager) {
        Map<Rollup, ReadOnlyWindowStore<Bytes, Monitoring.MonitoringMessage>> storesByRollup =
            getRollupStores(monitoringStreamStore, kafkaStreamsManager);
        return storesByRollup;
    }

    /**
     * Provides, under the {@link TopicStoreModule.MonitoringMessageAggregatorWindowsStore}
     * qualifier, one read-only window store per {@link Rollup} for the injected aggregator
     * windows store definition.
     *
     * @param mmAggregatorWindowsStore store definition whose name is the base of the per-rollup store names
     * @param kafkaStreamsManager      gives access to the streams instance the stores are looked up on
     * @return an immutable map from rollup to window store, as built by
     *         {@link #getRollupStores(TopicStoreMaster.Store, KafkaStreamsManager)}
     */
    @Provides
    @TopicStoreModule.MonitoringMessageAggregatorWindowsStore
    @Inject
    protected Map<Rollup, ReadOnlyWindowStore<Bytes, Monitoring.MonitoringMessage>> getMonitoringMessageAggregatorWindowsStore(@TopicStoreModule.MonitoringMessageAggregatorWindowsStore TopicStoreMaster.Store<Bytes, Monitoring.MonitoringMessage, Monitoring.MonitoringMessage> mmAggregatorWindowsStore, KafkaStreamsManager kafkaStreamsManager) {
        Map<Rollup, ReadOnlyWindowStore<Bytes, Monitoring.MonitoringMessage>> storesByRollup =
            getRollupStores(mmAggregatorWindowsStore, kafkaStreamsManager);
        return storesByRollup;
    }

    /**
     * Looks up one queryable window store per {@link Rollup} value.
     *
     * <p>For each rollup, the store name is {@code TopicStoreMaster.nameJoin(groupStore.name,
     * rollup.name())} and the store is fetched from the streams instance returned by
     * {@code kafkaStreamsManager.getKStreams()}.
     *
     * @param groupStore          store definition supplying the base name
     * @param kafkaStreamsManager gives access to the streams instance queried for the stores
     * @param <K>                 window store key type
     * @param <V>                 window store value type
     * @return an immutable map with an entry for every {@link Rollup} value
     */
    static <K, V> Map<Rollup, ReadOnlyWindowStore<K, V>> getRollupStores(TopicStoreMaster.Store<K, ?, V> groupStore, KafkaStreamsManager kafkaStreamsManager) {
        ImmutableMap.Builder<Rollup, ReadOnlyWindowStore<K, V>> storesByRollup = ImmutableMap.builder();
        for (Rollup rollup : Rollup.values()) {
            ReadOnlyWindowStore<K, V> windowStore = kafkaStreamsManager.getKStreams().store(
                TopicStoreMaster.nameJoin(groupStore.name, rollup.name()),
                QueryableStoreTypes.<K, V>windowStore());
            storesByRollup.put(rollup, windowStore);
        }
        return storesByRollup.build();
    }

    /**
     * Provides, under the {@link TopicStoreModule.AlertHistoryStore} qualifier, a read-only
     * key-value view of the alert history store.
     *
     * <p>The store is looked up by {@code alertHistoryStore.name} on the streams instance
     * returned by {@code kstream.getKStreams()}.
     *
     * @param alertHistoryStore store definition supplying the store name
     * @param kstream           gives access to the streams instance queried for the store
     * @return the queryable key-value store
     */
    @Provides
    @TopicStoreModule.AlertHistoryStore
    @Inject
    protected ReadOnlyKeyValueStore<Bytes, Alert.AlertInfo> getAlertHistoryStore(@TopicStoreModule.AlertHistoryStore TopicStoreMaster.Store<Bytes, Alert.AlertInfo, Alert.AlertInfo> alertHistoryStore, KafkaStreamsManager kstream) {
        ReadOnlyKeyValueStore<Bytes, Alert.AlertInfo> alertHistory = kstream.getKStreams().store(
            alertHistoryStore.name,
            QueryableStoreTypes.<Bytes, Alert.AlertInfo>keyValueStore());
        return alertHistory;
    }

    /**
     * Assembles the singleton set of {@link StoreBuilder}s for the state stores named by the
     * injected store definitions.
     *
     * <p>Two groups are produced:
     * <ul>
     *   <li>a persistent key-value store builder for each of the verifier, monitoring
     *       trigger, trigger actions, trigger events and alert history stores, using the
     *       definition's key and value serdes; each store name is logged at INFO;</li>
     *   <li>an LRU-map key-value store builder (constructed with {@code 16384}) for each
     *       entry of {@code verifiableStores}, using the definition's key and aggregate
     *       serdes. A definition flagged {@code rollup} yields one builder per
     *       {@link Rollup} value, named via {@code TopicStoreMaster.nameJoin}; otherwise it
     *       yields a single builder under the definition's own name.</li>
     * </ul>
     *
     * @return an immutable set containing every builder described above
     */
    @Provides
    @Inject
    @Singleton
    protected Set<StoreBuilder> provideStoreSuppliers(@TopicStoreModule.VerifierStore TopicStoreMaster.Store<Bytes, MonitoringVerifier, MonitoringVerifier> verifierStore, @TopicStoreModule.MonitoringTriggerStore TopicStoreMaster.Store<Bytes, Controlcenter.TriggerMeasurement, Controlcenter.TriggerMeasurement> monitoringTriggerStore, @TopicStoreModule.TriggerActionsStore TopicStoreMaster.Store<Bytes, Controlcenter.TriggerActions, Controlcenter.TriggerActions> triggerActionsStore, @TopicStoreModule.TriggerEventsStore TopicStoreMaster.Store<Bytes, Controlcenter.VerifiableTriggerInfo, Controlcenter.VerifiableTriggerInfo> triggerEventsStore, @TopicStoreModule.AlertHistoryStore TopicStoreMaster.Store<Bytes, Alert.AlertInfo, Alert.AlertInfo> alertHistoryStore, Set<TopicStoreMaster.Store<String, Controlcenter.VerifiableMonitoringMessage, Long>> verifiableStores) {
        ImmutableSet.Builder<StoreBuilder> storeBuilders = ImmutableSet.builder();

        // The five definitions have different value types, so they are iterated as raw Store.
        List<TopicStoreMaster.Store> persistentStores = ImmutableList.<TopicStoreMaster.Store>of(
            verifierStore, monitoringTriggerStore, triggerActionsStore, triggerEventsStore, alertHistoryStore);
        for (TopicStoreMaster.Store persistent : persistentStores) {
            log.info("transformerStore={}", (Object) persistent.name);
            KeyValueBytesStoreSupplier persistentSupplier = Stores.persistentKeyValueStore((String) persistent.name);
            storeBuilders.add(Stores.keyValueStoreBuilder(persistentSupplier, persistent.keySerde, persistent.valueSerde));
        }

        for (TopicStoreMaster.Store verifiable : verifiableStores) {
            // Collected in a HashSet first so the builders are added in the same order as before.
            HashSet<String> lruStoreNames = new HashSet<String>();
            if (!verifiable.rollup) {
                lruStoreNames.add(verifiable.name);
            } else {
                for (Rollup rollup : Rollup.values()) {
                    lruStoreNames.add(TopicStoreMaster.nameJoin(verifiable.name, rollup.name()));
                }
            }
            for (String lruStoreName : lruStoreNames) {
                KeyValueBytesStoreSupplier lruSupplier = Stores.lruMap(lruStoreName, 16384);
                storeBuilders.add(Stores.keyValueStoreBuilder(lruSupplier, verifiable.keySerde, verifiable.aggregateSerde));
            }
        }
        return storeBuilders.build();
    }

    /**
     * Provides, under the {@link TopicStoreModule.MetricsAggregateStore} qualifier, the
     * grouped view of the metrics aggregate window store.
     *
     * <p>The window store is looked up by {@code metricsAggregateStore.name} on the streams
     * instance returned by {@code streams.getKStreams()} and then passed to
     * {@link MetricsAggregation#metricsGroupStore}.
     *
     * @param streams               gives access to the streams instance queried for the store
     * @param metricsAggregation    wraps the raw window store into its grouped form
     * @param metricsAggregateStore store definition supplying the store name
     * @return whatever {@code metricsGroupStore} returns for the looked-up window store
     */
    @Provides
    @TopicStoreModule.MetricsAggregateStore
    @Inject
    protected GroupingSets.PartitionedGroupingSets.GroupedWindowStore<MetricValues> getMetricsAggregateStore(KafkaStreamsManager streams, MetricsAggregation metricsAggregation, @TopicStoreModule.MetricsAggregateStore TopicStoreMaster.Store<ByteBuffer, MetricHolder, MetricHolder> metricsAggregateStore) {
        // The store is treated as BufferMetricEvent -> MetricValues even though the definition
        // is declared with ByteBuffer / MetricHolder type arguments.
        ReadOnlyWindowStore<BufferMetricEvent, MetricValues> aggregateWindows = streams.getKStreams().store(
            metricsAggregateStore.name,
            QueryableStoreTypes.<BufferMetricEvent, MetricValues>windowStore());
        return metricsAggregation.metricsGroupStore(aggregateWindows);
    }

    /**
     * Creates the singleton {@link VerifyTransformerSupplier}.
     *
     * @param config          source of the {@code confluent.monitoring.interceptor.topic} setting
     * @param verifierStore   store definition whose name is passed to the supplier
     * @param keySerde        key serde passed to the supplier
     * @param clock           clock passed to the supplier
     * @param heartbeatSender heartbeat sender passed to the supplier
     * @return the new supplier
     */
    @Provides
    @Inject
    @Singleton
    protected VerifyTransformerSupplier provideVerifyTransformerSupplier(ControlCenterConfig config, @TopicStoreModule.VerifierStore TopicStoreMaster.Store<Bytes, MonitoringVerifier, MonitoringVerifier> verifierStore, @SerializationModule.VerifierKeySerde OrderedKeyUberSerde<Monitoring.MonitoringMessage> keySerde, Clock clock, MonitoringHeartbeatSender heartbeatSender) {
        return new VerifyTransformerSupplier(
            verifierStore.name,
            config.getString("confluent.monitoring.interceptor.topic"),
            heartbeatSender,
            clock,
            keySerde);
    }

    /**
     * Creates the singleton {@link MetricToTriggerMeasurementTransformerSupplier}.
     *
     * @param clock clock passed to the supplier's constructor
     * @return the new supplier
     */
    @Provides
    @Inject
    @Singleton
    protected MetricToTriggerMeasurementTransformerSupplier provideMetricToTriggerMeasurementTransformerSupplier(Clock clock) {
        MetricToTriggerMeasurementTransformerSupplier measurementSupplier =
            new MetricToTriggerMeasurementTransformerSupplier(clock);
        return measurementSupplier;
    }

    /**
     * Creates the singleton {@link MonitoringTriggerTransformerSupplier}.
     *
     * @param config                 source of the {@code confluent.controlcenter.internal.topics.partitions} setting
     * @param monitoringTriggerStore store definition whose name is passed to the supplier
     * @param keySerde               key serde passed to the supplier
     * @param triggerConfigsProvider provider of the trigger-config store, passed through unresolved
     * @param clock                  clock passed to the supplier
     * @return the new supplier
     */
    @Provides
    @Inject
    @Singleton
    protected MonitoringTriggerTransformerSupplier provideMonitoringTriggerTransformerSupplier(ControlCenterConfig config, @TopicStoreModule.MonitoringTriggerStore TopicStoreMaster.Store<Bytes, Controlcenter.TriggerMeasurement, Controlcenter.TriggerMeasurement> monitoringTriggerStore, @SerializationModule.MonitoringTriggerKeySerde OrderedKeyUberSerde<Controlcenter.WindowedClusterGroup> keySerde, @CommandModule.TriggerConfigs Provider<ReadOnlyKeyValueStore<Command.CommandKey, Command.CommandMessage>> triggerConfigsProvider, Clock clock) {
        return new MonitoringTriggerTransformerSupplier(
            monitoringTriggerStore.name,
            clock,
            keySerde,
            triggerConfigsProvider,
            config.getInt("confluent.controlcenter.internal.topics.partitions"));
    }

    /**
     * Creates the singleton {@link AlertTransformerSupplier}.
     *
     * @param config                 source of the {@code confluent.controlcenter.alert.max.trigger.events} setting
     * @param triggerActionsStore    store definition whose name is passed as the first store name
     * @param triggerEventsStore     store definition whose name is passed as the second store name
     * @param actionsKeySerde        key serde passed to the supplier
     * @param triggerConfigsProvider provider of the trigger-config store, passed through unresolved
     * @param actionConfigsProvider  provider of the action-config store, passed through unresolved
     * @param clock                  clock passed to the supplier
     * @return the new supplier
     */
    @Provides
    @Inject
    @Singleton
    protected AlertTransformerSupplier provideAlertTransformerSupplier(ControlCenterConfig config, @TopicStoreModule.TriggerActionsStore TopicStoreMaster.Store<Bytes, Controlcenter.TriggerActions, Controlcenter.TriggerActions> triggerActionsStore, @TopicStoreModule.TriggerEventsStore TopicStoreMaster.Store<Bytes, Controlcenter.VerifiableTriggerInfo, Controlcenter.VerifiableTriggerInfo> triggerEventsStore, @SerializationModule.TriggerActionsKeySerde OrderedKeyUberSerde<Controlcenter.TriggerEventKey> actionsKeySerde, @CommandModule.TriggerConfigs Provider<ReadOnlyKeyValueStore<Command.CommandKey, Command.CommandMessage>> triggerConfigsProvider, @CommandModule.ActionConfigs Provider<ReadOnlyKeyValueStore<Command.CommandKey, Command.CommandMessage>> actionConfigsProvider, Clock clock) {
        return new AlertTransformerSupplier(
            triggerActionsStore.name,
            triggerEventsStore.name,
            config.getInt("confluent.controlcenter.alert.max.trigger.events"),
            clock,
            actionsKeySerde,
            triggerConfigsProvider,
            actionConfigsProvider);
    }

    /**
     * Creates the singleton {@link AlertHistoryProcessorSupplier}.
     *
     * @param alertHistoryStore store definition whose name is passed to the supplier
     * @param alertKeySerde     serde injected under {@code TriggerEventsKeySerde}
     * @param alertRangeSerde   serde injected under {@code TriggerEventsKeyPrefixSerde}
     * @return the new supplier
     */
    @Provides
    @Inject
    @Singleton
    protected AlertHistoryProcessorSupplier provideAlertHistoryProcessorSupplier(@TopicStoreModule.AlertHistoryStore TopicStoreMaster.Store<Bytes, Alert.AlertInfo, Alert.AlertInfo> alertHistoryStore, @SerializationModule.TriggerEventsKeySerde OrderedKeyUberSerde<Alert.AlertInfo> alertKeySerde, @SerializationModule.TriggerEventsKeyPrefixSerde OrderedKeyUberSerde<Alert.AlertInfo> alertRangeSerde) {
        return new AlertHistoryProcessorSupplier(
            alertHistoryStore.name,
            alertKeySerde,
            alertRangeSerde);
    }

    /**
     * Supplies the singleton {@link Clock} used across the module's bindings.
     *
     * @return a new {@link SystemClock}
     */
    @Provides
    @Inject
    @Singleton
    protected Clock provideClock() {
        Clock systemClock = new SystemClock();
        return systemClock;
    }

    /**
     * Builds the singleton {@link MonitoringHeartbeatSender} together with the dedicated
     * {@link KafkaProducer} it publishes through.
     *
     * <p>The producer configuration starts from
     * {@code streamsConfig.getProducerConfigs(...)} with a client id of
     * {@code "confluent-control-center-heartbeat-sender-"} followed by the heartbeat client
     * id. {@code MonitoringProducerDefaults.PRODUCER_CONFIG_DEFAULTS} is then applied on
     * top, and finally {@code bootstrap.servers} is set from the Control Center
     * configuration.
     *
     * <p>All configuration values are resolved before the producer is created, and the
     * producer is closed again if constructing the sender fails. A Kafka producer holds a
     * buffer pool and a background I/O thread that are only released by {@code close()}.
     *
     * @param cfg                                  source of the controlcenter id, interceptor topic and bootstrap servers
     * @param streamsConfig                        source of the base producer configuration
     * @param monitoringMessageUberSerde           serde passed to the sender
     * @param rekeyedExpectedGroupConsumptionTopic first internal topic handed to the sender
     * @param rekeyedActualGroupConsumptionTopic   second internal topic handed to the sender
     * @param clock                                clock passed to the sender
     * @return the new heartbeat sender, which holds the producer created here
     */
    @Provides
    @Singleton
    @Inject
    protected MonitoringHeartbeatSender provideMonitoringHeartbeatSender(ControlCenterConfig cfg, StreamsConfig streamsConfig, UberSerde<Monitoring.MonitoringMessage> monitoringMessageUberSerde, @TopicStoreModule.ExpectedGroupConsumptionAggregateTopic TopicStoreMaster.Topic<Controlcenter.WindowedClusterGroup, Monitoring.MonitoringMessage, Void, Void> rekeyedExpectedGroupConsumptionTopic, @TopicStoreModule.ActualGroupConsumptionAggregateTopic TopicStoreMaster.Topic<Controlcenter.WindowedClusterGroup, Monitoring.MonitoringMessage, Void, Void> rekeyedActualGroupConsumptionTopic, Clock clock) {
        Monitoring.MonitoringMessage baseMonitoringMessage = StreamsModule.provideBaseHeartbeatMonitoringMessage(cfg.getString("confluent.controlcenter.id"));
        // Resolved up front: a failing lookup after the producer exists would strand it open.
        String interceptorTopic = cfg.getString("confluent.monitoring.interceptor.topic");

        // Raw Map on purpose: the declared type of PRODUCER_CONFIG_DEFAULTS is not visible here.
        Map producerProperties = streamsConfig.getProducerConfigs("confluent-control-center-heartbeat-sender-" + baseMonitoringMessage.getClientId());
        producerProperties.putAll(MonitoringProducerDefaults.PRODUCER_CONFIG_DEFAULTS);
        producerProperties.put("bootstrap.servers", cfg.getList("bootstrap.servers"));

        HashSet<TopicStoreMaster.Topic<Controlcenter.WindowedClusterGroup, Monitoring.MonitoringMessage, Void, Void>> internalPublishTopics = new HashSet<TopicStoreMaster.Topic<Controlcenter.WindowedClusterGroup, Monitoring.MonitoringMessage, Void, Void>>();
        internalPublishTopics.add(rekeyedExpectedGroupConsumptionTopic);
        internalPublishTopics.add(rekeyedActualGroupConsumptionTopic);

        KafkaProducer<byte[], byte[]> kp = new KafkaProducer<byte[], byte[]>(producerProperties);
        try {
            return new MonitoringHeartbeatSender(interceptorTopic, internalPublishTopics, baseMonitoringMessage, kp, monitoringMessageUberSerde, TimeBucket.SIZE, clock);
        } catch (RuntimeException e) {
            // Nobody else holds a reference to the producer yet, so release it before rethrowing.
            try {
                kp.close();
            } catch (RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Provides, under the {@link C3StreamsMetrics} qualifier, a {@link Metrics} registry
     * configured with 30 samples over a 1-second time window.
     *
     * <p>Two reporters are attached, in this order: a {@link JmxReporter} created with the
     * prefix {@code "c3-streams"} and the injected {@link StreamProgressReporter}. The
     * method carries no scope annotation, so each injection builds a new registry and a new
     * JMX reporter.
     *
     * @param streamProgressReporter second reporter registered with the metrics registry
     * @return the new metrics registry
     */
    @Inject
    @Provides
    @C3StreamsMetrics
    protected Metrics provideC3StreamsMetrics(StreamProgressReporter streamProgressReporter) {
        MetricConfig metricConfig = new MetricConfig()
            .samples(30)
            .timeWindow(1L, TimeUnit.SECONDS);

        // Kept as a mutable list; the reporter element type is not imported in this file,
        // hence the raw cast at the call site.
        List<Object> reporters = Lists.newArrayList();
        reporters.add(new JmxReporter("c3-streams"));
        reporters.add(streamProgressReporter);

        return new Metrics(metricConfig, (List) reporters, new SystemTime());
    }

    /**
     * Builds the template heartbeat message for this Control Center instance.
     *
     * <p>Starting from {@code MonitoringMessageUtil.baseMonitoringMessage()}, the client
     * type is set to {@code CONTROLCENTER}, the message type to {@code HEARTBEAT}, the
     * window to {@code 0} and the topic to the empty string. The result comes from
     * {@code buildPartial()}, so it is not validated for required fields.
     *
     * @param clientId client id stamped on the message
     * @return the partially built heartbeat message
     */
    public static Monitoring.MonitoringMessage provideBaseHeartbeatMonitoringMessage(String clientId) {
        Monitoring.MonitoringMessage prototype = MonitoringMessageUtil.baseMonitoringMessage();
        return Monitoring.MonitoringMessage.newBuilder(prototype)
            .setClientType(Monitoring.ClientType.CONTROLCENTER)
            .setClientId(clientId)
            .setType(Monitoring.MessageType.HEARTBEAT)
            .setWindow(0L)
            .setTopic("")
            .buildPartial();
    }

    @BindingAnnotation
    @Retention(value=RetentionPolicy.RUNTIME)
    @Target(value={ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
    public static @interface C3StreamsMetrics {
    }

    @BindingAnnotation
    @Retention(value=RetentionPolicy.RUNTIME)
    @Target(value={ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
    public static @interface TopicCache {
    }

    @BindingAnnotation
    @Retention(value=RetentionPolicy.RUNTIME)
    @Target(value={ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
    public static @interface BrokerCache {
    }
}

