/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.controlcenter.streams.consumergroups;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import io.confluent.controlcenter.ControlCenterProtoHelper;
import io.confluent.controlcenter.Rollup;
import io.confluent.controlcenter.keys.Keys;
import io.confluent.controlcenter.record.Controlcenter;
import io.confluent.controlcenter.serialization.OrderedKeyPrefixedSerdeSupplier;
import io.confluent.controlcenter.streams.C3Stream;
import io.confluent.controlcenter.streams.StreamExtension;
import io.confluent.controlcenter.streams.TopicStoreMaster;
import io.confluent.controlcenter.streams.TopicStoreModule;
import io.confluent.controlcenter.streams.consumergroups.ConsumerGroupProductionAggregator;
import io.confluent.controlcenter.streams.consumergroups.ConsumerGroupProductionInitializer;
import io.confluent.controlcenter.streams.monitoring.MonitoringMessageAggregator;
import io.confluent.controlcenter.streams.monitoring.MonitoringMessageInitializer;
import io.confluent.controlcenter.streams.verify.Verifiable;
import io.confluent.monitoring.record.Monitoring;
import io.confluent.serializers.OrderedKeyPrefixedSerde;
import io.confluent.serializers.UberSerde;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerGroupStreamExtension
implements StreamExtension<Monitoring.MonitoringMessage, Void, Monitoring.MonitoringMessage> {
    private static final Logger log = LoggerFactory.getLogger(ConsumerGroupStreamExtension.class);
    private final Verifiable verifiable;
    private final TopicStoreMaster.Store<String, Controlcenter.VerifiableMonitoringMessage, Long> consumerGroupAggTpStore;
    private final TopicStoreMaster.Store<String, Controlcenter.VerifiableMonitoringMessage, Long> consumerGroupGroupAggStore;
    private final TopicStoreMaster.Store<Controlcenter.TopicPartition, Monitoring.MonitoringMessage, Controlcenter.ClientGroupProductionAggregate> aggregatedTopicPartitionTableWindowsStore;
    private final TopicStoreMaster.Store<Bytes, Monitoring.MonitoringMessage, Monitoring.MonitoringMessage> monitoringMessageAggregatorWindowsStore;
    private final UberSerde<Controlcenter.TopicPartition> topicPartitionUberSerde;
    private final OrderedKeyPrefixedSerdeSupplier<Keys.KeyType, Monitoring.MonitoringMessage> keySerdeSupplier;

    @Inject
    public ConsumerGroupStreamExtension(Verifiable verifiable, @TopicStoreModule.ConsumerGroupAggTpStore TopicStoreMaster.Store<String, Controlcenter.VerifiableMonitoringMessage, Long> consumerGroupAggTpStore, @TopicStoreModule.ConsumerGroupGroupAggStore TopicStoreMaster.Store<String, Controlcenter.VerifiableMonitoringMessage, Long> consumerGroupGroupAggStore, @TopicStoreModule.AggregatedTopicPartitionTableWindowsStore TopicStoreMaster.Store<Controlcenter.TopicPartition, Monitoring.MonitoringMessage, Controlcenter.ClientGroupProductionAggregate> aggregatedTopicPartitionTableWindowsStore, @TopicStoreModule.MonitoringMessageAggregatorWindowsStore TopicStoreMaster.Store<Bytes, Monitoring.MonitoringMessage, Monitoring.MonitoringMessage> monitoringMessageAggregatorWindowsStore, UberSerde<Controlcenter.TopicPartition> topicPartitionUberSerde, OrderedKeyPrefixedSerdeSupplier<Keys.KeyType, Monitoring.MonitoringMessage> keySerdeSupplier) {
        this.verifiable = verifiable;
        this.consumerGroupAggTpStore = consumerGroupAggTpStore;
        this.consumerGroupGroupAggStore = consumerGroupGroupAggStore;
        this.aggregatedTopicPartitionTableWindowsStore = aggregatedTopicPartitionTableWindowsStore;
        this.monitoringMessageAggregatorWindowsStore = monitoringMessageAggregatorWindowsStore;
        this.topicPartitionUberSerde = topicPartitionUberSerde;
        this.keySerdeSupplier = keySerdeSupplier;
    }

    public static Windows<TimeWindow> windows(Rollup rollup) {
        return TimeWindows.of((long)rollup.getMillis()).until(rollup.getRetainMillis());
    }

    public static String stageName(Rollup rollup, String stage) {
        return TopicStoreMaster.nameJoin(stage, rollup.name());
    }

    @Override
    public Map<Rollup, KTable<Windowed<Bytes>, Monitoring.MonitoringMessage>> extend(C3Stream<Void, Monitoring.MonitoringMessage> stream) {
        C3Stream<Controlcenter.TopicPartition, Monitoring.MonitoringMessage> monitoringMessagesByTopicPartition = this.rekeyMessagesByTopicPartition(stream);
        HashMap rollupAggregates = Maps.newHashMap();
        for (Rollup rollup : Rollup.values()) {
            C3Stream<Windowed<Controlcenter.TopicPartition>, Controlcenter.ClientGroupProductionAggregate> clientGroupProductionAggregates = this.clientGroupProductionAggregateStream(rollup, monitoringMessagesByTopicPartition);
            C3Stream<Bytes, Monitoring.MonitoringMessage> messageDiffsByTopicPartititionClientGroup = this.extractDiffsAndRekeyByTopicPartitionClientGroup(rollup, clientGroupProductionAggregates);
            rollupAggregates.put(rollup, this.aggregateAndStoreFinalData(rollup, messageDiffsByTopicPartititionClientGroup));
        }
        return rollupAggregates;
    }

    private C3Stream<Controlcenter.TopicPartition, Monitoring.MonitoringMessage> rekeyMessagesByTopicPartition(C3Stream<Void, Monitoring.MonitoringMessage> input) {
        KStream rekeyed = input.map((KeyValueMapper)new KeyValueMapper<Void, Monitoring.MonitoringMessage, KeyValue<Controlcenter.TopicPartition, Monitoring.MonitoringMessage>>(){

            public KeyValue<Controlcenter.TopicPartition, Monitoring.MonitoringMessage> apply(Void key, Monitoring.MonitoringMessage value) {
                return KeyValue.pair((Object)ControlCenterProtoHelper.topicPartition(value), (Object)value);
            }
        });
        return this.verifiable.transform(rekeyed, this.topicPartitionUberSerde, this.consumerGroupAggTpStore);
    }

    private C3Stream<Windowed<Controlcenter.TopicPartition>, Controlcenter.ClientGroupProductionAggregate> clientGroupProductionAggregateStream(Rollup rollup, C3Stream<Controlcenter.TopicPartition, Monitoring.MonitoringMessage> input) {
        Windows<TimeWindow> windows = ConsumerGroupStreamExtension.windows(rollup);
        String stageName = ConsumerGroupStreamExtension.stageName(rollup, this.aggregatedTopicPartitionTableWindowsStore.name);
        WindowBytesStoreSupplier supplier = Stores.persistentWindowStore((String)stageName, (long)windows.maintainMs(), (int)windows.segments, (long)windows.size(), (boolean)false);
        Materialized as = Materialized.as((WindowBytesStoreSupplier)supplier);
        return input.wrap(input.groupByKey((Serialized<Controlcenter.TopicPartition, Monitoring.MonitoringMessage>)Serialized.with(this.aggregatedTopicPartitionTableWindowsStore.keySerde, this.aggregatedTopicPartitionTableWindowsStore.valueSerde)).windowedBy(windows).aggregate((Initializer)new ConsumerGroupProductionInitializer(), new ConsumerGroupProductionAggregator(), as.withKeySerde(this.aggregatedTopicPartitionTableWindowsStore.keySerde).withValueSerde(this.aggregatedTopicPartitionTableWindowsStore.aggregateSerde).withCachingDisabled()).toStream());
    }

    private C3Stream<Bytes, Monitoring.MonitoringMessage> extractDiffsAndRekeyByTopicPartitionClientGroup(Rollup rollup, C3Stream<Windowed<Controlcenter.TopicPartition>, Controlcenter.ClientGroupProductionAggregate> aggregates) {
        KStream diffs = aggregates.filter(new RekeyPredicate()).flatMap(new ExpectedConsumptionRekeyMapper(this.keySerdeSupplier));
        return this.verifiable.transform(diffs, this.keySerdeSupplier.get(), this.consumerGroupGroupAggStore, rollup);
    }

    private KTable<Windowed<Bytes>, Monitoring.MonitoringMessage> aggregateAndStoreFinalData(Rollup rollup, KStream<Bytes, Monitoring.MonitoringMessage> out) {
        Windows<TimeWindow> windows = ConsumerGroupStreamExtension.windows(rollup);
        WindowBytesStoreSupplier supplier = Stores.persistentWindowStore((String)ConsumerGroupStreamExtension.stageName(rollup, this.monitoringMessageAggregatorWindowsStore.name), (long)windows.maintainMs(), (int)windows.segments, (long)windows.size(), (boolean)false);
        Materialized as = Materialized.as((WindowBytesStoreSupplier)supplier);
        return out.groupByKey(Serialized.with(this.monitoringMessageAggregatorWindowsStore.keySerde, this.monitoringMessageAggregatorWindowsStore.valueSerde)).windowedBy(windows).aggregate((Initializer)new MonitoringMessageInitializer(), MonitoringMessageAggregator.stripped(), as.withValueSerde(this.monitoringMessageAggregatorWindowsStore.aggregateSerde));
    }

    public static class ExpectedConsumptionRekeyMapper
    implements KeyValueMapper<Windowed<Controlcenter.TopicPartition>, Controlcenter.ClientGroupProductionAggregate, Iterable<KeyValue<Bytes, Monitoring.MonitoringMessage>>> {
        private final OrderedKeyPrefixedSerde<Keys.KeyType, Monitoring.MonitoringMessage> expectedProductionSerde;
        private final OrderedKeyPrefixedSerde<Keys.KeyType, Monitoring.MonitoringMessage> expectedProductionGroupSerde;
        private final OrderedKeyPrefixedSerde<Keys.KeyType, Monitoring.MonitoringMessage> expectedProductionGroupClientTopicPartitionSerde;
        private final OrderedKeyPrefixedSerde<Keys.KeyType, Monitoring.MonitoringMessage> expectedProductionClientSerde;
        private final OrderedKeyPrefixedSerde<Keys.KeyType, Monitoring.MonitoringMessage> expectedProductionInTopicSerde;
        private final OrderedKeyPrefixedSerde<Keys.KeyType, Monitoring.MonitoringMessage> expectedProductionInTopicPartitionForGroupSerde;
        private final OrderedKeyPrefixedSerde<Keys.KeyType, Monitoring.MonitoringMessage> expectedProductionInTopicPartitionForClientSerde;

        public ExpectedConsumptionRekeyMapper(OrderedKeyPrefixedSerdeSupplier<Keys.KeyType, Monitoring.MonitoringMessage> keySerdeSupplier) {
            this.expectedProductionSerde = keySerdeSupplier.get(Keys.KeyType.EXPECTED_PRODUCTION);
            this.expectedProductionGroupSerde = keySerdeSupplier.get(Keys.KeyType.EXPECTED_PRODUCTION_GROUP);
            this.expectedProductionGroupClientTopicPartitionSerde = keySerdeSupplier.get(Keys.KeyType.EXPECTED_PRODUCTION_GROUP_CLIENT_TOPIC_PARTITION);
            this.expectedProductionClientSerde = keySerdeSupplier.get(Keys.KeyType.EXPECTED_PRODUCTION_CLIENT);
            this.expectedProductionInTopicSerde = keySerdeSupplier.get(Keys.KeyType.EXPECTED_PRODUCTION_IN_TOPIC);
            this.expectedProductionInTopicPartitionForGroupSerde = keySerdeSupplier.get(Keys.KeyType.EXPECTED_PRODUCTION_IN_TOPICPARTITION_FOR_GROUP);
            this.expectedProductionInTopicPartitionForClientSerde = keySerdeSupplier.get(Keys.KeyType.EXPECTED_PRODUCTION_IN_TOPICPARTITION_FOR_CLIENT);
        }

        public Iterable<KeyValue<Bytes, Monitoring.MonitoringMessage>> apply(Windowed<Controlcenter.TopicPartition> windowedKey, Controlcenter.ClientGroupProductionAggregate value) {
            HashSet usedGroups = Sets.newHashSet();
            ArrayList tpcgs = Lists.newArrayList();
            Controlcenter.TopicPartition key = (Controlcenter.TopicPartition)windowedKey.key();
            if (value.hasDiff() && value.getDiff().getWindow() > -1L && value.getClientGroupList() != null && value.getClientGroupCount() > 0) {
                Controlcenter.ClientGroup newClientGroup = value.getNewClientGroup();
                for (Controlcenter.ClientGroup cg : value.getClientGroupList()) {
                    if (ConsumerGroupProductionAggregator.clientGroupHasValues(newClientGroup, cg.getGroup(), cg.getClient())) continue;
                    tpcgs.add(KeyValue.pair((Object)ControlCenterProtoHelper.topicPartition(key, cg), (Object)value.getDiff()));
                }
            }
            HashMap localAggregate = Maps.newHashMap();
            ArrayList monitoringMessages = Lists.newArrayList();
            for (KeyValue kv : tpcgs) {
                Controlcenter.ClientGroup clientGroup = ((Controlcenter.TopicPartitionClientGroup)kv.key).getClientGroup();
                Controlcenter.TopicPartition topicPartition = ((Controlcenter.TopicPartitionClientGroup)kv.key).getTopicPartition();
                Monitoring.MonitoringMessage mm = (Monitoring.MonitoringMessage)kv.value;
                monitoringMessages.addAll(this.consumerData(clientGroup, topicPartition, mm));
                monitoringMessages.addAll(this.consumerTopicData(clientGroup, topicPartition, mm));
                if (!usedGroups.add(clientGroup.getGroup())) continue;
                monitoringMessages.addAll(this.consumerGroupData(clientGroup, topicPartition, mm));
                monitoringMessages.addAll(this.aggregateTopicData(clientGroup, topicPartition, mm));
            }
            if (value.hasNewClientGroup() && value.getNewClientGroup().getWindow() >= 0L) {
                Controlcenter.ClientGroup newClientGroup = value.getNewClientGroup();
                Monitoring.MonitoringMessage agg = value.getAgg();
                monitoringMessages.addAll(this.consumerData(newClientGroup, key, agg));
                monitoringMessages.addAll(this.consumerTopicData(newClientGroup, key, agg));
                for (Controlcenter.ClientGroup cg : value.getClientGroupList()) {
                    if (cg.equals((Object)newClientGroup)) continue;
                    usedGroups.add(cg.getGroup());
                }
                if (usedGroups.add(newClientGroup.getGroup())) {
                    monitoringMessages.addAll(this.consumerGroupData(newClientGroup, key, agg));
                    monitoringMessages.addAll(this.aggregateTopicData(newClientGroup, key, agg));
                }
            }
            MonitoringMessageAggregator mma = new MonitoringMessageAggregator();
            for (KeyValue kv : monitoringMessages) {
                if (!localAggregate.containsKey(kv.key)) {
                    localAggregate.put(kv.key, kv.value);
                    continue;
                }
                localAggregate.put(kv.key, mma.aggregate((Monitoring.MonitoringMessage)kv.value, (Monitoring.MonitoringMessage)localAggregate.get(kv.key)));
            }
            ArrayList aggregatedOutput = Lists.newArrayList();
            for (Map.Entry entry : localAggregate.entrySet()) {
                if (((Monitoring.MonitoringMessage)entry.getValue()).getCount() <= 0L && ((Monitoring.MonitoringMessage)entry.getValue()).getType() != Monitoring.MessageType.ERROR) continue;
                aggregatedOutput.add(KeyValue.pair(entry.getKey(), entry.getValue()));
            }
            return aggregatedOutput;
        }

        public Collection<KeyValue<Bytes, Monitoring.MonitoringMessage>> consumerGroupData(Controlcenter.ClientGroup cg, Controlcenter.TopicPartition tp, Monitoring.MonitoringMessage value) {
            Monitoring.MonitoringMessage keyMm = this.getKeyMm(cg, tp);
            return ImmutableList.of((Object)KeyValue.pair((Object)this.expectedProductionSerde.key((Object)keyMm), (Object)value), (Object)KeyValue.pair((Object)this.expectedProductionGroupSerde.key((Object)keyMm), (Object)value));
        }

        protected Collection<KeyValue<Bytes, Monitoring.MonitoringMessage>> consumerData(Controlcenter.ClientGroup cg, Controlcenter.TopicPartition tp, Monitoring.MonitoringMessage value) {
            Monitoring.MonitoringMessage keyMm = this.getKeyMm(cg, tp);
            return ImmutableList.of((Object)KeyValue.pair((Object)this.expectedProductionGroupClientTopicPartitionSerde.key((Object)keyMm), (Object)value), (Object)KeyValue.pair((Object)this.expectedProductionClientSerde.key((Object)keyMm), (Object)value));
        }

        protected Collection<KeyValue<Bytes, Monitoring.MonitoringMessage>> aggregateTopicData(Controlcenter.ClientGroup cg, Controlcenter.TopicPartition tp, Monitoring.MonitoringMessage value) {
            Monitoring.MonitoringMessage keyMm = this.getKeyMm(cg, tp);
            return ImmutableList.of((Object)KeyValue.pair((Object)this.expectedProductionInTopicSerde.key((Object)keyMm), (Object)value), (Object)KeyValue.pair((Object)this.expectedProductionInTopicPartitionForGroupSerde.key((Object)keyMm), (Object)value));
        }

        protected Collection<KeyValue<Bytes, Monitoring.MonitoringMessage>> consumerTopicData(Controlcenter.ClientGroup cg, Controlcenter.TopicPartition tp, Monitoring.MonitoringMessage value) {
            Monitoring.MonitoringMessage keyMm = this.getKeyMm(cg, tp);
            return ImmutableList.of((Object)KeyValue.pair((Object)this.expectedProductionInTopicPartitionForClientSerde.key((Object)keyMm), (Object)value));
        }

        private Monitoring.MonitoringMessage getKeyMm(Controlcenter.ClientGroup cg, Controlcenter.TopicPartition tp) {
            return Monitoring.MonitoringMessage.newBuilder().setClientType(Monitoring.ClientType.PRODUCER).setClusterId(tp.getClusterId()).setGroup(cg.getGroup()).setClientId(cg.getClient()).setTopic(tp.getTopic()).setPartition(tp.getPartition()).build();
        }
    }

    public static class RekeyPredicate
    implements Predicate<Windowed<Controlcenter.TopicPartition>, Controlcenter.ClientGroupProductionAggregate> {
        public boolean test(Windowed<Controlcenter.TopicPartition> key, Controlcenter.ClientGroupProductionAggregate value) {
            return value.hasNewClientGroup() && value.getNewClientGroup().getWindow() > -1L || value.hasDiff() && value.getDiff().getWindow() > -1L;
        }
    }
}

