/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.controlcenter.streams;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.confluent.controlcenter.Rollup;
import io.confluent.controlcenter.streams.C3Stream;
import io.confluent.controlcenter.streams.StreamExtension;
import io.confluent.controlcenter.streams.TopicStoreMaster;
import io.confluent.controlcenter.streams.TypeMapper;
import io.confluent.monitoring.record.Monitoring;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractMonitoringMessageRollupStreamExtension<O>
implements StreamExtension<O, Void, Monitoring.MonitoringMessage> {
    private static final Logger log = LoggerFactory.getLogger(AbstractMonitoringMessageRollupStreamExtension.class);
    private static final Set<Monitoring.ClientType> DEFAULT_VALID_CLIENT_TYPES = ImmutableSet.of((Object)Monitoring.ClientType.CONSUMER);

    @Override
    public Map<Rollup, KTable<Windowed<Bytes>, O>> extend(C3Stream<Void, Monitoring.MonitoringMessage> stream) {
        KStream explodedStream = stream.filter((Predicate)new Predicate<Void, Monitoring.MonitoringMessage>(){

            public boolean test(Void key, Monitoring.MonitoringMessage value) {
                return AbstractMonitoringMessageRollupStreamExtension.this.validClientTypes().contains(value.getClientType());
            }
        }).flatMap(new KeyValueMapper<Void, Monitoring.MonitoringMessage, Iterable<KeyValue<Bytes, O>>>(){

            public Iterable<KeyValue<Bytes, O>> apply(Void key, Monitoring.MonitoringMessage value) {
                ArrayList out = Lists.newArrayList();
                for (TypeMapper mapper : AbstractMonitoringMessageRollupStreamExtension.this.getMappers()) {
                    out.add(mapper.apply(key, value));
                }
                return out;
            }
        });
        C3Stream<Bytes, O> groupedStream = this.shuffle((C3Stream<Bytes, O>)explodedStream);
        HashMap rollupAggregates = Maps.newHashMap();
        for (Rollup rollup : Rollup.values()) {
            TimeWindows windows = TimeWindows.of((long)rollup.getMillis()).until(rollup.getRetainMillis());
            rollupAggregates.put(rollup, this.filterAndAggregateRollup(groupedStream, (Windows<Window>)windows, TopicStoreMaster.nameJoin(this.name(), rollup.toString())));
        }
        return rollupAggregates;
    }

    protected abstract String name();

    protected Set<Monitoring.ClientType> validClientTypes() {
        return DEFAULT_VALID_CLIENT_TYPES;
    }

    protected abstract Iterable<? extends TypeMapper<Void, O>> getMappers();

    protected abstract KTable<Windowed<Bytes>, O> filterAndAggregateRollup(KStream<Bytes, O> var1, Windows<Window> var2, String var3);

    protected abstract C3Stream<Bytes, O> shuffle(C3Stream<Bytes, O> var1);
}

