package io.confluent.controlcenter.streams.group;

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.confluent.controlcenter.keys.Keys;
import io.confluent.controlcenter.record.Controlcenter;
import io.confluent.controlcenter.serialization.OrderedKeyPrefixedSerdeSupplier;
import io.confluent.controlcenter.streams.AbstractMonitoringMessageRollupStreamExtension;
import io.confluent.controlcenter.streams.C3Stream;
import io.confluent.controlcenter.streams.TopicStoreMaster;
import io.confluent.controlcenter.streams.TopicStoreModule;
import io.confluent.controlcenter.streams.TypeMapper;
import io.confluent.monitoring.common.Clock;
import io.confluent.monitoring.common.SystemClock;
import io.confluent.monitoring.record.Monitoring;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;

/* loaded from: input_file:io/confluent/controlcenter/streams/group/GroupStreamExtension.class */
public class GroupStreamExtension extends AbstractMonitoringMessageRollupStreamExtension<Controlcenter.WindowedGrouping> {
    private final TopicStoreMaster.Store<Bytes, Controlcenter.WindowedGrouping, Controlcenter.WindowedGrouping> groupStore;
    private final TopicStoreMaster.Topic<Bytes, Controlcenter.WindowedGrouping, Void, Controlcenter.WindowedGrouping> groupStreamExtentionTopic;
    private final OrderedKeyPrefixedSerdeSupplier<Keys.KeyType, Monitoring.MonitoringMessage> orderedKeyPrefixedSerdeSupplier;
    private final Clock clock;

    @Inject
    public GroupStreamExtension(@TopicStoreModule.GroupStore TopicStoreMaster.Store<Bytes, Controlcenter.WindowedGrouping, Controlcenter.WindowedGrouping> store, @TopicStoreModule.GroupStreamExtensionTopic TopicStoreMaster.Topic<Bytes, Controlcenter.WindowedGrouping, Void, Controlcenter.WindowedGrouping> topic, OrderedKeyPrefixedSerdeSupplier<Keys.KeyType, Monitoring.MonitoringMessage> orderedKeyPrefixedSerdeSupplier) {
        this.groupStore = store;
        this.groupStreamExtentionTopic = topic;
        this.orderedKeyPrefixedSerdeSupplier = orderedKeyPrefixedSerdeSupplier;
        this.clock = new SystemClock();
    }

    protected GroupStreamExtension(@TopicStoreModule.GroupStore TopicStoreMaster.Store<Bytes, Controlcenter.WindowedGrouping, Controlcenter.WindowedGrouping> store, @TopicStoreModule.GroupStreamExtensionTopic TopicStoreMaster.Topic<Bytes, Controlcenter.WindowedGrouping, Void, Controlcenter.WindowedGrouping> topic, OrderedKeyPrefixedSerdeSupplier<Keys.KeyType, Monitoring.MonitoringMessage> orderedKeyPrefixedSerdeSupplier, Clock clock) {
        this.groupStore = store;
        this.groupStreamExtentionTopic = topic;
        this.orderedKeyPrefixedSerdeSupplier = orderedKeyPrefixedSerdeSupplier;
        this.clock = clock;
    }

    @Override // io.confluent.controlcenter.streams.AbstractMonitoringMessageRollupStreamExtension
    protected String name() {
        return this.groupStore.name;
    }

    @Override // io.confluent.controlcenter.streams.AbstractMonitoringMessageRollupStreamExtension
    protected Iterable<? extends TypeMapper<Void, Controlcenter.WindowedGrouping>> getMappers() {
        return ImmutableList.of(new WindowedGroupingTypeMapper(this.orderedKeyPrefixedSerdeSupplier.get(Keys.KeyType.INFO_CLIENTIDS_IN_GROUP)), new WindowedGroupingTypeMapper(this.orderedKeyPrefixedSerdeSupplier.get(Keys.KeyType.INFO_TOPICPARTITIONS_IN_CLIENT)), new WindowedGroupingTypeMapper(this.orderedKeyPrefixedSerdeSupplier.get(Keys.KeyType.INFO_GROUPS_IN_CLIENTTYPE)), new WindowedGroupingTypeMapper(this.orderedKeyPrefixedSerdeSupplier.get(Keys.KeyType.INFO_TOPICS_IN_CLIENTTYPE)), new WindowedGroupingTypeMapper(this.orderedKeyPrefixedSerdeSupplier.get(Keys.KeyType.INFO_TOPICPARTITIONS_IN_GROUP)));
    }

    @Override // io.confluent.controlcenter.streams.AbstractMonitoringMessageRollupStreamExtension
    protected KTable<Windowed<Bytes>, Controlcenter.WindowedGrouping> filterAndAggregateRollup(KStream<Bytes, Controlcenter.WindowedGrouping> kStream, final Windows<Window> windows, String str) {
        return kStream.filter(new Predicate<Bytes, Controlcenter.WindowedGrouping>() { // from class: io.confluent.controlcenter.streams.group.GroupStreamExtension.1
            public boolean test(Bytes bytes, Controlcenter.WindowedGrouping windowedGrouping) {
                return windowedGrouping.getWindow() + windows.maintainMs() > GroupStreamExtension.this.clock.currentTimeMillis();
            }
        }).groupByKey(Serialized.with(this.groupStore.keySerde, this.groupStore.valueSerde)).windowedBy(windows).aggregate(new WindowedGroupingInitializer(), new WindowedGroupingAggregator(), Materialized.as(str).withKeySerde(this.groupStore.keySerde).withValueSerde(this.groupStore.aggregateSerde));
    }

    @Override // io.confluent.controlcenter.streams.AbstractMonitoringMessageRollupStreamExtension
    protected C3Stream<Bytes, Controlcenter.WindowedGrouping> shuffle(C3Stream<Bytes, Controlcenter.WindowedGrouping> c3Stream) {
        return c3Stream.m105filterNot((Predicate<? super Bytes, ? super Controlcenter.WindowedGrouping>) new DuplicateWindowedGroupingPredicate()).through((TopicStoreMaster.Topic<Bytes, Controlcenter.WindowedGrouping, K1, T1>) this.groupStreamExtentionTopic);
    }
}
