package org.apache.kafka.streams.kstream.internals;

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.lang.reflect.Array;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.Stores;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamImpl.class */
public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V> {
    private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
    private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-";
    public static final String FILTER_NAME = "KSTREAM-FILTER-";
    private static final String FLATMAP_NAME = "KSTREAM-FLATMAP-";
    private static final String FLATMAPVALUES_NAME = "KSTREAM-FLATMAPVALUES-";
    public static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
    public static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
    public static final String JOIN_NAME = "KSTREAM-JOIN-";
    public static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
    private static final String MAP_NAME = "KSTREAM-MAP-";
    private static final String MAPVALUES_NAME = "KSTREAM-MAPVALUES-";
    public static final String MERGE_NAME = "KSTREAM-MERGE-";
    public static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-";
    public static final String OUTEROTHER_NAME = "KSTREAM-OUTEROTHER-";
    private static final String PROCESSOR_NAME = "KSTREAM-PROCESSOR-";
    private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
    private static final String KEY_SELECT_NAME = "KSTREAM-KEY-SELECT-";
    public static final String SINK_NAME = "KSTREAM-SINK-";
    public static final String SOURCE_NAME = "KSTREAM-SOURCE-";
    private static final String TRANSFORM_NAME = "KSTREAM-TRANSFORM-";
    private static final String TRANSFORMVALUES_NAME = "KSTREAM-TRANSFORMVALUES-";
    private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
    private static final String FOREACH_NAME = "KSTREAM-FOREACH-";
    public static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
    private final boolean repartitionRequired;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamImpl$KStreamImplJoin.class */
    public class KStreamImplJoin {
        private final boolean leftOuter;
        private final boolean rightOuter;

        KStreamImplJoin(boolean z, boolean z2) {
            this.leftOuter = z;
            this.rightOuter = z2;
        }

        /* JADX WARN: Multi-variable type inference failed */
        public <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> kStream, KStream<K1, V2> kStream2, ValueJoiner<? super V1, ? super V2, ? extends R> valueJoiner, JoinWindows joinWindows, Serde<K1> serde, Serde<V1> serde2, Serde<V2> serde3) {
            String newName = KStreamImpl.this.topology.newName(KStreamImpl.WINDOWED_NAME);
            String newName2 = KStreamImpl.this.topology.newName(KStreamImpl.WINDOWED_NAME);
            String newName3 = this.rightOuter ? KStreamImpl.this.topology.newName(KStreamImpl.OUTERTHIS_NAME) : KStreamImpl.this.topology.newName(KStreamImpl.JOINTHIS_NAME);
            String newName4 = this.leftOuter ? KStreamImpl.this.topology.newName(KStreamImpl.OUTEROTHER_NAME) : KStreamImpl.this.topology.newName(KStreamImpl.JOINOTHER_NAME);
            String newName5 = KStreamImpl.this.topology.newName(KStreamImpl.MERGE_NAME);
            StateStoreSupplier createWindowedStateStore = KStreamImpl.createWindowedStateStore(joinWindows, serde, serde2, newName3 + "-store");
            StateStoreSupplier createWindowedStateStore2 = KStreamImpl.createWindowedStateStore(joinWindows, serde, serde3, newName4 + "-store");
            KStreamJoinWindow kStreamJoinWindow = new KStreamJoinWindow(createWindowedStateStore.name(), joinWindows.beforeMs + joinWindows.afterMs + 1, joinWindows.maintainMs());
            KStreamJoinWindow kStreamJoinWindow2 = new KStreamJoinWindow(createWindowedStateStore2.name(), joinWindows.beforeMs + joinWindows.afterMs + 1, joinWindows.maintainMs());
            KStreamKStreamJoin kStreamKStreamJoin = new KStreamKStreamJoin(createWindowedStateStore2.name(), joinWindows.beforeMs, joinWindows.afterMs, valueJoiner, this.leftOuter);
            KStreamKStreamJoin kStreamKStreamJoin2 = new KStreamKStreamJoin(createWindowedStateStore.name(), joinWindows.afterMs, joinWindows.beforeMs, AbstractStream.reverseJoiner(valueJoiner), this.rightOuter);
            KStreamPassThrough kStreamPassThrough = new KStreamPassThrough();
            KStreamImpl.this.topology.addProcessor(newName, kStreamJoinWindow, ((AbstractStream) kStream).name);
            KStreamImpl.this.topology.addProcessor(newName2, kStreamJoinWindow2, ((AbstractStream) kStream2).name);
            KStreamImpl.this.topology.addProcessor(newName3, kStreamKStreamJoin, newName);
            KStreamImpl.this.topology.addProcessor(newName4, kStreamKStreamJoin2, newName2);
            KStreamImpl.this.topology.addProcessor(newName5, kStreamPassThrough, newName3, newName4);
            KStreamImpl.this.topology.addStateStore(createWindowedStateStore, newName, newName4);
            KStreamImpl.this.topology.addStateStore(createWindowedStateStore2, newName2, newName3);
            HashSet hashSet = new HashSet(((AbstractStream) kStream).sourceNodes);
            hashSet.addAll(((KStreamImpl) kStream2).sourceNodes);
            return new KStreamImpl(KStreamImpl.this.topology, newName5, hashSet, false);
        }
    }

    public KStreamImpl(KStreamBuilder kStreamBuilder, String str, Set<String> set, boolean z) {
        super(kStreamBuilder, str, set);
        this.repartitionRequired = z;
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public KStream<K, V> filter(Predicate<? super K, ? super V> predicate) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        String newName = this.topology.newName(FILTER_NAME);
        this.topology.addProcessor(newName, new KStreamFilter(predicate, false), this.name);
        return new KStreamImpl(this.topology, newName, this.sourceNodes, this.repartitionRequired);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public KStream<K, V> filterNot(Predicate<? super K, ? super V> predicate) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        String newName = this.topology.newName(FILTER_NAME);
        this.topology.addProcessor(newName, new KStreamFilter(predicate, true), this.name);
        return new KStreamImpl(this.topology, newName, this.sourceNodes, this.repartitionRequired);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <K1> KStream<K1, V> selectKey(KeyValueMapper<? super K, ? super V, ? extends K1> keyValueMapper) {
        Objects.requireNonNull(keyValueMapper, "mapper can't be null");
        return new KStreamImpl(this.topology, internalSelectKey(keyValueMapper), this.sourceNodes, true);
    }

    private <K1> String internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> keyValueMapper) {
        String newName = this.topology.newName(KEY_SELECT_NAME);
        this.topology.addProcessor(newName, new KStreamMap(new KeyValueMapper<K, V, KeyValue<K1, V>>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamImpl.1
            @Override // org.apache.kafka.streams.kstream.KeyValueMapper
            public KeyValue<K1, V> apply(K k, V v) {
                return new KeyValue<>(keyValueMapper.apply(k, v), v);
            }

            @Override // org.apache.kafka.streams.kstream.KeyValueMapper
            public /* bridge */ /* synthetic */ Object apply(Object obj, Object obj2) {
                return apply((AnonymousClass1<K1>) obj, obj2);
            }
        }), this.name);
        return newName;
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <K1, V1> KStream<K1, V1> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> keyValueMapper) {
        Objects.requireNonNull(keyValueMapper, "mapper can't be null");
        String newName = this.topology.newName(MAP_NAME);
        this.topology.addProcessor(newName, new KStreamMap(keyValueMapper), this.name);
        return new KStreamImpl(this.topology, newName, this.sourceNodes, true);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <V1> KStream<K, V1> mapValues(ValueMapper<? super V, ? extends V1> valueMapper) {
        Objects.requireNonNull(valueMapper, "mapper can't be null");
        String newName = this.topology.newName(MAPVALUES_NAME);
        this.topology.addProcessor(newName, new KStreamMapValues(valueMapper), this.name);
        return new KStreamImpl(this.topology, newName, this.sourceNodes, this.repartitionRequired);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public void print() {
        print(null, null, null);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public void print(String str) {
        print(null, null, str);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public void print(Serde<K> serde, Serde<V> serde2) {
        print(serde, serde2, null);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public void print(Serde<K> serde, Serde<V> serde2, String str) {
        this.topology.addProcessor(this.topology.newName(PRINTING_NAME), new KeyValuePrinter(serde, serde2, str == null ? this.name : str), this.name);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public void writeAsText(String str) {
        writeAsText(str, null, null, null);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public void writeAsText(String str, String str2) {
        writeAsText(str, str2, null, null);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public void writeAsText(String str, Serde<K> serde, Serde<V> serde2) {
        writeAsText(str, null, serde, serde2);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public void writeAsText(String str, String str2, Serde<K> serde, Serde<V> serde2) {
        Objects.requireNonNull(str, "filePath can't be null");
        if (str.trim().isEmpty()) {
            throw new TopologyBuilderException("filePath can't be an empty string");
        }
        try {
            this.topology.addProcessor(this.topology.newName(PRINTING_NAME), new KeyValuePrinter(new PrintStream(new FileOutputStream(str)), serde, serde2, str2 == null ? this.name : str2), this.name);
        } catch (FileNotFoundException e) {
            throw new TopologyBuilderException("Unable to write stream to file at [" + str + "] " + e.getMessage());
        }
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> keyValueMapper) {
        Objects.requireNonNull(keyValueMapper, "mapper can't be null");
        String newName = this.topology.newName(FLATMAP_NAME);
        this.topology.addProcessor(newName, new KStreamFlatMap(keyValueMapper), this.name);
        return new KStreamImpl(this.topology, newName, this.sourceNodes, true);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <V1> KStream<K, V1> flatMapValues(ValueMapper<? super V, ? extends Iterable<? extends V1>> valueMapper) {
        Objects.requireNonNull(valueMapper, "mapper can't be null");
        String newName = this.topology.newName(FLATMAPVALUES_NAME);
        this.topology.addProcessor(newName, new KStreamFlatMapValues(valueMapper), this.name);
        return new KStreamImpl(this.topology, newName, this.sourceNodes, this.repartitionRequired);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public KStream<K, V>[] branch(Predicate<? super K, ? super V>... predicateArr) {
        if (predicateArr.length == 0) {
            throw new IllegalArgumentException("you must provide at least one predicate");
        }
        for (Predicate<? super K, ? super V> predicate : predicateArr) {
            Objects.requireNonNull(predicate, "predicates can't have null values");
        }
        String newName = this.topology.newName(BRANCH_NAME);
        this.topology.addProcessor(newName, new KStreamBranch((Predicate[]) predicateArr.clone()), this.name);
        KStream<K, V>[] kStreamArr = (KStream[]) Array.newInstance((Class<?>) KStream.class, predicateArr.length);
        for (int i = 0; i < predicateArr.length; i++) {
            String newName2 = this.topology.newName(BRANCHCHILD_NAME);
            this.topology.addProcessor(newName2, new KStreamPassThrough(), newName);
            kStreamArr[i] = new KStreamImpl(this.topology, newName2, this.sourceNodes, this.repartitionRequired);
        }
        return kStreamArr;
    }

    public static <K, V> KStream<K, V> merge(KStreamBuilder kStreamBuilder, KStream<K, V>[] kStreamArr) {
        if (kStreamArr == null || kStreamArr.length == 0) {
            throw new IllegalArgumentException("Parameter <streams> must not be null or has length zero");
        }
        String newName = kStreamBuilder.newName(MERGE_NAME);
        String[] strArr = new String[kStreamArr.length];
        HashSet hashSet = new HashSet();
        boolean z = false;
        for (int i = 0; i < kStreamArr.length; i++) {
            KStreamImpl kStreamImpl = (KStreamImpl) kStreamArr[i];
            strArr[i] = kStreamImpl.name;
            z |= kStreamImpl.repartitionRequired;
            hashSet.addAll(kStreamImpl.sourceNodes);
        }
        kStreamBuilder.addProcessor(newName, new KStreamPassThrough(), strArr);
        return new KStreamImpl(kStreamBuilder, newName, hashSet, z);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public KStream<K, V> through(Serde<K> serde, Serde<V> serde2, StreamPartitioner<? super K, ? super V> streamPartitioner, String str) {
        to(serde, serde2, streamPartitioner, str);
        return this.topology.stream(serde, serde2, str);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public void foreach(ForeachAction<? super K, ? super V> foreachAction) {
        Objects.requireNonNull(foreachAction, "action can't be null");
        this.topology.addProcessor(this.topology.newName(FOREACH_NAME), new KStreamForeach(foreachAction), this.name);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public KStream<K, V> through(Serde<K> serde, Serde<V> serde2, String str) {
        return through(serde, serde2, null, str);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public KStream<K, V> through(StreamPartitioner<? super K, ? super V> streamPartitioner, String str) {
        return through(null, null, streamPartitioner, str);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public KStream<K, V> through(String str) {
        return through(null, null, null, str);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public void to(String str) {
        to(null, null, null, str);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public void to(StreamPartitioner<? super K, ? super V> streamPartitioner, String str) {
        to(null, null, streamPartitioner, str);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public void to(Serde<K> serde, Serde<V> serde2, String str) {
        to(serde, serde2, null, str);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public void to(Serde<K> serde, Serde<V> serde2, StreamPartitioner<? super K, ? super V> streamPartitioner, String str) {
        Objects.requireNonNull(str, "topic can't be null");
        String newName = this.topology.newName(SINK_NAME);
        Serializer<K> serializer = serde == null ? null : serde.serializer();
        Serializer<V> serializer2 = serde2 == null ? null : serde2.serializer();
        if (streamPartitioner == null && serializer != null && (serializer instanceof WindowedSerializer)) {
            streamPartitioner = new WindowedStreamPartitioner((WindowedSerializer) serializer);
        }
        this.topology.addSink(newName, str, serializer, serializer2, streamPartitioner, this.name);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier, String... strArr) {
        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
        String newName = this.topology.newName(TRANSFORM_NAME);
        this.topology.addProcessor(newName, new KStreamTransform(transformerSupplier), this.name);
        this.topology.connectProcessorAndStateStores(newName, strArr);
        return new KStreamImpl(this.topology, newName, this.sourceNodes, true);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<? super V, ? extends V1> valueTransformerSupplier, String... strArr) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");
        String newName = this.topology.newName(TRANSFORMVALUES_NAME);
        this.topology.addProcessor(newName, new KStreamTransformValues(valueTransformerSupplier), this.name);
        this.topology.connectProcessorAndStateStores(newName, strArr);
        return new KStreamImpl(this.topology, newName, this.sourceNodes, this.repartitionRequired);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public void process(ProcessorSupplier<? super K, ? super V> processorSupplier, String... strArr) {
        String newName = this.topology.newName(PROCESSOR_NAME);
        this.topology.addProcessor(newName, processorSupplier, this.name);
        this.topology.connectProcessorAndStateStores(newName, strArr);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <V1, R> KStream<K, R> join(KStream<K, V1> kStream, ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner, JoinWindows joinWindows, Serde<K> serde, Serde<V> serde2, Serde<V1> serde3) {
        return doJoin(kStream, valueJoiner, joinWindows, serde, serde2, serde3, new KStreamImplJoin(false, false));
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <V1, R> KStream<K, R> join(KStream<K, V1> kStream, ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner, JoinWindows joinWindows) {
        return join(kStream, valueJoiner, joinWindows, null, null, null);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <V1, R> KStream<K, R> outerJoin(KStream<K, V1> kStream, ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner, JoinWindows joinWindows, Serde<K> serde, Serde<V> serde2, Serde<V1> serde3) {
        return doJoin(kStream, valueJoiner, joinWindows, serde, serde2, serde3, new KStreamImplJoin(true, true));
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <V1, R> KStream<K, R> outerJoin(KStream<K, V1> kStream, ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner, JoinWindows joinWindows) {
        return outerJoin(kStream, valueJoiner, joinWindows, null, null, null);
    }

    private <V1, R> KStream<K, R> doJoin(KStream<K, V1> kStream, ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner, JoinWindows joinWindows, Serde<K> serde, Serde<V> serde2, Serde<V1> serde3, KStreamImpl<K, V>.KStreamImplJoin kStreamImplJoin) {
        Objects.requireNonNull(kStream, "other KStream can't be null");
        Objects.requireNonNull(valueJoiner, "joiner can't be null");
        Objects.requireNonNull(joinWindows, "windows can't be null");
        KStreamImpl<K, V> kStreamImpl = this;
        KStreamImpl kStreamImpl2 = (KStreamImpl) kStream;
        if (kStreamImpl.repartitionRequired) {
            kStreamImpl = kStreamImpl.repartitionForJoin(serde, serde2, null);
        }
        if (kStreamImpl2.repartitionRequired) {
            kStreamImpl2 = kStreamImpl2.repartitionForJoin(serde, serde3, null);
        }
        kStreamImpl.ensureJoinableWith(kStreamImpl2);
        return (KStream<K, R>) kStreamImplJoin.join(kStreamImpl, kStreamImpl2, valueJoiner, joinWindows, serde, serde2, serde3);
    }

    private KStreamImpl<K, V> repartitionForJoin(Serde<K> serde, Serde<V> serde2, String str) {
        String createReparitionedSource = createReparitionedSource(this, serde, serde2, str);
        return new KStreamImpl<>(this.topology, createReparitionedSource, Collections.singleton(createReparitionedSource), false);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <K1, V1> String createReparitionedSource(AbstractStream<K1> abstractStream, Serde<K1> serde, Serde<V1> serde2, String str) {
        Serializer serializer = serde != null ? serde.serializer() : null;
        Serializer serializer2 = serde2 != null ? serde2.serializer() : null;
        Deserializer deserializer = serde != null ? serde.deserializer() : null;
        Deserializer deserializer2 = serde2 != null ? serde2.deserializer() : null;
        String str2 = (str != null ? str : abstractStream.name) + REPARTITION_TOPIC_SUFFIX;
        String newName = abstractStream.topology.newName(SINK_NAME);
        String newName2 = abstractStream.topology.newName(FILTER_NAME);
        String newName3 = abstractStream.topology.newName(SOURCE_NAME);
        abstractStream.topology.addInternalTopic(str2);
        abstractStream.topology.addProcessor(newName2, new KStreamFilter(new Predicate<K1, V1>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamImpl.2
            @Override // org.apache.kafka.streams.kstream.Predicate
            public boolean test(K1 k1, V1 v1) {
                return k1 != null;
            }
        }, false), abstractStream.name);
        abstractStream.topology.addSink(newName, str2, serializer, serializer2, newName2);
        abstractStream.topology.addSource(newName3, deserializer, deserializer2, str2);
        return newName3;
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <V1, R> KStream<K, R> leftJoin(KStream<K, V1> kStream, ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner, JoinWindows joinWindows, Serde<K> serde, Serde<V> serde2, Serde<V1> serde3) {
        return doJoin(kStream, valueJoiner, joinWindows, serde, serde2, serde3, new KStreamImplJoin(true, false));
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <V1, R> KStream<K, R> leftJoin(KStream<K, V1> kStream, ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner, JoinWindows joinWindows) {
        return leftJoin(kStream, valueJoiner, joinWindows, null, null, null);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <V1, R> KStream<K, R> join(KTable<K, V1> kTable, ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner) {
        return join(kTable, valueJoiner, null, null);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <V1, R> KStream<K, R> join(KTable<K, V1> kTable, ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner, Serde<K> serde, Serde<V> serde2) {
        return this.repartitionRequired ? repartitionForJoin(serde, serde2, null).doStreamTableJoin(kTable, valueJoiner, false) : doStreamTableJoin(kTable, valueJoiner, false);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <K1, V1, R> KStream<K, R> leftJoin(GlobalKTable<K1, V1> globalKTable, KeyValueMapper<? super K, ? super V, ? extends K1> keyValueMapper, ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner) {
        return (KStream<K, R>) globalTableJoin(globalKTable, keyValueMapper, valueJoiner, true);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <K1, V1, V2> KStream<K, V2> join(GlobalKTable<K1, V1> globalKTable, KeyValueMapper<? super K, ? super V, ? extends K1> keyValueMapper, ValueJoiner<? super V, ? super V1, ? extends V2> valueJoiner) {
        return globalTableJoin(globalKTable, keyValueMapper, valueJoiner, false);
    }

    private <K1, V1, V2> KStream<K, V2> globalTableJoin(GlobalKTable<K1, V1> globalKTable, KeyValueMapper<? super K, ? super V, ? extends K1> keyValueMapper, ValueJoiner<? super V, ? super V1, ? extends V2> valueJoiner, boolean z) {
        Objects.requireNonNull(globalKTable, "globalTable can't be null");
        Objects.requireNonNull(keyValueMapper, "keyMapper can't be null");
        Objects.requireNonNull(valueJoiner, "joiner can't be null");
        KTableValueGetterSupplier<K, V> valueGetterSupplier = ((GlobalKTableImpl) globalKTable).valueGetterSupplier();
        String newName = this.topology.newName(LEFTJOIN_NAME);
        this.topology.addProcessor(newName, new KStreamGlobalKTableJoin(valueGetterSupplier, valueJoiner, keyValueMapper, z), this.name);
        return new KStreamImpl(this.topology, newName, this.sourceNodes, false);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <V1, R> KStream<K, R> doStreamTableJoin(KTable<K, V1> kTable, ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner, boolean z) {
        Objects.requireNonNull(kTable, "other KTable can't be null");
        Objects.requireNonNull(valueJoiner, "joiner can't be null");
        Set<String> ensureJoinableWith = ensureJoinableWith((AbstractStream) kTable);
        String newName = this.topology.newName(z ? LEFTJOIN_NAME : JOIN_NAME);
        this.topology.addProcessor(newName, new KStreamKTableJoin(((KTableImpl) kTable).valueGetterSupplier(), valueJoiner, z), this.name);
        this.topology.connectProcessorAndStateStores(newName, kTable.getStoreName());
        this.topology.connectProcessors(this.name, ((KTableImpl) kTable).name);
        return new KStreamImpl(this.topology, newName, ensureJoinableWith, false);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> kTable, ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner) {
        return leftJoin(kTable, valueJoiner, null, null);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> kTable, ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner, Serde<K> serde, Serde<V> serde2) {
        return this.repartitionRequired ? repartitionForJoin(serde, serde2, null).doStreamTableJoin(kTable, valueJoiner, true) : doStreamTableJoin(kTable, valueJoiner, true);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<? super K, ? super V, K1> keyValueMapper) {
        return groupBy(keyValueMapper, null, null);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<? super K, ? super V, K1> keyValueMapper, Serde<K1> serde, Serde<V> serde2) {
        Objects.requireNonNull(keyValueMapper, "selector can't be null");
        return new KGroupedStreamImpl(this.topology, internalSelectKey(keyValueMapper), this.sourceNodes, serde, serde2, true);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public KGroupedStream<K, V> groupByKey() {
        return groupByKey(null, null);
    }

    @Override // org.apache.kafka.streams.kstream.KStream
    public KGroupedStream<K, V> groupByKey(Serde<K> serde, Serde<V> serde2) {
        return new KGroupedStreamImpl(this.topology, this.name, this.sourceNodes, serde, serde2, this.repartitionRequired);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <K, V> StateStoreSupplier createWindowedStateStore(JoinWindows joinWindows, Serde<K> serde, Serde<V> serde2, String str) {
        return Stores.create(str).withKeys(serde).withValues(serde2).persistent().windowed(joinWindows.size(), joinWindows.maintainMs(), joinWindows.segments, true).build();
    }
}
