package org.apache.kafka.streams.kstream.internals;

import java.lang.reflect.Array;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Stores;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamImpl.class */
public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V> {
    private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
    private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
    private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-";
    private static final String FILTER_NAME = "KSTREAM-FILTER-";
    private static final String FLATMAP_NAME = "KSTREAM-FLATMAP-";
    private static final String FLATMAPVALUES_NAME = "KSTREAM-FLATMAPVALUES-";
    public static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
    public static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
    public static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
    private static final String MAP_NAME = "KSTREAM-MAP-";
    private static final String MAPVALUES_NAME = "KSTREAM-MAPVALUES-";
    public static final String MERGE_NAME = "KSTREAM-MERGE-";
    public static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-";
    public static final String OUTEROTHER_NAME = "KSTREAM-OUTEROTHER-";
    private static final String PROCESSOR_NAME = "KSTREAM-PROCESSOR-";
    private static final String REDUCE_NAME = "KSTREAM-REDUCE-";
    private static final String SELECT_NAME = "KSTREAM-SELECT-";
    public static final String SINK_NAME = "KSTREAM-SINK-";
    public static final String SOURCE_NAME = "KSTREAM-SOURCE-";
    private static final String TRANSFORM_NAME = "KSTREAM-TRANSFORM-";
    private static final String TRANSFORMVALUES_NAME = "KSTREAM-TRANSFORMVALUES-";
    private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";

    public KStreamImpl(KStreamBuilder kStreamBuilder, String str, Set<String> set) {
        super(kStreamBuilder, str, set);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public KStream<K, V> filter(Predicate<K, V> predicate) {
        String newName = this.topology.newName(FILTER_NAME);
        this.topology.addProcessor(newName, new KStreamFilter(predicate, false), this.name);
        return new KStreamImpl(this.topology, newName, this.sourceNodes);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public KStream<K, V> filterOut(Predicate<K, V> predicate) {
        String newName = this.topology.newName(FILTER_NAME);
        this.topology.addProcessor(newName, new KStreamFilter(predicate, true), this.name);
        return new KStreamImpl(this.topology, newName, this.sourceNodes);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> keyValueMapper) {
        String newName = this.topology.newName(MAP_NAME);
        this.topology.addProcessor(newName, new KStreamMap(keyValueMapper), this.name);
        return new KStreamImpl(this.topology, newName, null);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> valueMapper) {
        String newName = this.topology.newName(MAPVALUES_NAME);
        this.topology.addProcessor(newName, new KStreamMapValues(valueMapper), this.name);
        return new KStreamImpl(this.topology, newName, this.sourceNodes);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> keyValueMapper) {
        String newName = this.topology.newName(FLATMAP_NAME);
        this.topology.addProcessor(newName, new KStreamFlatMap(keyValueMapper), this.name);
        return new KStreamImpl(this.topology, newName, null);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> valueMapper) {
        String newName = this.topology.newName(FLATMAPVALUES_NAME);
        this.topology.addProcessor(newName, new KStreamFlatMapValues(valueMapper), this.name);
        return new KStreamImpl(this.topology, newName, this.sourceNodes);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public KStream<K, V>[] branch(Predicate<K, V>... predicateArr) {
        String newName = this.topology.newName(BRANCH_NAME);
        this.topology.addProcessor(newName, new KStreamBranch((Predicate[]) predicateArr.clone()), this.name);
        KStream<K, V>[] kStreamArr = (KStream[]) Array.newInstance((Class<?>) KStream.class, predicateArr.length);
        for (int i = 0; i < predicateArr.length; i++) {
            String newName2 = this.topology.newName(BRANCHCHILD_NAME);
            this.topology.addProcessor(newName2, new KStreamPassThrough(), newName);
            kStreamArr[i] = new KStreamImpl(this.topology, newName2, this.sourceNodes);
        }
        return kStreamArr;
    }

    public static <K, V> KStream<K, V> merge(KStreamBuilder kStreamBuilder, KStream<K, V>[] kStreamArr) {
        String newName = kStreamBuilder.newName(MERGE_NAME);
        String[] strArr = new String[kStreamArr.length];
        HashSet hashSet = new HashSet();
        for (int i = 0; i < kStreamArr.length; i++) {
            KStreamImpl kStreamImpl = (KStreamImpl) kStreamArr[i];
            strArr[i] = kStreamImpl.name;
            if (hashSet != null) {
                if (kStreamImpl.sourceNodes != null) {
                    hashSet.addAll(kStreamImpl.sourceNodes);
                } else {
                    hashSet = null;
                }
            }
        }
        kStreamBuilder.addProcessor(newName, new KStreamPassThrough(), strArr);
        return new KStreamImpl(kStreamBuilder, newName, hashSet);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public KStream<K, V> through(String str, Serializer<K> serializer, Serializer<V> serializer2, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        to(str, serializer, serializer2);
        return this.topology.stream(deserializer, deserializer2, str);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public KStream<K, V> through(String str) {
        return through(str, null, null, null, null);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public void to(String str) {
        to(str, null, null);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public void to(String str, Serializer<K> serializer, Serializer<V> serializer2) {
        String newName = this.topology.newName(SINK_NAME);
        WindowedStreamPartitioner windowedStreamPartitioner = null;
        if (serializer != null && (serializer instanceof WindowedSerializer)) {
            windowedStreamPartitioner = new WindowedStreamPartitioner((WindowedSerializer) serializer);
        }
        this.topology.addSink(newName, str, serializer, serializer2, windowedStreamPartitioner, this.name);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... strArr) {
        String newName = this.topology.newName(TRANSFORM_NAME);
        this.topology.addProcessor(newName, new KStreamTransform(transformerSupplier), this.name);
        this.topology.connectProcessorAndStateStores(newName, strArr);
        return new KStreamImpl(this.topology, newName, null);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<V, V1> valueTransformerSupplier, String... strArr) {
        String newName = this.topology.newName(TRANSFORMVALUES_NAME);
        this.topology.addProcessor(newName, new KStreamTransformValues(valueTransformerSupplier), this.name);
        this.topology.connectProcessorAndStateStores(newName, strArr);
        return new KStreamImpl(this.topology, newName, this.sourceNodes);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public void process(ProcessorSupplier<K, V> processorSupplier, String... strArr) {
        String newName = this.topology.newName(PROCESSOR_NAME);
        this.topology.addProcessor(newName, processorSupplier, this.name);
        this.topology.connectProcessorAndStateStores(newName, strArr);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <V1, R> KStream<K, R> join(KStream<K, V1> kStream, ValueJoiner<V, V1, R> valueJoiner, JoinWindows joinWindows, Serializer<K> serializer, Serializer<V> serializer2, Serializer<V1> serializer3, Deserializer<K> deserializer, Deserializer<V> deserializer2, Deserializer<V1> deserializer3) {
        return join(kStream, valueJoiner, joinWindows, serializer, serializer2, serializer3, deserializer, deserializer2, deserializer3, false);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <V1, R> KStream<K, R> outerJoin(KStream<K, V1> kStream, ValueJoiner<V, V1, R> valueJoiner, JoinWindows joinWindows, Serializer<K> serializer, Serializer<V> serializer2, Serializer<V1> serializer3, Deserializer<K> deserializer, Deserializer<V> deserializer2, Deserializer<V1> deserializer3) {
        return join(kStream, valueJoiner, joinWindows, serializer, serializer2, serializer3, deserializer, deserializer2, deserializer3, true);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <V1, R> KStream<K, R> join(KStream<K, V1> kStream, ValueJoiner<V, V1, R> valueJoiner, JoinWindows joinWindows, Serializer<K> serializer, Serializer<V> serializer2, Serializer<V1> serializer3, Deserializer<K> deserializer, Deserializer<V> deserializer2, Deserializer<V1> deserializer3, boolean z) {
        Set<String> ensureJoinableWith = ensureJoinableWith((AbstractStream) kStream);
        StateStoreSupplier build = Stores.create(joinWindows.name() + "-this").withKeys(serializer, deserializer).withValues(serializer2, deserializer2).persistent().windowed(joinWindows.maintainMs(), joinWindows.segments, true).build();
        StateStoreSupplier build2 = Stores.create(joinWindows.name() + "-other").withKeys(serializer, deserializer).withValues(serializer3, deserializer3).persistent().windowed(joinWindows.maintainMs(), joinWindows.segments, true).build();
        KStreamJoinWindow kStreamJoinWindow = new KStreamJoinWindow(build.name(), joinWindows.before + joinWindows.after + 1, joinWindows.maintainMs());
        KStreamJoinWindow kStreamJoinWindow2 = new KStreamJoinWindow(build2.name(), joinWindows.before + joinWindows.after + 1, joinWindows.maintainMs());
        KStreamKStreamJoin kStreamKStreamJoin = new KStreamKStreamJoin(build2.name(), joinWindows.before, joinWindows.after, valueJoiner, z);
        KStreamKStreamJoin kStreamKStreamJoin2 = new KStreamKStreamJoin(build.name(), joinWindows.before, joinWindows.after, reverseJoiner(valueJoiner), z);
        KStreamPassThrough kStreamPassThrough = new KStreamPassThrough();
        String newName = this.topology.newName(WINDOWED_NAME);
        String newName2 = this.topology.newName(WINDOWED_NAME);
        String newName3 = z ? this.topology.newName(OUTERTHIS_NAME) : this.topology.newName(JOINTHIS_NAME);
        String newName4 = z ? this.topology.newName(OUTEROTHER_NAME) : this.topology.newName(JOINOTHER_NAME);
        String newName5 = this.topology.newName(MERGE_NAME);
        this.topology.addProcessor(newName, kStreamJoinWindow, this.name);
        this.topology.addProcessor(newName2, kStreamJoinWindow2, ((KStreamImpl) kStream).name);
        this.topology.addProcessor(newName3, kStreamKStreamJoin, newName);
        this.topology.addProcessor(newName4, kStreamKStreamJoin2, newName2);
        this.topology.addProcessor(newName5, kStreamPassThrough, newName3, newName4);
        this.topology.addStateStore(build, newName, newName2);
        this.topology.addStateStore(build2, newName, newName2);
        return new KStreamImpl(this.topology, newName5, ensureJoinableWith);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <V1, R> KStream<K, R> leftJoin(KStream<K, V1> kStream, ValueJoiner<V, V1, R> valueJoiner, JoinWindows joinWindows, Serializer<K> serializer, Serializer<V1> serializer2, Deserializer<K> deserializer, Deserializer<V1> deserializer2) {
        Set<String> ensureJoinableWith = ensureJoinableWith((AbstractStream) kStream);
        StateStoreSupplier build = Stores.create(joinWindows.name() + "-other").withKeys(serializer, deserializer).withValues(serializer2, deserializer2).persistent().windowed(joinWindows.maintainMs(), joinWindows.segments, true).build();
        KStreamJoinWindow kStreamJoinWindow = new KStreamJoinWindow(build.name(), joinWindows.before + joinWindows.after + 1, joinWindows.maintainMs());
        KStreamKStreamJoin kStreamKStreamJoin = new KStreamKStreamJoin(build.name(), joinWindows.before, joinWindows.after, valueJoiner, true);
        String newName = this.topology.newName(WINDOWED_NAME);
        String newName2 = this.topology.newName(LEFTJOIN_NAME);
        this.topology.addProcessor(newName, kStreamJoinWindow, ((KStreamImpl) kStream).name);
        this.topology.addProcessor(newName2, kStreamKStreamJoin, this.name);
        this.topology.addStateStore(build, newName2, newName);
        return new KStreamImpl(this.topology, newName2, ensureJoinableWith);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> kTable, ValueJoiner<V, V1, R> valueJoiner) {
        Set<String> ensureJoinableWith = ensureJoinableWith((AbstractStream) kTable);
        String newName = this.topology.newName(LEFTJOIN_NAME);
        this.topology.addProcessor(newName, new KStreamKTableLeftJoin((KTableImpl) kTable, valueJoiner), this.name);
        this.topology.connectProcessors(this.name, ((KTableImpl) kTable).name);
        return new KStreamImpl(this.topology, newName, ensureJoinableWith);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer, Windows<W> windows, Serializer<K> serializer, Serializer<V> serializer2, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        String newName = this.topology.newName(REDUCE_NAME);
        KStreamWindowReduce kStreamWindowReduce = new KStreamWindowReduce(windows, windows.name(), reducer);
        StateStoreSupplier build = Stores.create(windows.name()).withKeys(serializer, deserializer).withValues(serializer2, deserializer2).persistent().windowed(windows.maintainMs(), windows.segments, false).build();
        this.topology.addProcessor(newName, kStreamWindowReduce, this.name);
        this.topology.addStateStore(build, newName);
        return new KTableImpl(this.topology, newName, kStreamWindowReduce, this.sourceNodes);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public KTable<K, V> reduceByKey(Reducer<V> reducer, Serializer<K> serializer, Serializer<V> serializer2, Deserializer<K> deserializer, Deserializer<V> deserializer2, String str) {
        String newName = this.topology.newName(REDUCE_NAME);
        KStreamReduce kStreamReduce = new KStreamReduce(str, reducer);
        StateStoreSupplier build = Stores.create(str).withKeys(serializer, deserializer).withValues(serializer2, deserializer2).persistent().build();
        this.topology.addProcessor(newName, kStreamReduce, this.name);
        this.topology.addStateStore(build, newName);
        return new KTableImpl(this.topology, newName, kStreamReduce, this.sourceNodes);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer, Aggregator<K, V, T> aggregator, Windows<W> windows, Serializer<K> serializer, Serializer<T> serializer2, Deserializer<K> deserializer, Deserializer<T> deserializer2) {
        String newName = this.topology.newName(AGGREGATE_NAME);
        KStreamWindowAggregate kStreamWindowAggregate = new KStreamWindowAggregate(windows, windows.name(), initializer, aggregator);
        StateStoreSupplier build = Stores.create(windows.name()).withKeys(serializer, deserializer).withValues(serializer2, deserializer2).persistent().windowed(windows.maintainMs(), windows.segments, false).build();
        this.topology.addProcessor(newName, kStreamWindowAggregate, this.name);
        this.topology.addStateStore(build, newName);
        return new KTableImpl(this.topology, newName, kStreamWindowAggregate, this.sourceNodes);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <T> KTable<K, T> aggregateByKey(Initializer<T> initializer, Aggregator<K, V, T> aggregator, Serializer<K> serializer, Serializer<T> serializer2, Deserializer<K> deserializer, Deserializer<T> deserializer2, String str) {
        String newName = this.topology.newName(AGGREGATE_NAME);
        KStreamAggregate kStreamAggregate = new KStreamAggregate(str, initializer, aggregator);
        StateStoreSupplier build = Stores.create(str).withKeys(serializer, deserializer).withValues(serializer2, deserializer2).persistent().build();
        this.topology.addProcessor(newName, kStreamAggregate, this.name);
        this.topology.addStateStore(build, newName);
        return new KTableImpl(this.topology, newName, kStreamAggregate, this.sourceNodes);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows, Serializer<K> serializer, Serializer<Long> serializer2, Deserializer<K> deserializer, Deserializer<Long> deserializer2) {
        return (KTable<Windowed<K>, Long>) aggregateByKey(new Initializer<Long>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamImpl.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.apache.kafka.streams.kstream.Initializer
            public Long apply() {
                return 0L;
            }
        }, new Aggregator<K, V, Long>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamImpl.2
            /* renamed from: apply, reason: avoid collision after fix types in other method */
            public Long apply2(K k, V v, Long l) {
                return Long.valueOf(l.longValue() + 1);
            }

            @Override // org.apache.kafka.streams.kstream.Aggregator
            public /* bridge */ /* synthetic */ Long apply(Object obj, Object obj2, Long l) {
                return apply2((AnonymousClass2) obj, obj2, l);
            }
        }, windows, serializer, serializer2, deserializer, deserializer2);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.kafka.streams.kstream.KStream
    public KTable<K, Long> countByKey(Serializer<K> serializer, Serializer<Long> serializer2, Deserializer<K> deserializer, Deserializer<Long> deserializer2, String str) {
        return (KTable<K, Long>) aggregateByKey(new Initializer<Long>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamImpl.3
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.apache.kafka.streams.kstream.Initializer
            public Long apply() {
                return 0L;
            }
        }, new Aggregator<K, V, Long>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamImpl.4
            /* renamed from: apply, reason: avoid collision after fix types in other method */
            public Long apply2(K k, V v, Long l) {
                return Long.valueOf(l.longValue() + 1);
            }

            @Override // org.apache.kafka.streams.kstream.Aggregator
            public /* bridge */ /* synthetic */ Long apply(Object obj, Object obj2, Long l) {
                return apply2((AnonymousClass4) obj, obj2, l);
            }
        }, serializer, serializer2, deserializer, deserializer2, str);
    }
}
