package io.confluent.controlcenter.streams.consumergroups;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import io.confluent.controlcenter.ControlCenterProtoHelper;
import io.confluent.controlcenter.Rollup;
import io.confluent.controlcenter.keys.Keys;
import io.confluent.controlcenter.record.Controlcenter;
import io.confluent.controlcenter.serialization.OrderedKeyPrefixedSerdeSupplier;
import io.confluent.controlcenter.streams.C3Stream;
import io.confluent.controlcenter.streams.StreamExtension;
import io.confluent.controlcenter.streams.TopicStoreMaster;
import io.confluent.controlcenter.streams.TopicStoreModule;
import io.confluent.controlcenter.streams.monitoring.MonitoringMessageAggregator;
import io.confluent.controlcenter.streams.monitoring.MonitoringMessageInitializer;
import io.confluent.controlcenter.streams.verify.Verifiable;
import io.confluent.monitoring.record.Monitoring;
import io.confluent.serializers.OrderedKeyPrefixedSerde;
import io.confluent.serializers.UberSerde;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.state.Stores;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/controlcenter/streams/consumergroups/ConsumerGroupStreamExtension.class */
public class ConsumerGroupStreamExtension implements StreamExtension<Monitoring.MonitoringMessage, Void, Monitoring.MonitoringMessage> {
    private static final Logger log = LoggerFactory.getLogger(ConsumerGroupStreamExtension.class);
    private final Verifiable verifiable;
    private final TopicStoreMaster.Store<String, Controlcenter.VerifiableMonitoringMessage, Long> consumerGroupAggTpStore;
    private final TopicStoreMaster.Store<String, Controlcenter.VerifiableMonitoringMessage, Long> consumerGroupGroupAggStore;
    private final TopicStoreMaster.Store<Controlcenter.TopicPartition, Monitoring.MonitoringMessage, Controlcenter.ClientGroupProductionAggregate> aggregatedTopicPartitionTableWindowsStore;
    private final TopicStoreMaster.Store<Bytes, Monitoring.MonitoringMessage, Monitoring.MonitoringMessage> monitoringMessageAggregatorWindowsStore;
    private final UberSerde<Controlcenter.TopicPartition> topicPartitionUberSerde;
    private final OrderedKeyPrefixedSerdeSupplier<Keys.KeyType, Monitoring.MonitoringMessage> keySerdeSupplier;

    /* loaded from: input_file:io/confluent/controlcenter/streams/consumergroups/ConsumerGroupStreamExtension$ExpectedConsumptionRekeyMapper.class */
    public static class ExpectedConsumptionRekeyMapper implements KeyValueMapper<Windowed<Controlcenter.TopicPartition>, Controlcenter.ClientGroupProductionAggregate, Iterable<KeyValue<Bytes, Monitoring.MonitoringMessage>>> {
        private final OrderedKeyPrefixedSerde<Keys.KeyType, Monitoring.MonitoringMessage> expectedProductionSerde;
        private final OrderedKeyPrefixedSerde<Keys.KeyType, Monitoring.MonitoringMessage> expectedProductionGroupSerde;
        private final OrderedKeyPrefixedSerde<Keys.KeyType, Monitoring.MonitoringMessage> expectedProductionGroupClientTopicPartitionSerde;
        private final OrderedKeyPrefixedSerde<Keys.KeyType, Monitoring.MonitoringMessage> expectedProductionClientSerde;
        private final OrderedKeyPrefixedSerde<Keys.KeyType, Monitoring.MonitoringMessage> expectedProductionInTopicSerde;
        private final OrderedKeyPrefixedSerde<Keys.KeyType, Monitoring.MonitoringMessage> expectedProductionInTopicPartitionForGroupSerde;
        private final OrderedKeyPrefixedSerde<Keys.KeyType, Monitoring.MonitoringMessage> expectedProductionInTopicPartitionForClientSerde;

        public ExpectedConsumptionRekeyMapper(OrderedKeyPrefixedSerdeSupplier<Keys.KeyType, Monitoring.MonitoringMessage> orderedKeyPrefixedSerdeSupplier) {
            this.expectedProductionSerde = orderedKeyPrefixedSerdeSupplier.get(Keys.KeyType.EXPECTED_PRODUCTION);
            this.expectedProductionGroupSerde = orderedKeyPrefixedSerdeSupplier.get(Keys.KeyType.EXPECTED_PRODUCTION_GROUP);
            this.expectedProductionGroupClientTopicPartitionSerde = orderedKeyPrefixedSerdeSupplier.get(Keys.KeyType.EXPECTED_PRODUCTION_GROUP_CLIENT_TOPIC_PARTITION);
            this.expectedProductionClientSerde = orderedKeyPrefixedSerdeSupplier.get(Keys.KeyType.EXPECTED_PRODUCTION_CLIENT);
            this.expectedProductionInTopicSerde = orderedKeyPrefixedSerdeSupplier.get(Keys.KeyType.EXPECTED_PRODUCTION_IN_TOPIC);
            this.expectedProductionInTopicPartitionForGroupSerde = orderedKeyPrefixedSerdeSupplier.get(Keys.KeyType.EXPECTED_PRODUCTION_IN_TOPICPARTITION_FOR_GROUP);
            this.expectedProductionInTopicPartitionForClientSerde = orderedKeyPrefixedSerdeSupplier.get(Keys.KeyType.EXPECTED_PRODUCTION_IN_TOPICPARTITION_FOR_CLIENT);
        }

        public Iterable<KeyValue<Bytes, Monitoring.MonitoringMessage>> apply(Windowed<Controlcenter.TopicPartition> windowed, Controlcenter.ClientGroupProductionAggregate clientGroupProductionAggregate) {
            HashSet newHashSet = Sets.newHashSet();
            ArrayList<KeyValue> newArrayList = Lists.newArrayList();
            Controlcenter.TopicPartition topicPartition = (Controlcenter.TopicPartition) windowed.key();
            if (clientGroupProductionAggregate.hasDiff() && clientGroupProductionAggregate.getDiff().getWindow() > -1 && clientGroupProductionAggregate.getClientGroupList() != null && clientGroupProductionAggregate.getClientGroupCount() > 0) {
                Controlcenter.ClientGroup newClientGroup = clientGroupProductionAggregate.getNewClientGroup();
                for (Controlcenter.ClientGroup clientGroup : clientGroupProductionAggregate.getClientGroupList()) {
                    if (!ConsumerGroupProductionAggregator.clientGroupHasValues(newClientGroup, clientGroup.getGroup(), clientGroup.getClient())) {
                        newArrayList.add(KeyValue.pair(ControlCenterProtoHelper.topicPartition(topicPartition, clientGroup), clientGroupProductionAggregate.getDiff()));
                    }
                }
            }
            HashMap newHashMap = Maps.newHashMap();
            ArrayList<KeyValue> newArrayList2 = Lists.newArrayList();
            for (KeyValue keyValue : newArrayList) {
                Controlcenter.ClientGroup clientGroup2 = ((Controlcenter.TopicPartitionClientGroup) keyValue.key).getClientGroup();
                Controlcenter.TopicPartition topicPartition2 = ((Controlcenter.TopicPartitionClientGroup) keyValue.key).getTopicPartition();
                Monitoring.MonitoringMessage monitoringMessage = (Monitoring.MonitoringMessage) keyValue.value;
                newArrayList2.addAll(consumerData(clientGroup2, topicPartition2, monitoringMessage));
                newArrayList2.addAll(consumerTopicData(clientGroup2, topicPartition2, monitoringMessage));
                if (newHashSet.add(clientGroup2.getGroup())) {
                    newArrayList2.addAll(consumerGroupData(clientGroup2, topicPartition2, monitoringMessage));
                    newArrayList2.addAll(aggregateTopicData(clientGroup2, topicPartition2, monitoringMessage));
                }
            }
            if (clientGroupProductionAggregate.hasNewClientGroup() && clientGroupProductionAggregate.getNewClientGroup().getWindow() >= 0) {
                Controlcenter.ClientGroup newClientGroup2 = clientGroupProductionAggregate.getNewClientGroup();
                Monitoring.MonitoringMessage agg = clientGroupProductionAggregate.getAgg();
                newArrayList2.addAll(consumerData(newClientGroup2, topicPartition, agg));
                newArrayList2.addAll(consumerTopicData(newClientGroup2, topicPartition, agg));
                for (Controlcenter.ClientGroup clientGroup3 : clientGroupProductionAggregate.getClientGroupList()) {
                    if (!clientGroup3.equals(newClientGroup2)) {
                        newHashSet.add(clientGroup3.getGroup());
                    }
                }
                if (newHashSet.add(newClientGroup2.getGroup())) {
                    newArrayList2.addAll(consumerGroupData(newClientGroup2, topicPartition, agg));
                    newArrayList2.addAll(aggregateTopicData(newClientGroup2, topicPartition, agg));
                }
            }
            MonitoringMessageAggregator monitoringMessageAggregator = new MonitoringMessageAggregator();
            for (KeyValue keyValue2 : newArrayList2) {
                if (newHashMap.containsKey(keyValue2.key)) {
                    newHashMap.put(keyValue2.key, monitoringMessageAggregator.aggregate((Monitoring.MonitoringMessage) keyValue2.value, (Monitoring.MonitoringMessage) newHashMap.get(keyValue2.key)));
                } else {
                    newHashMap.put(keyValue2.key, keyValue2.value);
                }
            }
            ArrayList newArrayList3 = Lists.newArrayList();
            for (Map.Entry entry : newHashMap.entrySet()) {
                if (((Monitoring.MonitoringMessage) entry.getValue()).getCount() > 0 || ((Monitoring.MonitoringMessage) entry.getValue()).getType() == Monitoring.MessageType.ERROR) {
                    newArrayList3.add(KeyValue.pair(entry.getKey(), entry.getValue()));
                }
            }
            return newArrayList3;
        }

        public Collection<KeyValue<Bytes, Monitoring.MonitoringMessage>> consumerGroupData(Controlcenter.ClientGroup clientGroup, Controlcenter.TopicPartition topicPartition, Monitoring.MonitoringMessage monitoringMessage) {
            Monitoring.MonitoringMessage keyMm = getKeyMm(clientGroup, topicPartition);
            return ImmutableList.of(KeyValue.pair(this.expectedProductionSerde.key(keyMm), monitoringMessage), KeyValue.pair(this.expectedProductionGroupSerde.key(keyMm), monitoringMessage));
        }

        protected Collection<KeyValue<Bytes, Monitoring.MonitoringMessage>> consumerData(Controlcenter.ClientGroup clientGroup, Controlcenter.TopicPartition topicPartition, Monitoring.MonitoringMessage monitoringMessage) {
            Monitoring.MonitoringMessage keyMm = getKeyMm(clientGroup, topicPartition);
            return ImmutableList.of(KeyValue.pair(this.expectedProductionGroupClientTopicPartitionSerde.key(keyMm), monitoringMessage), KeyValue.pair(this.expectedProductionClientSerde.key(keyMm), monitoringMessage));
        }

        protected Collection<KeyValue<Bytes, Monitoring.MonitoringMessage>> aggregateTopicData(Controlcenter.ClientGroup clientGroup, Controlcenter.TopicPartition topicPartition, Monitoring.MonitoringMessage monitoringMessage) {
            Monitoring.MonitoringMessage keyMm = getKeyMm(clientGroup, topicPartition);
            return ImmutableList.of(KeyValue.pair(this.expectedProductionInTopicSerde.key(keyMm), monitoringMessage), KeyValue.pair(this.expectedProductionInTopicPartitionForGroupSerde.key(keyMm), monitoringMessage));
        }

        protected Collection<KeyValue<Bytes, Monitoring.MonitoringMessage>> consumerTopicData(Controlcenter.ClientGroup clientGroup, Controlcenter.TopicPartition topicPartition, Monitoring.MonitoringMessage monitoringMessage) {
            return ImmutableList.of(KeyValue.pair(this.expectedProductionInTopicPartitionForClientSerde.key(getKeyMm(clientGroup, topicPartition)), monitoringMessage));
        }

        private Monitoring.MonitoringMessage getKeyMm(Controlcenter.ClientGroup clientGroup, Controlcenter.TopicPartition topicPartition) {
            return Monitoring.MonitoringMessage.newBuilder().setClientType(Monitoring.ClientType.PRODUCER).setClusterId(topicPartition.getClusterId()).setGroup(clientGroup.getGroup()).setClientId(clientGroup.getClient()).setTopic(topicPartition.getTopic()).setPartition(topicPartition.getPartition()).build();
        }
    }

    /* loaded from: input_file:io/confluent/controlcenter/streams/consumergroups/ConsumerGroupStreamExtension$RekeyPredicate.class */
    public static class RekeyPredicate implements Predicate<Windowed<Controlcenter.TopicPartition>, Controlcenter.ClientGroupProductionAggregate> {
        public boolean test(Windowed<Controlcenter.TopicPartition> windowed, Controlcenter.ClientGroupProductionAggregate clientGroupProductionAggregate) {
            return (clientGroupProductionAggregate.hasNewClientGroup() && clientGroupProductionAggregate.getNewClientGroup().getWindow() > -1) || (clientGroupProductionAggregate.hasDiff() && clientGroupProductionAggregate.getDiff().getWindow() > -1);
        }
    }

    @Inject
    public ConsumerGroupStreamExtension(Verifiable verifiable, @TopicStoreModule.ConsumerGroupAggTpStore TopicStoreMaster.Store<String, Controlcenter.VerifiableMonitoringMessage, Long> store, @TopicStoreModule.ConsumerGroupGroupAggStore TopicStoreMaster.Store<String, Controlcenter.VerifiableMonitoringMessage, Long> store2, @TopicStoreModule.AggregatedTopicPartitionTableWindowsStore TopicStoreMaster.Store<Controlcenter.TopicPartition, Monitoring.MonitoringMessage, Controlcenter.ClientGroupProductionAggregate> store3, @TopicStoreModule.MonitoringMessageAggregatorWindowsStore TopicStoreMaster.Store<Bytes, Monitoring.MonitoringMessage, Monitoring.MonitoringMessage> store4, UberSerde<Controlcenter.TopicPartition> uberSerde, OrderedKeyPrefixedSerdeSupplier<Keys.KeyType, Monitoring.MonitoringMessage> orderedKeyPrefixedSerdeSupplier) {
        this.verifiable = verifiable;
        this.consumerGroupAggTpStore = store;
        this.consumerGroupGroupAggStore = store2;
        this.aggregatedTopicPartitionTableWindowsStore = store3;
        this.monitoringMessageAggregatorWindowsStore = store4;
        this.topicPartitionUberSerde = uberSerde;
        this.keySerdeSupplier = orderedKeyPrefixedSerdeSupplier;
    }

    public static Windows<TimeWindow> windows(Rollup rollup) {
        return TimeWindows.of(rollup.getMillis()).until(rollup.getRetainMillis());
    }

    public static String stageName(Rollup rollup, String str) {
        return TopicStoreMaster.nameJoin(str, rollup.name());
    }

    @Override // io.confluent.controlcenter.streams.StreamExtension
    public Map<Rollup, KTable<Windowed<Bytes>, Monitoring.MonitoringMessage>> extend(C3Stream<Void, Monitoring.MonitoringMessage> c3Stream) {
        C3Stream<Controlcenter.TopicPartition, Monitoring.MonitoringMessage> rekeyMessagesByTopicPartition = rekeyMessagesByTopicPartition(c3Stream);
        HashMap newHashMap = Maps.newHashMap();
        for (Rollup rollup : Rollup.values()) {
            newHashMap.put(rollup, aggregateAndStoreFinalData(rollup, extractDiffsAndRekeyByTopicPartitionClientGroup(rollup, clientGroupProductionAggregateStream(rollup, rekeyMessagesByTopicPartition))));
        }
        return newHashMap;
    }

    private C3Stream<Controlcenter.TopicPartition, Monitoring.MonitoringMessage> rekeyMessagesByTopicPartition(C3Stream<Void, Monitoring.MonitoringMessage> c3Stream) {
        return this.verifiable.transform(c3Stream.m103map((KeyValueMapper<? super Void, ? super Monitoring.MonitoringMessage, ? extends KeyValue<? extends K1, ? extends V1>>) new KeyValueMapper<Void, Monitoring.MonitoringMessage, KeyValue<Controlcenter.TopicPartition, Monitoring.MonitoringMessage>>() { // from class: io.confluent.controlcenter.streams.consumergroups.ConsumerGroupStreamExtension.1
            public KeyValue<Controlcenter.TopicPartition, Monitoring.MonitoringMessage> apply(Void r4, Monitoring.MonitoringMessage monitoringMessage) {
                return KeyValue.pair(ControlCenterProtoHelper.topicPartition(monitoringMessage), monitoringMessage);
            }
        }), this.topicPartitionUberSerde, this.consumerGroupAggTpStore);
    }

    private C3Stream<Windowed<Controlcenter.TopicPartition>, Controlcenter.ClientGroupProductionAggregate> clientGroupProductionAggregateStream(Rollup rollup, C3Stream<Controlcenter.TopicPartition, Monitoring.MonitoringMessage> c3Stream) {
        Windows<TimeWindow> windows = windows(rollup);
        return c3Stream.wrap(c3Stream.groupByKey(Serialized.with(this.aggregatedTopicPartitionTableWindowsStore.keySerde, this.aggregatedTopicPartitionTableWindowsStore.valueSerde)).windowedBy(windows).aggregate(new ConsumerGroupProductionInitializer(), new ConsumerGroupProductionAggregator(), Materialized.as(Stores.persistentWindowStore(stageName(rollup, this.aggregatedTopicPartitionTableWindowsStore.name), windows.maintainMs(), windows.segments, windows.size(), false)).withKeySerde(this.aggregatedTopicPartitionTableWindowsStore.keySerde).withValueSerde(this.aggregatedTopicPartitionTableWindowsStore.aggregateSerde).withCachingDisabled()).toStream());
    }

    private C3Stream<Bytes, Monitoring.MonitoringMessage> extractDiffsAndRekeyByTopicPartitionClientGroup(Rollup rollup, C3Stream<Windowed<Controlcenter.TopicPartition>, Controlcenter.ClientGroupProductionAggregate> c3Stream) {
        return this.verifiable.transform(c3Stream.m106filter((Predicate<? super Windowed<Controlcenter.TopicPartition>, ? super Controlcenter.ClientGroupProductionAggregate>) new RekeyPredicate()).m100flatMap((KeyValueMapper<? super Windowed<Controlcenter.TopicPartition>, ? super Controlcenter.ClientGroupProductionAggregate, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>>) new ExpectedConsumptionRekeyMapper(this.keySerdeSupplier)), this.keySerdeSupplier.get(), this.consumerGroupGroupAggStore, rollup);
    }

    private KTable<Windowed<Bytes>, Monitoring.MonitoringMessage> aggregateAndStoreFinalData(Rollup rollup, KStream<Bytes, Monitoring.MonitoringMessage> kStream) {
        Windows<TimeWindow> windows = windows(rollup);
        return kStream.groupByKey(Serialized.with(this.monitoringMessageAggregatorWindowsStore.keySerde, this.monitoringMessageAggregatorWindowsStore.valueSerde)).windowedBy(windows).aggregate(new MonitoringMessageInitializer(), MonitoringMessageAggregator.stripped(), Materialized.as(Stores.persistentWindowStore(stageName(rollup, this.monitoringMessageAggregatorWindowsStore.name), windows.maintainMs(), windows.segments, windows.size(), false)).withValueSerde(this.monitoringMessageAggregatorWindowsStore.aggregateSerde));
    }
}
