/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.controlcenter.streams.monitoring;

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.confluent.controlcenter.keys.Keys;
import io.confluent.controlcenter.record.Controlcenter;
import io.confluent.controlcenter.serialization.OrderedKeyPrefixedSerdeSupplier;
import io.confluent.controlcenter.streams.AbstractMonitoringMessageRollupStreamExtension;
import io.confluent.controlcenter.streams.C3Stream;
import io.confluent.controlcenter.streams.TopicStoreMaster;
import io.confluent.controlcenter.streams.TopicStoreModule;
import io.confluent.controlcenter.streams.TypeMapper;
import io.confluent.controlcenter.streams.monitoring.MonitoringMessageAggregator;
import io.confluent.controlcenter.streams.monitoring.MonitoringMessageInitializer;
import io.confluent.controlcenter.streams.monitoring.MonitoringMessageTypeMapper;
import io.confluent.controlcenter.streams.verify.Verifiable;
import io.confluent.monitoring.record.Monitoring;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Rollup stream extension for {@link Monitoring.MonitoringMessage} records.
 *
 * <p>Supplies the pieces the {@link AbstractMonitoringMessageRollupStreamExtension} base class
 * asks for: a name, the per-key-type mappers, the windowed aggregation and the shuffle step.
 */
public class MonitoringStreamExtension
extends AbstractMonitoringMessageRollupStreamExtension<Monitoring.MonitoringMessage> {
    private static final Logger log = LoggerFactory.getLogger(MonitoringStreamExtension.class);
    private final Verifiable verifiable;
    private final TopicStoreMaster.Store<String, Controlcenter.VerifiableMonitoringMessage, Long> monitoringMsgExtensionStore;
    private final TopicStoreMaster.Store<Bytes, Monitoring.MonitoringMessage, Monitoring.MonitoringMessage> monitoringStreamStore;
    private final OrderedKeyPrefixedSerdeSupplier<Keys.KeyType, Monitoring.MonitoringMessage> orderedKeyPrefixedSerdeSupplier;

    /**
     * Creates the extension from its injected collaborators.
     *
     * @param verifiable used by {@link #shuffle(C3Stream)} to transform the incoming stream
     * @param monitoringMsgExtensionStore store handed to {@code verifiable.transform(...)}
     * @param monitoringStreamStore store providing this extension's name and the serdes used
     *     for grouping and for materializing the aggregate
     * @param orderedKeyPrefixedSerdeSupplier source of the serdes given to the type mappers and
     *     to the shuffle step
     */
    @Inject
    public MonitoringStreamExtension(
            Verifiable verifiable,
            @TopicStoreModule.MonitoringMsgExtensionStore TopicStoreMaster.Store<String, Controlcenter.VerifiableMonitoringMessage, Long> monitoringMsgExtensionStore,
            @TopicStoreModule.MonitoringStreamStore TopicStoreMaster.Store<Bytes, Monitoring.MonitoringMessage, Monitoring.MonitoringMessage> monitoringStreamStore,
            OrderedKeyPrefixedSerdeSupplier<Keys.KeyType, Monitoring.MonitoringMessage> orderedKeyPrefixedSerdeSupplier) {
        this.verifiable = verifiable;
        this.monitoringMsgExtensionStore = monitoringMsgExtensionStore;
        this.monitoringStreamStore = monitoringStreamStore;
        this.orderedKeyPrefixedSerdeSupplier = orderedKeyPrefixedSerdeSupplier;
    }

    /**
     * Returns the name of the monitoring stream store.
     *
     * @return {@code monitoringStreamStore.name}
     */
    @Override
    protected String name() {
        return this.monitoringStreamStore.name;
    }

    /**
     * Returns one {@link MonitoringMessageTypeMapper} per lookup key type, in a fixed order.
     *
     * <p>The elements are passed with their concrete type (no {@code Object} casts) so the
     * resulting list's element type is compatible with the declared return type.
     *
     * @return an immutable list of seven mappers
     */
    @Override
    protected Iterable<? extends TypeMapper<Void, Monitoring.MonitoringMessage>> getMappers() {
        return ImmutableList.of(
                this.mapperFor(Keys.KeyType.LOOKUP_CLIENTTYPE),
                this.mapperFor(Keys.KeyType.LOOKUP_CLIENTTYPE_GROUP),
                this.mapperFor(Keys.KeyType.LOOKUP_CLIENTTYPE_CLIENT),
                this.mapperFor(Keys.KeyType.LOOKUP_CLIENTTYPE_TOPIC),
                this.mapperFor(Keys.KeyType.LOOKUP_CLIENTTYPE_TOPICPARTITION),
                this.mapperFor(Keys.KeyType.LOOKUP_CLIENTTYPE_CLIENT_TOPIC_PARTITION),
                this.mapperFor(Keys.KeyType.LOOKUP_CLIENTTYPE_GROUP_TOPICPARTITION));
    }

    /** Builds a mapper around the serde the supplier returns for {@code keyType}. */
    private MonitoringMessageTypeMapper mapperFor(Keys.KeyType keyType) {
        return new MonitoringMessageTypeMapper(this.orderedKeyPrefixedSerdeSupplier.get(keyType));
    }

    /**
     * Groups the stream by key, windows it, and aggregates each window into a table
     * materialized under {@code name}.
     *
     * @param stream the keyed monitoring message stream to aggregate
     * @param windows the window definition applied to the grouped stream
     * @param name the name given to the materialized store
     * @return the windowed aggregate table
     */
    @Override
    protected KTable<Windowed<Bytes>, Monitoring.MonitoringMessage> filterAndAggregateRollup(KStream<Bytes, Monitoring.MonitoringMessage> stream, Windows<Window> windows, String name) {
        // NOTE(review): the raw Initializer cast looks like a decompiler artifact. It is kept
        // because dropping it safely would need explicit generic type arguments on
        // Materialized.as(...) - confirm against the original sources before removing.
        return stream
                .groupByKey(Serialized.with(this.monitoringStreamStore.keySerde, this.monitoringStreamStore.valueSerde))
                .windowedBy(windows)
                .aggregate(
                        (Initializer)new MonitoringMessageInitializer(),
                        MonitoringMessageAggregator.stripped(),
                        Materialized.as(name)
                                .withKeySerde(this.monitoringStreamStore.keySerde)
                                .withValueSerde(this.monitoringStreamStore.aggregateSerde));
    }

    /**
     * Delegates the shuffle step to {@code verifiable.transform(...)}, passing the supplier's
     * default serde and the monitoring message extension store.
     *
     * @param stream the stream to transform
     * @return the stream returned by {@code verifiable.transform(...)}
     */
    @Override
    protected C3Stream<Bytes, Monitoring.MonitoringMessage> shuffle(C3Stream<Bytes, Monitoring.MonitoringMessage> stream) {
        return this.verifiable.transform(stream, this.orderedKeyPrefixedSerdeSupplier.get(), this.monitoringMsgExtensionStore);
    }
}

