package io.confluent.controlcenter.streams;

import com.google.common.cache.LoadingCache;
import com.google.inject.Inject;
import com.google.inject.Provider;
import io.confluent.command.cluster_metadata.CommandClusterMetadata;
import io.confluent.command.record.Command;
import io.confluent.controlcenter.ControlCenterModule;
import io.confluent.controlcenter.Rollup;
import io.confluent.controlcenter.alert.AlertSenderRouter;
import io.confluent.controlcenter.alert.TriggerEventUtil;
import io.confluent.controlcenter.alert.record.Alert;
import io.confluent.controlcenter.command.CommandModule;
import io.confluent.controlcenter.data.KafkaMetadataDao;
import io.confluent.controlcenter.keys.Keys;
import io.confluent.controlcenter.record.Controlcenter;
import io.confluent.controlcenter.rest.TokenCredential;
import io.confluent.controlcenter.serialization.OrderedKeyPrefixedSerdeSupplier;
import io.confluent.controlcenter.streams.C3Streams;
import io.confluent.controlcenter.streams.StreamsModule;
import io.confluent.controlcenter.streams.TopicStoreMaster;
import io.confluent.controlcenter.streams.TopicStoreModule;
import io.confluent.controlcenter.streams.aggregation.MetricEvent;
import io.confluent.controlcenter.streams.aggregation.MetricHolder;
import io.confluent.controlcenter.streams.aggregation.MetricValues;
import io.confluent.controlcenter.streams.aggregation.MetricsAggregation;
import io.confluent.controlcenter.streams.aggregation.MetricsExtractor;
import io.confluent.controlcenter.streams.alert.AlertHistoryProcessorSupplier;
import io.confluent.controlcenter.streams.alert.AlertTransformerSupplier;
import io.confluent.controlcenter.streams.alert.GroupMeasurement;
import io.confluent.controlcenter.streams.alert.MetricToTriggerMeasurementTransformerSupplier;
import io.confluent.controlcenter.streams.alert.MonitoringTriggerMeasurementJoiner;
import io.confluent.controlcenter.streams.alert.MonitoringTriggerTransformerSupplier;
import io.confluent.controlcenter.streams.alert.TriggerEventStreamsUtil;
import io.confluent.controlcenter.streams.consumergroups.ConsumerGroupStreamExtension;
import io.confluent.controlcenter.streams.group.GroupStreamExtension;
import io.confluent.controlcenter.streams.monitoring.MonitoringPredicates;
import io.confluent.controlcenter.streams.monitoring.MonitoringStreamExtension;
import io.confluent.controlcenter.streams.verify.MonitoringVerifier;
import io.confluent.controlcenter.streams.verify.VerifyTransformerSupplier;
import io.confluent.controlcenter.util.versions.BrokerVersion;
import io.confluent.controlcenter.util.versions.KafkaVersionTracker;
import io.confluent.metrics.record.ConfluentMetric;
import io.confluent.monitoring.common.MonitoringMessageUtil;
import io.confluent.monitoring.record.Monitoring;
import io.confluent.serializers.OrderedKeyPrefixedSerde;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.StoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/controlcenter/streams/StreamBuilderProvider.class */
public class StreamBuilderProvider implements Provider<C3Streams.Builder> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) StreamBuilderProvider.class);
    private static final StreamPartitioner<Bytes, Controlcenter.TriggerEvent> TRIGGER_EVENT_PARTITIONER = new StreamPartitioner<Bytes, Controlcenter.TriggerEvent>() { // from class: io.confluent.controlcenter.streams.StreamBuilderProvider.1
        @Override // org.apache.kafka.streams.processor.StreamPartitioner
        public Integer partition(String str, Bytes bytes, Controlcenter.TriggerEvent triggerEvent, int i) {
            return Integer.valueOf(TriggerEventStreamsUtil.partitionFor(bytes, triggerEvent, i));
        }
    };
    private static final StreamPartitioner<Object, Monitoring.MonitoringMessage> MONITORING_MESSAGE_PARTITIONER = new StreamPartitioner<Object, Monitoring.MonitoringMessage>() { // from class: io.confluent.controlcenter.streams.StreamBuilderProvider.2
        @Override // org.apache.kafka.streams.processor.StreamPartitioner
        public Integer partition(String str, Object obj, Monitoring.MonitoringMessage monitoringMessage, int i) {
            return Integer.valueOf(MonitoringMessageUtil.partitionFor(monitoringMessage, i));
        }
    };
    private final MonitoringStreamExtension monitoringStreamExtension;
    private final GroupStreamExtension groupStreamExtension;
    private final ConsumerGroupStreamExtension consumerGroupStreamExtension;
    private final Set<StoreBuilder> storeBuilders;
    private final VerifyTransformerSupplier verifyTransformerSupplier;
    private final TopicStoreMaster.Store<Bytes, MonitoringVerifier, MonitoringVerifier> verifierStore;
    private final TopicStoreMaster.Topic<Void, Monitoring.MonitoringMessage, Void, Void> monitoringTopic;
    private final TopicStoreMaster.Topic<Void, Monitoring.MonitoringMessage, Void, Void> repartitionedMonitoringTopic;
    private final TopicStoreMaster.Topic<Controlcenter.WindowedClusterGroup, Monitoring.MonitoringMessage, Void, Void> rekeyedExpectedGroupConsumptionTopic;
    private final TopicStoreMaster.Topic<Controlcenter.WindowedClusterGroup, Monitoring.MonitoringMessage, Void, Void> rekeyedActualGroupConsumptionTopic;
    private final TopicStoreMaster.Topic<Controlcenter.WindowedClusterGroup, Controlcenter.TriggerMeasurement, Void, Void> metricsTriggerMeasurementTopic;
    private final TopicStoreMaster.Topic<Bytes, Controlcenter.TriggerEvent, Void, Void> rekeyedTriggerEventTopic;
    private final TopicStoreMaster.Topic<Void, ConfluentMetric.MetricsMessage, Void, Void> metricsTopic;
    private final TopicStoreMaster.Topic<?, MetricHolder, Void, Void> metricsAggregateRepartitionTopic;
    private final BroadcastHeartbeatTransformerSupplier<Void> broadcastHeartbeatTransformer;
    private final MetricToTriggerMeasurementTransformerSupplier metricToTriggerTransformerSupplier;
    private final MonitoringTriggerTransformerSupplier monitoringTriggerTransformerSupplier;
    private final TopicStoreMaster.Store<Bytes, Controlcenter.TriggerMeasurement, Controlcenter.TriggerMeasurement> monitoringTriggerStore;
    private final TopicStoreMaster.Store<Bytes, Controlcenter.TriggerActions, Controlcenter.TriggerActions> triggerActionsStore;
    private final TopicStoreMaster.Store<Bytes, Controlcenter.VerifiableTriggerInfo, Controlcenter.VerifiableTriggerInfo> triggerEventsStore;
    private final TopicStoreMaster.Store<Bytes, Alert.AlertInfo, Alert.AlertInfo> alertHistoryStore;
    private final TopicStoreMaster.Store<ByteBuffer, MetricHolder, MetricHolder> metricsAggregateStore;
    private final MetricsAggregation metricsAggregation;
    private final OrderedKeyPrefixedSerde<Keys.KeyType, Monitoring.MonitoringMessage> keySerde;
    private final GroupMeasurement groupMeasurement;
    private final AlertTransformerSupplier alertTransformerSupplier;
    private final AlertHistoryProcessorSupplier alertHistoryProcessorSupplier;
    private final KafkaClusterMetadataToCommandMapper kafkaClusterMetadataToCommandMapper;
    private final TopicStoreMaster.Topic<Command.CommandKey, Command.CommandMessage, Void, Void> commandTopic;
    private final LoadingCache<TokenCredential, Map<Integer, Long>> brokerCache;
    private final LoadingCache<TokenCredential, SortedSet<String>> topicCache;
    private final LoadingCache<TokenCredential, Map<String, Long>> topicPartitionCache;
    private final KafkaMetadataDao kafkaMetadataDao;
    private final AlertSenderRouter alertSenderRouter;
    private final KafkaVersionTracker kafkaVersionTracker;
    private final int metricsStoreRetentionDays;

    @Inject
    public StreamBuilderProvider(MonitoringStreamExtension monitoringStreamExtension, GroupStreamExtension groupStreamExtension, ConsumerGroupStreamExtension consumerGroupStreamExtension, Set<StoreBuilder> set, VerifyTransformerSupplier verifyTransformerSupplier, @TopicStoreModule.VerifierStore TopicStoreMaster.Store<Bytes, MonitoringVerifier, MonitoringVerifier> store, @ControlCenterModule.MonitoringTopic TopicStoreMaster.Topic<Void, Monitoring.MonitoringMessage, Void, Void> topic, @ControlCenterModule.MetricsTopic TopicStoreMaster.Topic<Void, ConfluentMetric.MetricsMessage, Void, Void> topic2, @TopicStoreModule.ReKeyedMonitoringTopic TopicStoreMaster.Topic<Void, Monitoring.MonitoringMessage, Void, Void> topic3, @TopicStoreModule.ExpectedGroupConsumptionAggregateTopic TopicStoreMaster.Topic<Controlcenter.WindowedClusterGroup, Monitoring.MonitoringMessage, Void, Void> topic4, @TopicStoreModule.ActualGroupConsumptionAggregateTopic TopicStoreMaster.Topic<Controlcenter.WindowedClusterGroup, Monitoring.MonitoringMessage, Void, Void> topic5, BroadcastHeartbeatTransformerSupplier<Void> broadcastHeartbeatTransformerSupplier, MonitoringTriggerTransformerSupplier monitoringTriggerTransformerSupplier, MetricToTriggerMeasurementTransformerSupplier metricToTriggerMeasurementTransformerSupplier, @TopicStoreModule.MonitoringTriggerStore TopicStoreMaster.Store<Bytes, Controlcenter.TriggerMeasurement, Controlcenter.TriggerMeasurement> store2, @TopicStoreModule.TriggerActionsStore TopicStoreMaster.Store<Bytes, Controlcenter.TriggerActions, Controlcenter.TriggerActions> store3, @TopicStoreModule.TriggerEventsStore TopicStoreMaster.Store<Bytes, Controlcenter.VerifiableTriggerInfo, Controlcenter.VerifiableTriggerInfo> store4, @TopicStoreModule.AlertHistoryStore TopicStoreMaster.Store<Bytes, Alert.AlertInfo, Alert.AlertInfo> store5, @TopicStoreModule.MetricsAggregateStore TopicStoreMaster.Store<ByteBuffer, MetricHolder, MetricHolder> store6, @TopicStoreModule.MetricsAggregateRepartitionTopic TopicStoreMaster.Topic<?, MetricHolder, Void, Void> topic6, MetricsAggregation metricsAggregation, AlertTransformerSupplier alertTransformerSupplier, AlertHistoryProcessorSupplier alertHistoryProcessorSupplier, OrderedKeyPrefixedSerdeSupplier<Keys.KeyType, Monitoring.MonitoringMessage> orderedKeyPrefixedSerdeSupplier, GroupMeasurement groupMeasurement, @TopicStoreModule.ReKeyedTriggerEventTopic TopicStoreMaster.Topic<Bytes, Controlcenter.TriggerEvent, Void, Void> topic7, TopicStoreMaster topicStoreMaster, @TopicStoreModule.MetricsTriggerMeasurementTopic TopicStoreMaster.Topic<Controlcenter.WindowedClusterGroup, Controlcenter.TriggerMeasurement, Void, Void> topic8, KafkaClusterMetadataToCommandMapper kafkaClusterMetadataToCommandMapper, @CommandModule.CommandTopic TopicStoreMaster.Topic<Command.CommandKey, Command.CommandMessage, Void, Void> topic9, @ControlCenterModule.BrokerCache LoadingCache<TokenCredential, Map<Integer, Long>> loadingCache, @ControlCenterModule.TopicCache LoadingCache<TokenCredential, SortedSet<String>> loadingCache2, @StreamsModule.TopicPartitionCache LoadingCache<TokenCredential, Map<String, Long>> loadingCache3, KafkaMetadataDao kafkaMetadataDao, AlertSenderRouter alertSenderRouter, KafkaVersionTracker kafkaVersionTracker, @ControlCenterModule.ControlCenterMetricsStoreRetentionDays int i) {
        this.monitoringStreamExtension = monitoringStreamExtension;
        this.groupStreamExtension = groupStreamExtension;
        this.consumerGroupStreamExtension = consumerGroupStreamExtension;
        this.storeBuilders = set;
        this.verifyTransformerSupplier = verifyTransformerSupplier;
        this.broadcastHeartbeatTransformer = broadcastHeartbeatTransformerSupplier;
        this.monitoringTopic = topic;
        this.metricsTopic = topic2;
        this.metricsAggregateRepartitionTopic = topic6;
        this.repartitionedMonitoringTopic = topic3;
        this.metricsAggregation = metricsAggregation;
        this.verifierStore = store;
        this.rekeyedActualGroupConsumptionTopic = topic5;
        this.rekeyedExpectedGroupConsumptionTopic = topic4;
        this.metricToTriggerTransformerSupplier = metricToTriggerMeasurementTransformerSupplier;
        this.monitoringTriggerTransformerSupplier = monitoringTriggerTransformerSupplier;
        this.monitoringTriggerStore = store2;
        this.alertTransformerSupplier = alertTransformerSupplier;
        this.alertHistoryProcessorSupplier = alertHistoryProcessorSupplier;
        this.triggerActionsStore = store3;
        this.triggerEventsStore = store4;
        this.alertHistoryStore = store5;
        this.metricsAggregateStore = store6;
        this.keySerde = orderedKeyPrefixedSerdeSupplier.get();
        this.groupMeasurement = groupMeasurement;
        this.rekeyedTriggerEventTopic = topic7;
        this.metricsTriggerMeasurementTopic = topic8;
        this.kafkaClusterMetadataToCommandMapper = kafkaClusterMetadataToCommandMapper;
        this.commandTopic = topic9;
        this.brokerCache = loadingCache;
        this.topicCache = loadingCache2;
        this.topicPartitionCache = loadingCache3;
        this.kafkaMetadataDao = kafkaMetadataDao;
        this.alertSenderRouter = alertSenderRouter;
        this.kafkaVersionTracker = kafkaVersionTracker;
        this.metricsStoreRetentionDays = Math.max(1, i);
    }

    @Override // com.google.inject.Provider, javax.inject.Provider
    public C3Streams.Builder get() {
        try {
            C3Streams.Builder builder = C3Streams.builder();
            C3Stream[] branch = builder.withStoreBuilders(this.storeBuilders).stream(this.monitoringTopic).transformValues((ValueTransformerSupplier) new ValueTransformerSupplier<Monitoring.MonitoringMessage, Monitoring.MonitoringMessage>() { // from class: io.confluent.controlcenter.streams.StreamBuilderProvider.3
                @Override // org.apache.kafka.streams.kstream.ValueTransformerSupplier
                public ValueTransformer<Monitoring.MonitoringMessage, Monitoring.MonitoringMessage> get() {
                    return new InjectMonitoringPartitionTransformer();
                }
            }, new String[0]).transform((TransformerSupplier) this.broadcastHeartbeatTransformer, new String[0]).flatMap(Identity.toFlatten()).through(MONITORING_MESSAGE_PARTITIONER, this.repartitionedMonitoringTopic).transform((TransformerSupplier) this.verifyTransformerSupplier, this.verifierStore.name).flatMap(Identity.toFlatten()).branch(MonitoringPredicates.isNormal(), MonitoringPredicates.isError());
            C3Stream<Void, Monitoring.MonitoringMessage> merge = builder.merge(branch[0].filter(MonitoringPredicates.hasWindow()), branch[1].transform((TransformerSupplier) new TransformerSupplier<Void, Monitoring.MonitoringMessage, KeyValue<Void, Monitoring.MonitoringMessage>>() { // from class: io.confluent.controlcenter.streams.StreamBuilderProvider.4
                @Override // org.apache.kafka.streams.kstream.TransformerSupplier, java.util.function.Supplier
                public Transformer<Void, Monitoring.MonitoringMessage, KeyValue<Void, Monitoring.MonitoringMessage>> get() {
                    return new Transformer<Void, Monitoring.MonitoringMessage, KeyValue<Void, Monitoring.MonitoringMessage>>() { // from class: io.confluent.controlcenter.streams.StreamBuilderProvider.4.1
                        ProcessorContext processorContext;

                        @Override // org.apache.kafka.streams.kstream.Transformer
                        public void init(ProcessorContext processorContext) {
                            this.processorContext = processorContext;
                        }

                        @Override // org.apache.kafka.streams.kstream.Transformer
                        public KeyValue<Void, Monitoring.MonitoringMessage> transform(Void r8, Monitoring.MonitoringMessage monitoringMessage) {
                            this.processorContext.forward(r8, monitoringMessage, To.all().withTimestamp(monitoringMessage.getWindow()));
                            return KeyValue.pair(r8, monitoringMessage);
                        }

                        @Override // org.apache.kafka.streams.kstream.Transformer
                        public void close() {
                        }
                    };
                }
            }, new String[0]));
            Map<Rollup, KTable<Windowed<Bytes>, Monitoring.MonitoringMessage>> extend = this.consumerGroupStreamExtension.extend(merge);
            Map<Rollup, KTable<Windowed<Bytes>, Monitoring.MonitoringMessage>> extend2 = this.monitoringStreamExtension.extend(merge);
            this.groupStreamExtension.extend(merge);
            merge.mapValues((ValueMapper<? super Monitoring.MonitoringMessage, ? extends V1>) new ValueMapper<Monitoring.MonitoringMessage, CommandClusterMetadata.KafkaClusterMetadata>() { // from class: io.confluent.controlcenter.streams.StreamBuilderProvider.5
                @Override // org.apache.kafka.streams.kstream.ValueMapper
                public CommandClusterMetadata.KafkaClusterMetadata apply(Monitoring.MonitoringMessage monitoringMessage) {
                    return CommandClusterMetadata.KafkaClusterMetadata.newBuilder().setClusterId(monitoringMessage.getClusterId()).build();
                }
            }).map((KeyValueMapper) this.kafkaClusterMetadataToCommandMapper).filter(Predicates.keyNotNull()).to(this.commandTopic);
            C3Stream through = new C3Stream(extend.get(Rollup.ONE_MINUTE).filter(this.groupMeasurement).toStream(new KeyValueMapper<Windowed<Bytes>, Monitoring.MonitoringMessage, Controlcenter.WindowedClusterGroup>() { // from class: io.confluent.controlcenter.streams.StreamBuilderProvider.6
                /* JADX WARN: Multi-variable type inference failed */
                @Override // org.apache.kafka.streams.kstream.KeyValueMapper
                public Controlcenter.WindowedClusterGroup apply(Windowed<Bytes> windowed, Monitoring.MonitoringMessage monitoringMessage) {
                    if (monitoringMessage == null) {
                        return Controlcenter.WindowedClusterGroup.newBuilder().setWindow(windowed.window().start()).build();
                    }
                    Monitoring.MonitoringMessage monitoringMessage2 = (Monitoring.MonitoringMessage) StreamBuilderProvider.this.keySerde.toProto(windowed.key());
                    return Controlcenter.WindowedClusterGroup.newBuilder().setClusterId(monitoringMessage2.getClusterId()).setComponent(monitoringMessage2.getGroup()).setMetricName(TriggerEventUtil.CLIENT_MONITORING_METRIC_NAME).setWindow(windowed.window().start()).build();
                }
            }), builder).filter(Predicates.valueNotNull()).through(this.rekeyedExpectedGroupConsumptionTopic);
            C3Stream through2 = new C3Stream(extend2.get(Rollup.ONE_MINUTE).filter(this.groupMeasurement).toStream(new KeyValueMapper<Windowed<Bytes>, Monitoring.MonitoringMessage, Controlcenter.WindowedClusterGroup>() { // from class: io.confluent.controlcenter.streams.StreamBuilderProvider.7
                /* JADX WARN: Multi-variable type inference failed */
                @Override // org.apache.kafka.streams.kstream.KeyValueMapper
                public Controlcenter.WindowedClusterGroup apply(Windowed<Bytes> windowed, Monitoring.MonitoringMessage monitoringMessage) {
                    if (monitoringMessage == null) {
                        return Controlcenter.WindowedClusterGroup.newBuilder().setWindow(windowed.window().start()).build();
                    }
                    Monitoring.MonitoringMessage monitoringMessage2 = (Monitoring.MonitoringMessage) StreamBuilderProvider.this.keySerde.toProto(windowed.key());
                    return Controlcenter.WindowedClusterGroup.newBuilder().setClusterId(monitoringMessage2.getClusterId()).setComponent(monitoringMessage2.getGroup()).setMetricName(TriggerEventUtil.CLIENT_MONITORING_METRIC_NAME).setWindow(windowed.window().start()).build();
                }
            }), builder).filter(Predicates.valueNotNull()).through(this.rekeyedActualGroupConsumptionTopic);
            C3Stream filter = builder.stream(this.metricsTopic).filter((Predicate) new Predicate<Void, ConfluentMetric.MetricsMessage>() { // from class: io.confluent.controlcenter.streams.StreamBuilderProvider.8
                @Override // org.apache.kafka.streams.kstream.Predicate
                public boolean test(Void r4, ConfluentMetric.MetricsMessage metricsMessage) {
                    return metricsMessage.getMetricType() == ConfluentMetric.MetricType.BROKER;
                }
            });
            filter.mapValues((ValueMapper) new ValueMapper<ConfluentMetric.MetricsMessage, CommandClusterMetadata.KafkaClusterMetadata>() { // from class: io.confluent.controlcenter.streams.StreamBuilderProvider.9
                @Override // org.apache.kafka.streams.kstream.ValueMapper
                public CommandClusterMetadata.KafkaClusterMetadata apply(ConfluentMetric.MetricsMessage metricsMessage) {
                    return CommandClusterMetadata.KafkaClusterMetadata.newBuilder().setClusterId(metricsMessage.getClusterId()).build();
                }
            }).map((KeyValueMapper) this.kafkaClusterMetadataToCommandMapper).filter(Predicates.keyNotNull()).to(this.commandTopic);
            filter.foreach((r9, metricsMessage) -> {
                try {
                    if (metricsMessage.getClusterId() != null && metricsMessage.getVersion() != null && metricsMessage.getCommitId() != null) {
                        this.kafkaVersionTracker.register(metricsMessage.getClusterId(), Integer.valueOf(metricsMessage.getBrokerId()), new BrokerVersion(metricsMessage.getVersion(), metricsMessage.getCommitId()));
                    }
                } catch (Exception e) {
                    log.debug("Failed to track a Kafka version", (Throwable) e);
                }
            });
            C3Stream flatMap = builder.merge(through2.outerJoin(through, (ValueJoiner) new MonitoringTriggerMeasurementJoiner(), JoinWindows.of(Duration.ofMillis(1000L)), StreamJoined.with(this.rekeyedActualGroupConsumptionTopic.keySerde, this.rekeyedExpectedGroupConsumptionTopic.valueSerde, this.rekeyedActualGroupConsumptionTopic.valueSerde)), new C3Stream(filter.mapValues((ValueMapper) new MetricsExtractor(this.metricsAggregation.metricField(), this.brokerCache, this.topicCache, this.topicPartitionCache)).flatMap((KeyValueMapper) this.metricsAggregation.combiner()).groupBy(this.metricsAggregation.groupingSets(), this.metricsAggregateRepartitionTopic).windowedBy(MetricsAggregation.rollup()).aggregate(this.metricsAggregation.initializer(), this.metricsAggregation.aggregator(), Materialized.as(this.metricsAggregateStore.name).withKeySerde(this.metricsAggregation.groupingSets().keySerde()).withValueSerde(this.metricsAggregateStore.aggregateSerde).withRetention(Duration.ofDays(this.metricsStoreRetentionDays))).toStream().transform(new TransformerSupplier<Windowed<? extends MetricEvent>, MetricValues, KeyValue<Windowed<MetricEvent>, MetricValues>>() { // from class: io.confluent.controlcenter.streams.StreamBuilderProvider.10
                @Override // org.apache.kafka.streams.kstream.TransformerSupplier, java.util.function.Supplier
                public Transformer<Windowed<? extends MetricEvent>, MetricValues, KeyValue<Windowed<MetricEvent>, MetricValues>> get() {
                    return new TransformCompositeMetrics(StreamBuilderProvider.this.metricsAggregation, StreamBuilderProvider.this.metricsAggregateStore, StreamBuilderProvider.this.kafkaMetadataDao);
                }
            }, this.metricsAggregateStore.name), builder).transform((TransformerSupplier) this.metricToTriggerTransformerSupplier, new String[0]).flatMap(Identity.toFlatten()).through(this.metricsTriggerMeasurementTopic)).transform((TransformerSupplier) this.monitoringTriggerTransformerSupplier, this.monitoringTriggerStore.name).flatMap(Identity.toFlatten()).through(TRIGGER_EVENT_PARTITIONER, this.rekeyedTriggerEventTopic).transform((TransformerSupplier) this.alertTransformerSupplier, this.triggerActionsStore.name, this.triggerEventsStore.name).flatMap(Identity.toFlatten());
            flatMap.foreach(new ForeachAction<Alert.AlertInfo, Alert.AlertInfo>() { // from class: io.confluent.controlcenter.streams.StreamBuilderProvider.11
                @Override // org.apache.kafka.streams.kstream.ForeachAction
                public void apply(Alert.AlertInfo alertInfo, Alert.AlertInfo alertInfo2) {
                    StreamBuilderProvider.this.alertSenderRouter.sendWithHistory(alertInfo2);
                }
            });
            flatMap.process(this.alertHistoryProcessorSupplier, this.alertHistoryStore.name);
            return builder;
        } catch (Throwable th) {
            log.error("creating streams", th);
            return null;
        }
    }
}
