package io.confluent.controlcenter.streams;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.UnmodifiableIterator;
import com.google.inject.AbstractModule;
import com.google.inject.BindingAnnotation;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import io.confluent.command.cluster_metadata.CommandClusterMetadata;
import io.confluent.command.record.Command;
import io.confluent.common.metrics.JmxReporter;
import io.confluent.common.metrics.MetricConfig;
import io.confluent.common.metrics.Metrics;
import io.confluent.common.utils.SystemTime;
import io.confluent.controlcenter.ControlCenterConfig;
import io.confluent.controlcenter.Rollup;
import io.confluent.controlcenter.alert.record.Alert;
import io.confluent.controlcenter.command.CommandModule;
import io.confluent.controlcenter.record.Controlcenter;
import io.confluent.controlcenter.rest.Credential;
import io.confluent.controlcenter.serialization.SerializationModule;
import io.confluent.controlcenter.streams.C3Streams;
import io.confluent.controlcenter.streams.TopicStoreMaster;
import io.confluent.controlcenter.streams.TopicStoreModule;
import io.confluent.controlcenter.streams.aggregation.GroupingSets;
import io.confluent.controlcenter.streams.aggregation.MetricHolder;
import io.confluent.controlcenter.streams.aggregation.MetricValues;
import io.confluent.controlcenter.streams.aggregation.MetricsAggregation;
import io.confluent.controlcenter.streams.alert.AlertHistoryProcessorSupplier;
import io.confluent.controlcenter.streams.alert.AlertTransformerSupplier;
import io.confluent.controlcenter.streams.alert.MetricToTriggerMeasurementTransformerSupplier;
import io.confluent.controlcenter.streams.alert.MonitoringTriggerTransformerSupplier;
import io.confluent.controlcenter.streams.verify.MonitoringHeartbeatSender;
import io.confluent.controlcenter.streams.verify.MonitoringVerifier;
import io.confluent.controlcenter.streams.verify.VerifyTransformerSupplier;
import io.confluent.controlcenter.util.LruSet;
import io.confluent.controlcenter.util.StreamProgressReporter;
import io.confluent.monitoring.common.Clock;
import io.confluent.monitoring.common.MonitoringMessageUtil;
import io.confluent.monitoring.common.MonitoringProducerDefaults;
import io.confluent.monitoring.common.SystemClock;
import io.confluent.monitoring.common.TimeBucket;
import io.confluent.monitoring.record.Monitoring;
import io.confluent.serializers.OrderedKeyUberSerde;
import io.confluent.serializers.UberSerde;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/controlcenter/streams/StreamsModule.class */
public class StreamsModule extends AbstractModule {
    private static final Logger log = LoggerFactory.getLogger(StreamsModule.class);

    @Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
    @BindingAnnotation
    @Retention(RetentionPolicy.RUNTIME)
    /* loaded from: input_file:io/confluent/controlcenter/streams/StreamsModule$BrokerCache.class */
    public @interface BrokerCache {
    }

    @Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
    @BindingAnnotation
    @Retention(RetentionPolicy.RUNTIME)
    /* loaded from: input_file:io/confluent/controlcenter/streams/StreamsModule$C3StreamsMetrics.class */
    public @interface C3StreamsMetrics {
    }

    @Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
    @BindingAnnotation
    @Retention(RetentionPolicy.RUNTIME)
    /* loaded from: input_file:io/confluent/controlcenter/streams/StreamsModule$TopicCache.class */
    public @interface TopicCache {
    }

    protected void configure() {
        bind(MetricsAggregation.class).asEagerSingleton();
        bind(C3Streams.Builder.class).toProvider(StreamBuilderProvider.class).asEagerSingleton();
        requestStaticInjection(new Class[]{WindowExtractor.class});
    }

    @Provides
    protected LruSet<CommandClusterMetadata.KafkaClusterMetadata> provideKcmLru() {
        return new LruSet<>(128);
    }

    @Singleton
    @BrokerCache
    @Provides
    protected LoadingCache<Credential, SortedSet<Integer>> brokerCache(ControlCenterConfig controlCenterConfig) {
        return CacheBuilder.newBuilder().expireAfterWrite(controlCenterConfig.getLong(ControlCenterConfig.CONTROL_CENTER_METRICS_TOPIC_RETENTION_MS_CONFIG).longValue(), TimeUnit.MILLISECONDS).maximumSize(10000L).build(new CacheLoader<Credential, SortedSet<Integer>>() { // from class: io.confluent.controlcenter.streams.StreamsModule.1
            public SortedSet<Integer> load(Credential credential) throws Exception {
                return new ConcurrentSkipListSet();
            }
        });
    }

    @Singleton
    @Provides
    @TopicCache
    protected LoadingCache<Credential, SortedSet<String>> topicCache(ControlCenterConfig controlCenterConfig) {
        return CacheBuilder.newBuilder().expireAfterWrite(controlCenterConfig.getLong(ControlCenterConfig.CONTROL_CENTER_METRICS_TOPIC_RETENTION_MS_CONFIG).longValue(), TimeUnit.MILLISECONDS).maximumSize(10000L).build(new CacheLoader<Credential, SortedSet<String>>() { // from class: io.confluent.controlcenter.streams.StreamsModule.2
            public SortedSet<String> load(Credential credential) throws Exception {
                return new ConcurrentSkipListSet();
            }
        });
    }

    @Inject
    @Provides
    protected KafkaStreams streamProvider(StreamsConfig streamsConfig, C3Streams.Builder builder) {
        Properties properties = new Properties();
        properties.putAll(streamsConfig.originals());
        return new KafkaStreams(builder.build(properties), streamsConfig);
    }

    @Inject
    @Provides
    @TopicStoreModule.GroupStore
    protected Map<Rollup, ReadOnlyWindowStore<Bytes, Controlcenter.WindowedGrouping>> getGroupStore(@TopicStoreModule.GroupStore TopicStoreMaster.Store<Bytes, Controlcenter.WindowedGrouping, Controlcenter.WindowedGrouping> store, KafkaStreamsManager kafkaStreamsManager) {
        return getRollupStores(store, kafkaStreamsManager);
    }

    @TopicStoreModule.MonitoringStreamStore
    @Inject
    @Provides
    protected Map<Rollup, ReadOnlyWindowStore<Bytes, Monitoring.MonitoringMessage>> getMonitoringStreamStore(@TopicStoreModule.MonitoringStreamStore TopicStoreMaster.Store<Bytes, Monitoring.MonitoringMessage, Monitoring.MonitoringMessage> store, KafkaStreamsManager kafkaStreamsManager) {
        return getRollupStores(store, kafkaStreamsManager);
    }

    @TopicStoreModule.MonitoringMessageAggregatorWindowsStore
    @Inject
    @Provides
    protected Map<Rollup, ReadOnlyWindowStore<Bytes, Monitoring.MonitoringMessage>> getMonitoringMessageAggregatorWindowsStore(@TopicStoreModule.MonitoringMessageAggregatorWindowsStore TopicStoreMaster.Store<Bytes, Monitoring.MonitoringMessage, Monitoring.MonitoringMessage> store, KafkaStreamsManager kafkaStreamsManager) {
        return getRollupStores(store, kafkaStreamsManager);
    }

    static <K, V> Map<Rollup, ReadOnlyWindowStore<K, V>> getRollupStores(TopicStoreMaster.Store<K, ?, V> store, KafkaStreamsManager kafkaStreamsManager) {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        for (Rollup rollup : Rollup.values()) {
            builder.put(rollup, kafkaStreamsManager.getKStreams().store(TopicStoreMaster.nameJoin(store.name, rollup.name()), QueryableStoreTypes.windowStore()));
        }
        return builder.build();
    }

    @Inject
    @Provides
    @TopicStoreModule.AlertHistoryStore
    protected ReadOnlyKeyValueStore<Bytes, Alert.AlertInfo> getAlertHistoryStore(@TopicStoreModule.AlertHistoryStore TopicStoreMaster.Store<Bytes, Alert.AlertInfo, Alert.AlertInfo> store, KafkaStreamsManager kafkaStreamsManager) {
        return (ReadOnlyKeyValueStore) kafkaStreamsManager.getKStreams().store(store.name, QueryableStoreTypes.keyValueStore());
    }

    @Inject
    @Singleton
    @Provides
    protected Set<StoreBuilder> provideStoreSuppliers(@TopicStoreModule.VerifierStore TopicStoreMaster.Store<Bytes, MonitoringVerifier, MonitoringVerifier> store, @TopicStoreModule.MonitoringTriggerStore TopicStoreMaster.Store<Bytes, Controlcenter.TriggerMeasurement, Controlcenter.TriggerMeasurement> store2, @TopicStoreModule.TriggerActionsStore TopicStoreMaster.Store<Bytes, Controlcenter.TriggerActions, Controlcenter.TriggerActions> store3, @TopicStoreModule.TriggerEventsStore TopicStoreMaster.Store<Bytes, Controlcenter.VerifiableTriggerInfo, Controlcenter.VerifiableTriggerInfo> store4, @TopicStoreModule.AlertHistoryStore TopicStoreMaster.Store<Bytes, Alert.AlertInfo, Alert.AlertInfo> store5, Set<TopicStoreMaster.Store<String, Controlcenter.VerifiableMonitoringMessage, Long>> set) {
        ImmutableSet.Builder builder = ImmutableSet.builder();
        UnmodifiableIterator it = ImmutableList.of(store, store2, store3, store4, store5).iterator();
        while (it.hasNext()) {
            TopicStoreMaster.Store store6 = (TopicStoreMaster.Store) it.next();
            log.info("transformerStore={}", store6.name);
            builder.add(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(store6.name), store6.keySerde, store6.valueSerde));
        }
        for (TopicStoreMaster.Store<String, Controlcenter.VerifiableMonitoringMessage, Long> store7 : set) {
            HashSet hashSet = new HashSet();
            if (store7.rollup) {
                for (Rollup rollup : Rollup.values()) {
                    hashSet.add(TopicStoreMaster.nameJoin(store7.name, rollup.name()));
                }
            } else {
                hashSet.add(store7.name);
            }
            Iterator it2 = hashSet.iterator();
            while (it2.hasNext()) {
                builder.add(Stores.keyValueStoreBuilder(Stores.lruMap((String) it2.next(), 16384), store7.keySerde, store7.aggregateSerde));
            }
        }
        return builder.build();
    }

    @TopicStoreModule.MetricsAggregateStore
    @Inject
    @Provides
    protected GroupingSets.PartitionedGroupingSets.GroupedWindowStore<MetricValues> getMetricsAggregateStore(KafkaStreamsManager kafkaStreamsManager, MetricsAggregation metricsAggregation, @TopicStoreModule.MetricsAggregateStore TopicStoreMaster.Store<ByteBuffer, MetricHolder, MetricHolder> store) {
        return metricsAggregation.metricsGroupStore((ReadOnlyWindowStore) kafkaStreamsManager.getKStreams().store(store.name, QueryableStoreTypes.windowStore()));
    }

    @Inject
    @Singleton
    @Provides
    protected VerifyTransformerSupplier provideVerifyTransformerSupplier(ControlCenterConfig controlCenterConfig, @TopicStoreModule.VerifierStore TopicStoreMaster.Store<Bytes, MonitoringVerifier, MonitoringVerifier> store, @SerializationModule.VerifierKeySerde OrderedKeyUberSerde<Monitoring.MonitoringMessage> orderedKeyUberSerde, Clock clock, MonitoringHeartbeatSender monitoringHeartbeatSender) {
        return new VerifyTransformerSupplier(store.name, controlCenterConfig.getString(ControlCenterConfig.CONTROL_CENTER_TOPIC_CONFIG), monitoringHeartbeatSender, clock, orderedKeyUberSerde);
    }

    @Inject
    @Singleton
    @Provides
    protected MetricToTriggerMeasurementTransformerSupplier provideMetricToTriggerMeasurementTransformerSupplier(Clock clock) {
        return new MetricToTriggerMeasurementTransformerSupplier(clock);
    }

    @Inject
    @Singleton
    @Provides
    protected MonitoringTriggerTransformerSupplier provideMonitoringTriggerTransformerSupplier(ControlCenterConfig controlCenterConfig, @TopicStoreModule.MonitoringTriggerStore TopicStoreMaster.Store<Bytes, Controlcenter.TriggerMeasurement, Controlcenter.TriggerMeasurement> store, @SerializationModule.MonitoringTriggerKeySerde OrderedKeyUberSerde<Controlcenter.WindowedClusterGroup> orderedKeyUberSerde, @CommandModule.TriggerConfigs Provider<ReadOnlyKeyValueStore<Command.CommandKey, Command.CommandMessage>> provider, Clock clock) {
        return new MonitoringTriggerTransformerSupplier(store.name, clock, orderedKeyUberSerde, provider, controlCenterConfig.getInt(ControlCenterConfig.CONTROL_CENTER_INTERNAL_PARTITIONS_CONFIG).intValue());
    }

    @Inject
    @Singleton
    @Provides
    protected AlertTransformerSupplier provideAlertTransformerSupplier(ControlCenterConfig controlCenterConfig, @TopicStoreModule.TriggerActionsStore TopicStoreMaster.Store<Bytes, Controlcenter.TriggerActions, Controlcenter.TriggerActions> store, @TopicStoreModule.TriggerEventsStore TopicStoreMaster.Store<Bytes, Controlcenter.VerifiableTriggerInfo, Controlcenter.VerifiableTriggerInfo> store2, @SerializationModule.TriggerActionsKeySerde OrderedKeyUberSerde<Controlcenter.TriggerEventKey> orderedKeyUberSerde, @CommandModule.TriggerConfigs Provider<ReadOnlyKeyValueStore<Command.CommandKey, Command.CommandMessage>> provider, @CommandModule.ActionConfigs Provider<ReadOnlyKeyValueStore<Command.CommandKey, Command.CommandMessage>> provider2, Clock clock) {
        return new AlertTransformerSupplier(store.name, store2.name, controlCenterConfig.getInt(ControlCenterConfig.CONTROL_CENTER_MAX_TRIGGER_EVENTS_PER_ALERT_CONFIG).intValue(), clock, orderedKeyUberSerde, provider, provider2);
    }

    @Inject
    @Singleton
    @Provides
    protected AlertHistoryProcessorSupplier provideAlertHistoryProcessorSupplier(@TopicStoreModule.AlertHistoryStore TopicStoreMaster.Store<Bytes, Alert.AlertInfo, Alert.AlertInfo> store, @SerializationModule.TriggerEventsKeySerde OrderedKeyUberSerde<Alert.AlertInfo> orderedKeyUberSerde, @SerializationModule.TriggerEventsKeyPrefixSerde OrderedKeyUberSerde<Alert.AlertInfo> orderedKeyUberSerde2) {
        return new AlertHistoryProcessorSupplier(store.name, orderedKeyUberSerde, orderedKeyUberSerde2);
    }

    @Inject
    @Singleton
    @Provides
    protected Clock provideClock() {
        return new SystemClock();
    }

    @Singleton
    @Inject
    @Provides
    protected MonitoringHeartbeatSender provideMonitoringHeartbeatSender(ControlCenterConfig controlCenterConfig, StreamsConfig streamsConfig, UberSerde<Monitoring.MonitoringMessage> uberSerde, @TopicStoreModule.ExpectedGroupConsumptionAggregateTopic TopicStoreMaster.Topic<Controlcenter.WindowedClusterGroup, Monitoring.MonitoringMessage, Void, Void> topic, @TopicStoreModule.ActualGroupConsumptionAggregateTopic TopicStoreMaster.Topic<Controlcenter.WindowedClusterGroup, Monitoring.MonitoringMessage, Void, Void> topic2, Clock clock) {
        Monitoring.MonitoringMessage provideBaseHeartbeatMonitoringMessage = provideBaseHeartbeatMonitoringMessage(controlCenterConfig.getString(ControlCenterConfig.CONTROL_CENTER_ID_CONFIG));
        Map producerConfigs = streamsConfig.getProducerConfigs("confluent-control-center-heartbeat-sender-" + provideBaseHeartbeatMonitoringMessage.getClientId());
        producerConfigs.putAll(MonitoringProducerDefaults.PRODUCER_CONFIG_DEFAULTS);
        producerConfigs.put(ControlCenterConfig.CONTROL_CENTER_BOOTSTRAP_SERVERS_CONFIG, controlCenterConfig.getList(ControlCenterConfig.CONTROL_CENTER_BOOTSTRAP_SERVERS_CONFIG));
        KafkaProducer kafkaProducer = new KafkaProducer(producerConfigs);
        HashSet hashSet = new HashSet();
        hashSet.add(topic);
        hashSet.add(topic2);
        return new MonitoringHeartbeatSender(controlCenterConfig.getString(ControlCenterConfig.CONTROL_CENTER_TOPIC_CONFIG), hashSet, provideBaseHeartbeatMonitoringMessage, kafkaProducer, uberSerde, TimeBucket.SIZE, clock);
    }

    @Inject
    @Provides
    @C3StreamsMetrics
    protected Metrics provideC3StreamsMetrics(StreamProgressReporter streamProgressReporter) {
        MetricConfig timeWindow = new MetricConfig().samples(30).timeWindow(1L, TimeUnit.SECONDS);
        ArrayList newArrayList = Lists.newArrayList();
        newArrayList.add(new JmxReporter("c3-streams"));
        newArrayList.add(streamProgressReporter);
        return new Metrics(timeWindow, newArrayList, new SystemTime());
    }

    public static Monitoring.MonitoringMessage provideBaseHeartbeatMonitoringMessage(String str) {
        return Monitoring.MonitoringMessage.newBuilder(MonitoringMessageUtil.baseMonitoringMessage()).setClientType(Monitoring.ClientType.CONTROLCENTER).setClientId(str).setType(Monitoring.MessageType.HEARTBEAT).setWindow(0L).setTopic("").buildPartial();
    }
}
