/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.controlcenter.streams.group;

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.confluent.controlcenter.keys.Keys;
import io.confluent.controlcenter.record.Controlcenter;
import io.confluent.controlcenter.serialization.OrderedKeyPrefixedSerdeSupplier;
import io.confluent.controlcenter.streams.AbstractMonitoringMessageRollupStreamExtension;
import io.confluent.controlcenter.streams.C3Stream;
import io.confluent.controlcenter.streams.TopicStoreMaster;
import io.confluent.controlcenter.streams.TopicStoreModule;
import io.confluent.controlcenter.streams.TypeMapper;
import io.confluent.controlcenter.streams.group.DuplicateWindowedGroupingPredicate;
import io.confluent.controlcenter.streams.group.WindowedGroupingAggregator;
import io.confluent.controlcenter.streams.group.WindowedGroupingInitializer;
import io.confluent.controlcenter.streams.group.WindowedGroupingTypeMapper;
import io.confluent.monitoring.common.Clock;
import io.confluent.monitoring.common.SystemClock;
import io.confluent.monitoring.record.Monitoring;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;

public class GroupStreamExtension
extends AbstractMonitoringMessageRollupStreamExtension<Controlcenter.WindowedGrouping> {
    private final TopicStoreMaster.Store<Bytes, Controlcenter.WindowedGrouping, Controlcenter.WindowedGrouping> groupStore;
    private final TopicStoreMaster.Topic<Bytes, Controlcenter.WindowedGrouping, Void, Controlcenter.WindowedGrouping> groupStreamExtentionTopic;
    private final OrderedKeyPrefixedSerdeSupplier<Keys.KeyType, Monitoring.MonitoringMessage> orderedKeyPrefixedSerdeSupplier;
    private final Clock clock;

    @Inject
    public GroupStreamExtension(@TopicStoreModule.GroupStore TopicStoreMaster.Store<Bytes, Controlcenter.WindowedGrouping, Controlcenter.WindowedGrouping> groupStore, @TopicStoreModule.GroupStreamExtensionTopic TopicStoreMaster.Topic<Bytes, Controlcenter.WindowedGrouping, Void, Controlcenter.WindowedGrouping> groupStreamExtentionTopic, OrderedKeyPrefixedSerdeSupplier<Keys.KeyType, Monitoring.MonitoringMessage> orderedKeyPrefixedSerdeSupplier) {
        this.groupStore = groupStore;
        this.groupStreamExtentionTopic = groupStreamExtentionTopic;
        this.orderedKeyPrefixedSerdeSupplier = orderedKeyPrefixedSerdeSupplier;
        this.clock = new SystemClock();
    }

    protected GroupStreamExtension(@TopicStoreModule.GroupStore TopicStoreMaster.Store<Bytes, Controlcenter.WindowedGrouping, Controlcenter.WindowedGrouping> groupStore, @TopicStoreModule.GroupStreamExtensionTopic TopicStoreMaster.Topic<Bytes, Controlcenter.WindowedGrouping, Void, Controlcenter.WindowedGrouping> groupStreamExtentionTopic, OrderedKeyPrefixedSerdeSupplier<Keys.KeyType, Monitoring.MonitoringMessage> orderedKeyPrefixedSerdeSupplier, Clock clock) {
        this.groupStore = groupStore;
        this.groupStreamExtentionTopic = groupStreamExtentionTopic;
        this.orderedKeyPrefixedSerdeSupplier = orderedKeyPrefixedSerdeSupplier;
        this.clock = clock;
    }

    @Override
    protected String name() {
        return this.groupStore.name;
    }

    @Override
    protected Iterable<? extends TypeMapper<Void, Controlcenter.WindowedGrouping>> getMappers() {
        return ImmutableList.of((Object)new WindowedGroupingTypeMapper(this.orderedKeyPrefixedSerdeSupplier.get(Keys.KeyType.INFO_CLIENTIDS_IN_GROUP)), (Object)new WindowedGroupingTypeMapper(this.orderedKeyPrefixedSerdeSupplier.get(Keys.KeyType.INFO_TOPICPARTITIONS_IN_CLIENT)), (Object)new WindowedGroupingTypeMapper(this.orderedKeyPrefixedSerdeSupplier.get(Keys.KeyType.INFO_GROUPS_IN_CLIENTTYPE)), (Object)new WindowedGroupingTypeMapper(this.orderedKeyPrefixedSerdeSupplier.get(Keys.KeyType.INFO_TOPICS_IN_CLIENTTYPE)), (Object)new WindowedGroupingTypeMapper(this.orderedKeyPrefixedSerdeSupplier.get(Keys.KeyType.INFO_TOPICPARTITIONS_IN_GROUP)));
    }

    @Override
    protected KTable<Windowed<Bytes>, Controlcenter.WindowedGrouping> filterAndAggregateRollup(KStream<Bytes, Controlcenter.WindowedGrouping> stream, final Windows<Window> windows, String name) {
        return stream.filter((Predicate)new Predicate<Bytes, Controlcenter.WindowedGrouping>(){

            public boolean test(Bytes key, Controlcenter.WindowedGrouping value) {
                return value.getWindow() + windows.maintainMs() > GroupStreamExtension.this.clock.currentTimeMillis();
            }
        }).groupByKey(Serialized.with(this.groupStore.keySerde, this.groupStore.valueSerde)).windowedBy(windows).aggregate((Initializer)new WindowedGroupingInitializer(), (Aggregator)new WindowedGroupingAggregator(), Materialized.as((String)name).withKeySerde(this.groupStore.keySerde).withValueSerde(this.groupStore.aggregateSerde));
    }

    @Override
    protected C3Stream<Bytes, Controlcenter.WindowedGrouping> shuffle(C3Stream<Bytes, Controlcenter.WindowedGrouping> stream) {
        return stream.filterNot(new DuplicateWindowedGroupingPredicate()).through(this.groupStreamExtentionTopic);
    }
}

