package io.confluent.controlcenter.streams;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.confluent.controlcenter.Rollup;
import io.confluent.monitoring.record.Monitoring;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/controlcenter/streams/AbstractMonitoringMessageRollupStreamExtension.class */
/**
 * Base class for stream extensions that turn a stream of monitoring messages into one
 * windowed table per {@link Rollup}.
 *
 * <p>{@link #extend(C3Stream)} keeps only messages whose client type is in
 * {@link #validClientTypes()}, applies every mapper from {@link #getMappers()} to each
 * remaining message, hands the resulting stream to {@link #shuffle(C3Stream)}, and finally
 * calls {@link #filterAndAggregateRollup(KStream, Windows, String)} once per rollup.
 *
 * @param <O> value type produced by the mappers and stored in the resulting tables
 */
public abstract class AbstractMonitoringMessageRollupStreamExtension<O> implements StreamExtension<O, Void, Monitoring.MonitoringMessage> {
    private static final Logger log = LoggerFactory.getLogger(AbstractMonitoringMessageRollupStreamExtension.class);

    /** Client types accepted when a subclass does not override {@link #validClientTypes()}. */
    private static final Set<Monitoring.ClientType> DEFAULT_VALID_CLIENT_TYPES = ImmutableSet.of(Monitoring.ClientType.CONSUMER);

    /**
     * Builds one windowed table per {@link Rollup} value from the given monitoring stream.
     *
     * @param c3Stream stream of monitoring messages to filter, map and aggregate
     * @return a mutable map holding one entry for every value of {@link Rollup#values()}
     */
    @Override // io.confluent.controlcenter.streams.StreamExtension
    public Map<Rollup, KTable<Windowed<Bytes>, O>> extend(C3Stream<Void, Monitoring.MonitoringMessage> c3Stream) {
        // Evaluated per message, so validClientTypes() is consulted on every record.
        Predicate<Void, Monitoring.MonitoringMessage> hasValidClientType = new Predicate<Void, Monitoring.MonitoringMessage>() {
            @Override
            public boolean test(Void key, Monitoring.MonitoringMessage monitoringMessage) {
                return validClientTypes().contains(monitoringMessage.getClientType());
            }
        };

        // Fans each message out into one entry per configured mapper, in mapper iteration order.
        KeyValueMapper<Void, Monitoring.MonitoringMessage, Iterable<KeyValue<Bytes, O>>> applyAllMappers = new KeyValueMapper<Void, Monitoring.MonitoringMessage, Iterable<KeyValue<Bytes, O>>>() {
            @Override
            public Iterable<KeyValue<Bytes, O>> apply(Void key, Monitoring.MonitoringMessage monitoringMessage) {
                ArrayList<KeyValue<Bytes, O>> mapped = Lists.newArrayList();
                for (TypeMapper<Void, O> mapper : getMappers()) {
                    mapped.add(mapper.apply(key, monitoringMessage));
                }
                return mapped;
            }
        };

        C3Stream<Bytes, O> shuffled = shuffle(c3Stream.m142filter(hasValidClientType).m130flatMap(applyAllMappers));

        Map<Rollup, KTable<Windowed<Bytes>, O>> tablesByRollup = Maps.newHashMap();
        for (Rollup rollup : Rollup.values()) {
            // Window size and retention both come from the rollup; the name passed on is
            // name() joined with the rollup's string form.
            // NOTE(review): TimeWindows appears to be a Windows<TimeWindow>; confirm this argument
            // type-checks against the Windows<Window> parameter of filterAndAggregateRollup.
            tablesByRollup.put(rollup, filterAndAggregateRollup(shuffled, TimeWindows.of(rollup.getMillis()).until(rollup.getRetainMillis()), TopicStoreMaster.nameJoin(name(), rollup.toString())));
        }
        return tablesByRollup;
    }

    /**
     * Returns the base name of this extension. {@link #extend(C3Stream)} joins it with each
     * rollup's string form via {@code TopicStoreMaster.nameJoin} and passes the result to
     * {@link #filterAndAggregateRollup(KStream, Windows, String)}.
     *
     * @return the base name for this extension
     */
    protected abstract String name();

    /**
     * Returns the client types whose messages are kept by {@link #extend(C3Stream)}.
     * The default accepts only {@code Monitoring.ClientType.CONSUMER}.
     *
     * @return the set of accepted client types
     */
    protected Set<Monitoring.ClientType> validClientTypes() {
        return DEFAULT_VALID_CLIENT_TYPES;
    }

    /**
     * Returns the mappers applied to every accepted monitoring message; each mapper
     * contributes one entry to the flat-mapped stream.
     *
     * @return the mappers to apply, in the order their results should be emitted
     */
    protected abstract Iterable<? extends TypeMapper<Void, O>> getMappers();

    /**
     * Produces the windowed table for a single rollup.
     *
     * @param kStream the stream returned by {@link #shuffle(C3Stream)}
     * @param windows window definition built from the rollup's size and retention
     * @param str     name built from {@link #name()} and the rollup's string form
     * @return the table stored in the result of {@link #extend(C3Stream)} for that rollup
     */
    protected abstract KTable<Windowed<Bytes>, O> filterAndAggregateRollup(KStream<Bytes, O> kStream, Windows<Window> windows, String str);

    /**
     * Hook invoked once on the flat-mapped stream before any per-rollup aggregation.
     * Presumably used to repartition the stream by its new key — confirm against subclasses.
     *
     * @param c3Stream the filtered and flat-mapped stream
     * @return the stream that every rollup aggregation reads from
     */
    protected abstract C3Stream<Bytes, O> shuffle(C3Stream<Bytes, O> c3Stream);
}
