package org.apache.kafka.streams.kstream.internals;

import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.BranchedKStream;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.graph.BaseRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamSinkNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamToTableNode;
import org.apache.kafka.streams.kstream.internals.graph.UnoptimizableRepartitionNode;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.state.KeyValueStore;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamImpl.class */
public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K, V> {
    // Name prefixes/suffixes with package-private visibility. Within the visible part of this
    // class only MERGE_NAME (private merge overload) and SINK_NAME (private to overload) are used;
    // the others are presumably consumed elsewhere in this package — TODO confirm.
    static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
    static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
    static final String JOIN_NAME = "KSTREAM-JOIN-";
    static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
    static final String MERGE_NAME = "KSTREAM-MERGE-";
    static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-";
    static final String OUTEROTHER_NAME = "KSTREAM-OUTEROTHER-";
    static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
    static final String SOURCE_NAME = "KSTREAM-SOURCE-";
    static final String SINK_NAME = "KSTREAM-SINK-";
    static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
    // Private prefixes handed to the name-generation helpers (orElseGenerateWithPrefix,
    // suffixWithOrElseGet, newProcessorName) by the DSL methods of this class. Some of them
    // (PROCESSOR_NAME, TRANSFORM_NAME, TRANSFORMVALUES_NAME) are not referenced in the visible part.
    private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
    private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-";
    // Shared by both filter and filterNot.
    private static final String FILTER_NAME = "KSTREAM-FILTER-";
    private static final String PEEK_NAME = "KSTREAM-PEEK-";
    private static final String FLATMAP_NAME = "KSTREAM-FLATMAP-";
    private static final String FLATMAPVALUES_NAME = "KSTREAM-FLATMAPVALUES-";
    private static final String MAP_NAME = "KSTREAM-MAP-";
    private static final String MAPVALUES_NAME = "KSTREAM-MAPVALUES-";
    private static final String PROCESSOR_NAME = "KSTREAM-PROCESSOR-";
    private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
    private static final String KEY_SELECT_NAME = "KSTREAM-KEY-SELECT-";
    private static final String TRANSFORM_NAME = "KSTREAM-TRANSFORM-";
    private static final String TRANSFORMVALUES_NAME = "KSTREAM-TRANSFORMVALUES-";
    private static final String FOREACH_NAME = "KSTREAM-FOREACH-";
    // Used both as the processor-name prefix and as the MaterializedInternal prefix in toTable.
    private static final String TO_KTABLE_NAME = "KSTREAM-TOTABLE-";
    private static final String REPARTITION_NAME = "KSTREAM-REPARTITION-";
    // Assigned once in the constructor; propagated to derived streams and checked by toTable
    // to decide whether a repartition node must be inserted first.
    private final boolean repartitionRequired;
    // Not referenced in the visible part of this class.
    private OptimizableRepartitionNode<K, V> repartitionNode;

    /**
     * Creates a stream handle bound to the given graph node.
     *
     * <p>Decompiler note (JADX): the access modifier of this constructor was changed from
     * package-private.
     *
     * @param str                    stream name, forwarded to the superclass
     * @param serde                  key serde, forwarded to the superclass
     * @param serde2                 value serde, forwarded to the superclass
     * @param set                    sub-topology source node names, forwarded to the superclass
     * @param z                      stored as {@code repartitionRequired}
     * @param graphNode              graph node backing this stream, forwarded to the superclass
     * @param internalStreamsBuilder builder, forwarded to the superclass
     */
    public KStreamImpl(String str,
                       Serde<K> serde,
                       Serde<V> serde2,
                       Set<String> set,
                       boolean z,
                       GraphNode graphNode,
                       InternalStreamsBuilder internalStreamsBuilder) {
        super(str, serde, serde2, set, graphNode, internalStreamsBuilder);
        repartitionRequired = z;
    }

    /**
     * Delegates to {@link #filter(Predicate, Named)} with {@code NamedInternal.empty()} as the name.
     *
     * @param predicate predicate passed through unchanged
     * @return whatever the two-argument overload returns
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public KStream<K, V> filter(Predicate<? super K, ? super V> predicate) {
        final NamedInternal unnamed = NamedInternal.empty();
        return filter(predicate, unnamed);
    }

    /**
     * Attaches a {@code KStreamFilter} processor node (built with flag {@code false}) beneath this
     * stream's graph node and returns a stream handle for the new node.
     *
     * <p>The returned stream reuses this stream's serdes, source nodes and
     * {@code repartitionRequired} flag.
     *
     * @param predicate predicate given to the {@code KStreamFilter}; must not be null
     * @param named     naming configuration; must not be null
     * @return a new {@code KStreamImpl} backed by the filter node
     * @throws NullPointerException if either argument is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public KStream<K, V> filter(Predicate<? super K, ? super V> predicate, Named named) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final NamedInternal namedInternal = new NamedInternal(named);
        final String processorName = namedInternal.orElseGenerateWithPrefix(this.builder, FILTER_NAME);
        final ProcessorParameters filterParameters =
            new ProcessorParameters(new KStreamFilter(predicate, false), processorName);
        final ProcessorGraphNode filterNode = new ProcessorGraphNode(processorName, filterParameters);

        this.builder.addGraphNode(this.graphNode, filterNode);
        return new KStreamImpl(
            processorName,
            this.keySerde,
            this.valueSerde,
            this.subTopologySourceNodes,
            this.repartitionRequired,
            filterNode,
            this.builder);
    }

    /**
     * Delegates to {@link #filterNot(Predicate, Named)} with {@code NamedInternal.empty()} as the name.
     *
     * @param predicate predicate passed through unchanged
     * @return whatever the two-argument overload returns
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public KStream<K, V> filterNot(Predicate<? super K, ? super V> predicate) {
        final NamedInternal unnamed = NamedInternal.empty();
        return filterNot(predicate, unnamed);
    }

    /**
     * Attaches a {@code KStreamFilter} processor node (built with flag {@code true}) beneath this
     * stream's graph node and returns a stream handle for the new node.
     *
     * <p>Generated names use the same {@code FILTER_NAME} prefix as {@code filter}. The returned
     * stream reuses this stream's serdes, source nodes and {@code repartitionRequired} flag.
     *
     * @param predicate predicate given to the {@code KStreamFilter}; must not be null
     * @param named     naming configuration; must not be null
     * @return a new {@code KStreamImpl} backed by the filter node
     * @throws NullPointerException if either argument is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public KStream<K, V> filterNot(Predicate<? super K, ? super V> predicate, Named named) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final NamedInternal namedInternal = new NamedInternal(named);
        final String processorName = namedInternal.orElseGenerateWithPrefix(this.builder, FILTER_NAME);
        final ProcessorParameters filterParameters =
            new ProcessorParameters(new KStreamFilter(predicate, true), processorName);
        final ProcessorGraphNode filterNode = new ProcessorGraphNode(processorName, filterParameters);

        this.builder.addGraphNode(this.graphNode, filterNode);
        return new KStreamImpl(
            processorName,
            this.keySerde,
            this.valueSerde,
            this.subTopologySourceNodes,
            this.repartitionRequired,
            filterNode,
            this.builder);
    }

    /**
     * Delegates to {@link #selectKey(KeyValueMapper, Named)} with {@code NamedInternal.empty()}.
     *
     * @param keyValueMapper mapper passed through unchanged
     * @param <KR>           key type of the resulting stream
     * @return whatever the two-argument overload returns
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <KR> KStream<KR, V> selectKey(KeyValueMapper<? super K, ? super V, ? extends KR> keyValueMapper) {
        final NamedInternal unnamed = NamedInternal.empty();
        return selectKey(keyValueMapper, unnamed);
    }

    /**
     * Builds a key-selecting node via {@code internalSelectKey}, flags it with
     * {@code keyChangingOperation(true)}, attaches it beneath this stream's graph node and returns
     * a stream handle for it.
     *
     * <p>The returned stream is created with a {@code null} key serde, this stream's value serde
     * and {@code repartitionRequired = true}.
     *
     * @param keyValueMapper mapper producing the new key; must not be null
     * @param named          naming configuration; must not be null
     * @param <KR>           key type of the resulting stream
     * @return a new {@code KStreamImpl} backed by the key-selecting node
     * @throws NullPointerException if either argument is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <KR> KStream<KR, V> selectKey(KeyValueMapper<? super K, ? super V, ? extends KR> keyValueMapper, Named named) {
        Objects.requireNonNull(keyValueMapper, "mapper can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final ProcessorGraphNode<K, V> selectKeyNode = internalSelectKey(keyValueMapper, new NamedInternal(named));
        selectKeyNode.keyChangingOperation(true);
        this.builder.addGraphNode(this.graphNode, selectKeyNode);

        return new KStreamImpl(
            selectKeyNode.nodeName(),
            null,
            this.valueSerde,
            this.subTopologySourceNodes,
            true,
            selectKeyNode,
            this.builder);
    }

    /**
     * Creates (but does not attach) a processor node that wraps a {@code KStreamMap} whose mapper
     * pairs the result of {@code keyValueMapper} with the unchanged value.
     *
     * @param keyValueMapper mapper producing the new key
     * @param namedInternal  source of the node name; falls back to the {@code KEY_SELECT_NAME} prefix
     * @param <KR>           type of the new key
     * @return the newly created node; the caller is responsible for adding it to the graph
     */
    private <KR> ProcessorGraphNode<K, V> internalSelectKey(KeyValueMapper<? super K, ? super V, ? extends KR> keyValueMapper, NamedInternal namedInternal) {
        final String processorName = namedInternal.orElseGenerateWithPrefix(this.builder, KEY_SELECT_NAME);
        return new ProcessorGraphNode<>(
            processorName,
            new ProcessorParameters(
                new KStreamMap((key, value) -> new KeyValue(keyValueMapper.apply(key, value), value)),
                processorName));
    }

    /**
     * Delegates to {@link #map(KeyValueMapper, Named)} with {@code NamedInternal.empty()}.
     *
     * @param keyValueMapper mapper passed through unchanged
     * @param <KR>           key type of the resulting stream
     * @param <VR>           value type of the resulting stream
     * @return whatever the two-argument overload returns
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <KR, VR> KStream<KR, VR> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> keyValueMapper) {
        final NamedInternal unnamed = NamedInternal.empty();
        return map(keyValueMapper, unnamed);
    }

    /**
     * Attaches a {@code KStreamMap} processor node beneath this stream's graph node, flags it with
     * {@code keyChangingOperation(true)} and returns a stream handle for it.
     *
     * <p>The returned stream is created with {@code null} key and value serdes and
     * {@code repartitionRequired = true}.
     *
     * @param keyValueMapper mapper given to the {@code KStreamMap}; must not be null
     * @param named          naming configuration; must not be null
     * @param <KR>           key type of the resulting stream
     * @param <VR>           value type of the resulting stream
     * @return a new {@code KStreamImpl} backed by the map node
     * @throws NullPointerException if either argument is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <KR, VR> KStream<KR, VR> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> keyValueMapper, Named named) {
        Objects.requireNonNull(keyValueMapper, "mapper can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final NamedInternal namedInternal = new NamedInternal(named);
        final String processorName = namedInternal.orElseGenerateWithPrefix(this.builder, MAP_NAME);
        final ProcessorParameters mapParameters =
            new ProcessorParameters(new KStreamMap(keyValueMapper), processorName);
        final ProcessorGraphNode mapNode = new ProcessorGraphNode(processorName, mapParameters);
        mapNode.keyChangingOperation(true);

        this.builder.addGraphNode(this.graphNode, mapNode);
        return new KStreamImpl(processorName, null, null, this.subTopologySourceNodes, true, mapNode, this.builder);
    }

    /**
     * Adapts the key-less mapper with {@code withKey} and delegates to
     * {@link #mapValues(ValueMapperWithKey)}.
     *
     * @param valueMapper mapper to adapt
     * @param <VR>        value type of the resulting stream
     * @return whatever the key-aware overload returns
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> valueMapper) {
        final ValueMapperWithKey<? super K, ? super V, ? extends VR> keyAwareMapper = withKey(valueMapper);
        return mapValues(keyAwareMapper);
    }

    /**
     * Adapts the key-less mapper with {@code withKey} and delegates to
     * {@link #mapValues(ValueMapperWithKey, Named)}.
     *
     * @param valueMapper mapper to adapt
     * @param named       naming configuration, passed through unchanged
     * @param <VR>        value type of the resulting stream
     * @return whatever the key-aware overload returns
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> valueMapper, Named named) {
        final ValueMapperWithKey<? super K, ? super V, ? extends VR> keyAwareMapper = withKey(valueMapper);
        return mapValues(keyAwareMapper, named);
    }

    /**
     * Delegates to {@link #mapValues(ValueMapperWithKey, Named)} with {@code NamedInternal.empty()}.
     *
     * @param valueMapperWithKey mapper passed through unchanged
     * @param <VR>               value type of the resulting stream
     * @return whatever the two-argument overload returns
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VR> KStream<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> valueMapperWithKey) {
        final NamedInternal unnamed = NamedInternal.empty();
        return mapValues(valueMapperWithKey, unnamed);
    }

    /**
     * Attaches a {@code KStreamMapValues} processor node beneath this stream's graph node, flags it
     * with {@code setValueChangingOperation(true)} and returns a stream handle for it.
     *
     * <p>The returned stream keeps this stream's key serde and {@code repartitionRequired} flag
     * but is created with a {@code null} value serde.
     *
     * @param valueMapperWithKey mapper given to the {@code KStreamMapValues}; must not be null
     * @param named              naming configuration; must not be null
     * @param <VR>               value type of the resulting stream
     * @return a new {@code KStreamImpl} backed by the map-values node
     * @throws NullPointerException if either argument is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VR> KStream<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> valueMapperWithKey, Named named) {
        Objects.requireNonNull(valueMapperWithKey, "valueMapperWithKey can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final NamedInternal namedInternal = new NamedInternal(named);
        final String processorName = namedInternal.orElseGenerateWithPrefix(this.builder, MAPVALUES_NAME);
        final ProcessorParameters mapValuesParameters =
            new ProcessorParameters(new KStreamMapValues(valueMapperWithKey), processorName);
        final ProcessorGraphNode mapValuesNode = new ProcessorGraphNode(processorName, mapValuesParameters);
        mapValuesNode.setValueChangingOperation(true);

        this.builder.addGraphNode(this.graphNode, mapValuesNode);
        return new KStreamImpl(
            processorName,
            this.keySerde,
            null,
            this.subTopologySourceNodes,
            this.repartitionRequired,
            mapValuesNode,
            this.builder);
    }

    /**
     * Delegates to {@link #flatMap(KeyValueMapper, Named)} with {@code NamedInternal.empty()}.
     *
     * @param keyValueMapper mapper passed through unchanged
     * @param <KR>           key type of the resulting stream
     * @param <VR>           value type of the resulting stream
     * @return whatever the two-argument overload returns
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <KR, VR> KStream<KR, VR> flatMap(KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> keyValueMapper) {
        final NamedInternal unnamed = NamedInternal.empty();
        return flatMap(keyValueMapper, unnamed);
    }

    /**
     * Attaches a {@code KStreamFlatMap} processor node beneath this stream's graph node, flags it
     * with {@code keyChangingOperation(true)} and returns a stream handle for it.
     *
     * <p>The returned stream is created with {@code null} key and value serdes and
     * {@code repartitionRequired = true}.
     *
     * @param keyValueMapper mapper given to the {@code KStreamFlatMap}; must not be null
     * @param named          naming configuration; must not be null
     * @param <KR>           key type of the resulting stream
     * @param <VR>           value type of the resulting stream
     * @return a new {@code KStreamImpl} backed by the flat-map node
     * @throws NullPointerException if either argument is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <KR, VR> KStream<KR, VR> flatMap(KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> keyValueMapper, Named named) {
        Objects.requireNonNull(keyValueMapper, "mapper can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final NamedInternal namedInternal = new NamedInternal(named);
        final String processorName = namedInternal.orElseGenerateWithPrefix(this.builder, FLATMAP_NAME);
        final ProcessorParameters flatMapParameters =
            new ProcessorParameters(new KStreamFlatMap(keyValueMapper), processorName);
        final ProcessorGraphNode flatMapNode = new ProcessorGraphNode(processorName, flatMapParameters);
        flatMapNode.keyChangingOperation(true);

        this.builder.addGraphNode(this.graphNode, flatMapNode);
        return new KStreamImpl(processorName, null, null, this.subTopologySourceNodes, true, flatMapNode, this.builder);
    }

    /**
     * Adapts the key-less mapper with {@code withKey} and delegates to
     * {@link #flatMapValues(ValueMapperWithKey)}.
     *
     * @param valueMapper mapper to adapt
     * @param <VR>        value type of the resulting stream
     * @return whatever the key-aware overload returns
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VR> KStream<K, VR> flatMapValues(ValueMapper<? super V, ? extends Iterable<? extends VR>> valueMapper) {
        final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> keyAwareMapper = withKey(valueMapper);
        return flatMapValues(keyAwareMapper);
    }

    /**
     * Adapts the key-less mapper with {@code withKey} and delegates to
     * {@link #flatMapValues(ValueMapperWithKey, Named)}.
     *
     * @param valueMapper mapper to adapt
     * @param named       naming configuration, passed through unchanged
     * @param <VR>        value type of the resulting stream
     * @return whatever the key-aware overload returns
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VR> KStream<K, VR> flatMapValues(ValueMapper<? super V, ? extends Iterable<? extends VR>> valueMapper, Named named) {
        final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> keyAwareMapper = withKey(valueMapper);
        return flatMapValues(keyAwareMapper, named);
    }

    /**
     * Delegates to {@link #flatMapValues(ValueMapperWithKey, Named)} with
     * {@code NamedInternal.empty()}.
     *
     * @param valueMapperWithKey mapper passed through unchanged
     * @param <VR>               value type of the resulting stream
     * @return whatever the two-argument overload returns
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VR> KStream<K, VR> flatMapValues(ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> valueMapperWithKey) {
        final NamedInternal unnamed = NamedInternal.empty();
        return flatMapValues(valueMapperWithKey, unnamed);
    }

    /**
     * Attaches a {@code KStreamFlatMapValues} processor node beneath this stream's graph node,
     * flags it with {@code setValueChangingOperation(true)} and returns a stream handle for it.
     *
     * <p>The returned stream keeps this stream's key serde and {@code repartitionRequired} flag
     * but is created with a {@code null} value serde.
     *
     * @param valueMapperWithKey mapper given to the {@code KStreamFlatMapValues}; must not be null
     * @param named              naming configuration; must not be null
     * @param <VR>               value type of the resulting stream
     * @return a new {@code KStreamImpl} backed by the flat-map-values node
     * @throws NullPointerException if either argument is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VR> KStream<K, VR> flatMapValues(ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> valueMapperWithKey, Named named) {
        Objects.requireNonNull(valueMapperWithKey, "valueMapper can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final NamedInternal namedInternal = new NamedInternal(named);
        final String processorName = namedInternal.orElseGenerateWithPrefix(this.builder, FLATMAPVALUES_NAME);
        final ProcessorParameters flatMapValuesParameters =
            new ProcessorParameters(new KStreamFlatMapValues(valueMapperWithKey), processorName);
        final ProcessorGraphNode flatMapValuesNode = new ProcessorGraphNode(processorName, flatMapValuesParameters);
        flatMapValuesNode.setValueChangingOperation(true);

        this.builder.addGraphNode(this.graphNode, flatMapValuesNode);
        return new KStreamImpl(
            processorName,
            this.keySerde,
            null,
            this.subTopologySourceNodes,
            this.repartitionRequired,
            flatMapValuesNode,
            this.builder);
    }

    /**
     * Attaches a terminal processor node, built from the given {@code Printed} configuration,
     * beneath this stream's graph node.
     *
     * <p>The processor supplier comes from {@code PrintedInternal.build(this.name)}; the node name
     * comes from the configuration's name or is generated with the {@code PRINTING_NAME} prefix.
     *
     * @param printed print configuration; must not be null
     * @throws NullPointerException if {@code printed} is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public void print(Printed<K, V> printed) {
        Objects.requireNonNull(printed, "printed can't be null");

        final PrintedInternal printedInternal = new PrintedInternal(printed);
        final String processorName =
            new NamedInternal(printedInternal.name()).orElseGenerateWithPrefix(this.builder, PRINTING_NAME);
        final ProcessorParameters printParameters =
            new ProcessorParameters(printedInternal.build(this.name), processorName);

        this.builder.addGraphNode(this.graphNode, new ProcessorGraphNode(processorName, printParameters));
    }

    /**
     * Delegates to {@link #foreach(ForeachAction, Named)} with {@code NamedInternal.empty()}.
     *
     * @param foreachAction action passed through unchanged
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public void foreach(ForeachAction<? super K, ? super V> foreachAction) {
        final NamedInternal unnamed = NamedInternal.empty();
        foreach(foreachAction, unnamed);
    }

    /**
     * Attaches a terminal {@code KStreamPeek} processor node (built with flag {@code false})
     * beneath this stream's graph node. No stream handle is returned.
     *
     * @param foreachAction action given to the {@code KStreamPeek}; must not be null
     * @param named         naming configuration; must not be null
     * @throws NullPointerException if either argument is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public void foreach(ForeachAction<? super K, ? super V> foreachAction, Named named) {
        Objects.requireNonNull(foreachAction, "action can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final NamedInternal namedInternal = new NamedInternal(named);
        final String processorName = namedInternal.orElseGenerateWithPrefix(this.builder, FOREACH_NAME);
        final ProcessorParameters foreachParameters =
            new ProcessorParameters(new KStreamPeek(foreachAction, false), processorName);

        this.builder.addGraphNode(this.graphNode, new ProcessorGraphNode(processorName, foreachParameters));
    }

    /**
     * Delegates to {@link #peek(ForeachAction, Named)} with {@code NamedInternal.empty()}.
     *
     * @param foreachAction action passed through unchanged
     * @return whatever the two-argument overload returns
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public KStream<K, V> peek(ForeachAction<? super K, ? super V> foreachAction) {
        final NamedInternal unnamed = NamedInternal.empty();
        return peek(foreachAction, unnamed);
    }

    /**
     * Attaches a {@code KStreamPeek} processor node (built with flag {@code true}) beneath this
     * stream's graph node and returns a stream handle for it.
     *
     * <p>The returned stream reuses this stream's serdes, source nodes and
     * {@code repartitionRequired} flag.
     *
     * @param foreachAction action given to the {@code KStreamPeek}; must not be null
     * @param named         naming configuration; must not be null
     * @return a new {@code KStreamImpl} backed by the peek node
     * @throws NullPointerException if either argument is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public KStream<K, V> peek(ForeachAction<? super K, ? super V> foreachAction, Named named) {
        Objects.requireNonNull(foreachAction, "action can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final NamedInternal namedInternal = new NamedInternal(named);
        final String processorName = namedInternal.orElseGenerateWithPrefix(this.builder, PEEK_NAME);
        final ProcessorParameters peekParameters =
            new ProcessorParameters(new KStreamPeek(foreachAction, true), processorName);
        final ProcessorGraphNode peekNode = new ProcessorGraphNode(processorName, peekParameters);

        this.builder.addGraphNode(this.graphNode, peekNode);
        return new KStreamImpl(
            processorName,
            this.keySerde,
            this.valueSerde,
            this.subTopologySourceNodes,
            this.repartitionRequired,
            peekNode,
            this.builder);
    }

    /**
     * Deprecated entry point that forwards to {@code doBranch} with {@code NamedInternal.empty()}.
     *
     * @param predicateArr predicates passed through unchanged
     * @return the array produced by {@code doBranch}
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    @Deprecated
    public KStream<K, V>[] branch(Predicate<? super K, ? super V>... predicateArr) {
        final NamedInternal unnamed = NamedInternal.empty();
        return doBranch(unnamed, predicateArr);
    }

    /**
     * Deprecated entry point that null-checks {@code named}, wraps it in a {@code NamedInternal}
     * and forwards to {@code doBranch}.
     *
     * @param named        naming configuration; must not be null
     * @param predicateArr predicates passed through unchanged
     * @return the array produced by {@code doBranch}
     * @throws NullPointerException if {@code named} is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    @Deprecated
    public KStream<K, V>[] branch(Named named, Predicate<? super K, ? super V>... predicateArr) {
        Objects.requireNonNull(named, "named can't be null");
        final NamedInternal namedInternal = new NamedInternal(named);
        return doBranch(namedInternal, predicateArr);
    }

    /**
     * Builds one {@code KStreamBranch} node plus one {@code PassThrough} child node per predicate
     * and returns a stream handle for each child, in predicate order.
     *
     * <p>Names are requested in a fixed order: first the branch node name ({@code BRANCH_NAME}
     * prefix), then one child name per predicate ({@code "-predicate-" + index} suffix or the
     * {@code BRANCHCHILD_NAME} prefix). The {@code KStreamBranch} receives a clone of the
     * predicate array.
     *
     * @param namedInternal source of the branch and child node names
     * @param predicateArr  predicates; the array and every element must be non-null, length &gt; 0
     * @return one stream per predicate, each reusing this stream's serdes, source nodes and
     *         {@code repartitionRequired} flag
     * @throws NullPointerException     if the array or any element is null
     * @throws IllegalArgumentException if the array is empty
     */
    private KStream<K, V>[] doBranch(NamedInternal namedInternal, Predicate<? super K, ? super V>... predicateArr) {
        Objects.requireNonNull(predicateArr, "predicates can't be a null array");
        final int branchCount = predicateArr.length;
        if (branchCount == 0) {
            throw new IllegalArgumentException("branch() requires at least one predicate");
        }
        for (int idx = 0; idx < branchCount; idx++) {
            Objects.requireNonNull(predicateArr[idx], "predicates can't be null");
        }

        // Name generation order matters: parent first, then children by index.
        final String branchName = namedInternal.orElseGenerateWithPrefix(this.builder, BRANCH_NAME);
        final String[] childNames = new String[branchCount];
        for (int idx = 0; idx < branchCount; idx++) {
            childNames[idx] = namedInternal.suffixWithOrElseGet("-predicate-" + idx, this.builder, BRANCHCHILD_NAME);
        }

        final ProcessorParameters branchParameters = new ProcessorParameters(
            new KStreamBranch(Arrays.asList((Object[]) predicateArr.clone()), Arrays.asList(childNames)),
            branchName);
        final ProcessorGraphNode branchNode = new ProcessorGraphNode(branchName, branchParameters);
        this.builder.addGraphNode(this.graphNode, branchNode);

        final KStream<K, V>[] branches = (KStream[]) Array.newInstance((Class<?>) KStream.class, branchCount);
        for (int idx = 0; idx < branchCount; idx++) {
            final String childName = childNames[idx];
            final ProcessorGraphNode childNode =
                new ProcessorGraphNode(childName, new ProcessorParameters(new PassThrough(), childName));
            this.builder.addGraphNode(branchNode, childNode);
            branches[idx] = new KStreamImpl(
                childName,
                this.keySerde,
                this.valueSerde,
                this.subTopologySourceNodes,
                this.repartitionRequired,
                childNode,
                this.builder);
        }
        return branches;
    }

    /**
     * Creates a {@code BranchedKStreamImpl} over this stream using {@code NamedInternal.empty()}.
     *
     * @return a new {@code BranchedKStreamImpl} carrying this stream's {@code repartitionRequired} flag
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public BranchedKStream<K, V> split() {
        final NamedInternal unnamed = NamedInternal.empty();
        return new BranchedKStreamImpl(this, this.repartitionRequired, unnamed);
    }

    /**
     * Creates a {@code BranchedKStreamImpl} over this stream using the supplied name.
     *
     * @param named naming configuration; must not be null
     * @return a new {@code BranchedKStreamImpl} carrying this stream's {@code repartitionRequired} flag
     * @throws NullPointerException if {@code named} is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public BranchedKStream<K, V> split(Named named) {
        Objects.requireNonNull(named, "named can't be null");
        final NamedInternal namedInternal = new NamedInternal(named);
        return new BranchedKStreamImpl(this, this.repartitionRequired, namedInternal);
    }

    /**
     * Delegates to {@link #merge(KStream, Named)} with {@code NamedInternal.empty()}.
     *
     * @param kStream stream passed through unchanged
     * @return whatever the two-argument overload returns
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public KStream<K, V> merge(KStream<K, V> kStream) {
        final NamedInternal unnamed = NamedInternal.empty();
        return merge(kStream, unnamed);
    }

    /**
     * Null-checks both arguments and forwards to the private three-argument {@code merge} using
     * this stream's builder.
     *
     * @param kStream stream to merge with; must not be null
     * @param named   naming configuration; must not be null
     * @return the merged stream
     * @throws NullPointerException if either argument is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public KStream<K, V> merge(KStream<K, V> kStream, Named named) {
        Objects.requireNonNull(kStream, "stream can't be null");
        Objects.requireNonNull(named, "named can't be null");
        final NamedInternal namedInternal = new NamedInternal(named);
        return merge(this.builder, kStream, namedInternal);
    }

    /**
     * Adds a {@code PassThrough} node, flagged via {@code setMergeNode(true)}, with both this
     * stream's and the other stream's graph nodes as parents, and returns a stream handle for it.
     *
     * <p>The result requires repartitioning if either input does, carries the union of both
     * source-node sets, and is created with {@code null} key and value serdes.
     *
     * @param internalStreamsBuilder builder used for name generation and graph wiring
     * @param kStream                other stream; cast to {@code KStreamImpl} before anything else happens
     * @param namedInternal          source of the node name; falls back to the {@code MERGE_NAME} prefix
     * @return a new {@code KStreamImpl} backed by the merge node
     * @throws ClassCastException if {@code kStream} is not a {@code KStreamImpl}
     */
    private KStream<K, V> merge(InternalStreamsBuilder internalStreamsBuilder, KStream<K, V> kStream, NamedInternal namedInternal) {
        final KStreamImpl other = (KStreamImpl) kStream;
        final boolean mergedRepartitionRequired = other.repartitionRequired || this.repartitionRequired;
        final String mergeName = namedInternal.orElseGenerateWithPrefix(internalStreamsBuilder, MERGE_NAME);

        final HashSet combinedSourceNodes = new HashSet();
        combinedSourceNodes.addAll(this.subTopologySourceNodes);
        combinedSourceNodes.addAll(other.subTopologySourceNodes);

        final ProcessorParameters mergeParameters = new ProcessorParameters(new PassThrough(), mergeName);
        final ProcessorGraphNode mergeNode = new ProcessorGraphNode(mergeName, mergeParameters);
        mergeNode.setMergeNode(true);
        internalStreamsBuilder.addGraphNode(Arrays.asList(this.graphNode, other.graphNode), mergeNode);

        return new KStreamImpl(
            mergeName,
            null,
            null,
            combinedSourceNodes,
            mergedRepartitionRequired,
            mergeNode,
            internalStreamsBuilder);
    }

    /**
     * Deprecated entry point that forwards to {@link #through(String, Produced)} with a
     * {@code Produced} built from this stream's serdes and a {@code null} third argument.
     *
     * @param str topic name, passed through unchanged
     * @return whatever the two-argument overload returns
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    @Deprecated
    public KStream<K, V> through(String str) {
        final Produced<K, V> produced = Produced.with(this.keySerde, this.valueSerde, null);
        return through(str, produced);
    }

    /**
     * Deprecated: writes this stream to the topic via {@code to} and then returns a stream
     * created by the builder from that same topic.
     *
     * <p>Serdes missing from {@code produced} are filled in from this stream. The source is
     * created with a {@code ConsumedInternal} built from the resolved serdes, a new
     * {@code FailOnInvalidTimestamp} and a {@code null} last argument.
     *
     * @param str      topic name; must not be null
     * @param produced produce configuration; must not be null
     * @return the stream returned by {@code builder.stream(...)} for the topic
     * @throws NullPointerException if either argument is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    @Deprecated
    public KStream<K, V> through(String str, Produced<K, V> produced) {
        Objects.requireNonNull(str, "topic can't be null");
        Objects.requireNonNull(produced, "produced can't be null");

        final ProducedInternal resolvedProduced = new ProducedInternal(produced);
        if (resolvedProduced.keySerde() == null) {
            resolvedProduced.withKeySerde(this.keySerde);
        }
        if (resolvedProduced.valueSerde() == null) {
            resolvedProduced.withValueSerde(this.valueSerde);
        }

        to(str, resolvedProduced);
        return this.builder.stream(
            Collections.singleton(str),
            new ConsumedInternal<>(
                resolvedProduced.keySerde(),
                resolvedProduced.valueSerde(),
                new FailOnInvalidTimestamp(),
                null));
    }

    /**
     * Forwards to {@code doRepartition} with {@code Repartitioned.as(null)}.
     *
     * @return the stream produced by {@code doRepartition}
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public KStream<K, V> repartition() {
        final Repartitioned<K, V> unnamed = Repartitioned.as(null);
        return doRepartition(unnamed);
    }

    /**
     * Forwards to {@code doRepartition}, which performs the null check on the argument.
     *
     * @param repartitioned repartition configuration, passed through unchanged
     * @return the stream produced by {@code doRepartition}
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public KStream<K, V> repartition(Repartitioned<K, V> repartitioned) {
        final KStream<K, V> repartitionedStream = doRepartition(repartitioned);
        return repartitionedStream;
    }

    /**
     * Inserts an {@code UnoptimizableRepartitionNode} beneath this stream's graph node and returns
     * a stream handle rooted at the repartition source.
     *
     * <p>Serdes missing from {@code repartitioned} fall back to this stream's serdes. The same
     * resolved serdes are used both for the repartition node and for the returned stream, so the
     * two cannot disagree. (Previously the unresolved key serde was handed to
     * {@code createRepartitionedSource} while the value serde was resolved, which could leave the
     * repartition node with a {@code null} key serde even though this stream had one.)
     *
     * @param repartitioned repartition configuration; must not be null
     * @return a new {@code KStreamImpl} with {@code repartitionRequired = false} whose only
     *         source node is the repartition node
     * @throws NullPointerException if {@code repartitioned} is null
     */
    private KStream<K, V> doRepartition(Repartitioned<K, V> repartitioned) {
        Objects.requireNonNull(repartitioned, "repartitioned can't be null");
        RepartitionedInternal repartitionedInternal = new RepartitionedInternal(repartitioned);
        // Explicit name wins; otherwise ask the builder for a generated processor name.
        String name = repartitionedInternal.name() != null ? repartitionedInternal.name() : this.builder.newProcessorName(REPARTITION_NAME);
        Serde<V> valueSerde = repartitionedInternal.valueSerde() == null ? this.valueSerde : repartitionedInternal.valueSerde();
        Serde<K> keySerde = repartitionedInternal.keySerde() == null ? this.keySerde : repartitionedInternal.keySerde();
        UnoptimizableRepartitionNode.UnoptimizableRepartitionNodeBuilder unoptimizableRepartitionNodeBuilder = UnoptimizableRepartitionNode.unoptimizableRepartitionNodeBuilder();
        // Pass the resolved key serde (with upstream fallback), mirroring the value serde.
        String createRepartitionedSource = createRepartitionedSource(this.builder, keySerde, valueSerde, name, repartitionedInternal.streamPartitioner(), unoptimizableRepartitionNodeBuilder.withInternalTopicProperties(repartitionedInternal.toInternalTopicProperties()));
        UnoptimizableRepartitionNode<K, V> build = unoptimizableRepartitionNodeBuilder.build();
        this.builder.addGraphNode(this.graphNode, build);
        HashSet hashSet = new HashSet();
        hashSet.add(build.nodeName());
        return new KStreamImpl(createRepartitionedSource, keySerde, valueSerde, Collections.unmodifiableSet(hashSet), false, build, this.builder);
    }

    /**
     * Forwards to {@link #to(String, Produced)} with a {@code Produced} built from this stream's
     * serdes and a {@code null} third argument.
     *
     * @param str topic name, passed through unchanged
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public void to(String str) {
        final Produced<K, V> produced = Produced.with(this.keySerde, this.valueSerde, null);
        to(str, produced);
    }

    /**
     * Writes this stream to a fixed topic by wrapping the name in a
     * {@code StaticTopicNameExtractor} and calling the private sink-building overload.
     *
     * <p>Serdes missing from {@code produced} are filled in from this stream.
     *
     * @param str      topic name; must not be null
     * @param produced produce configuration; must not be null
     * @throws NullPointerException if either argument is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public void to(String str, Produced<K, V> produced) {
        Objects.requireNonNull(str, "topic can't be null");
        Objects.requireNonNull(produced, "produced can't be null");

        final ProducedInternal<K, V> resolvedProduced = new ProducedInternal<>(produced);
        if (resolvedProduced.keySerde() == null) {
            resolvedProduced.withKeySerde(this.keySerde);
        }
        if (resolvedProduced.valueSerde() == null) {
            resolvedProduced.withValueSerde(this.valueSerde);
        }

        // Raw casts select the private (TopicNameExtractor, ProducedInternal) overload.
        final TopicNameExtractor staticExtractor = (TopicNameExtractor) new StaticTopicNameExtractor(str);
        to(staticExtractor, (ProducedInternal) resolvedProduced);
    }

    /**
     * Forwards to {@link #to(TopicNameExtractor, Produced)} with a {@code Produced} built from
     * this stream's serdes and a {@code null} third argument.
     *
     * @param topicNameExtractor extractor passed through unchanged
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public void to(TopicNameExtractor<K, V> topicNameExtractor) {
        final Produced<K, V> produced = Produced.with(this.keySerde, this.valueSerde, null);
        to(topicNameExtractor, produced);
    }

    /**
     * Writes this stream to topics chosen by the extractor, via the private sink-building overload.
     *
     * <p>Serdes missing from {@code produced} are filled in from this stream.
     *
     * @param topicNameExtractor extractor given to the sink node; must not be null
     * @param produced           produce configuration; must not be null
     * @throws NullPointerException if either argument is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public void to(TopicNameExtractor<K, V> topicNameExtractor, Produced<K, V> produced) {
        Objects.requireNonNull(topicNameExtractor, "topicExtractor can't be null");
        Objects.requireNonNull(produced, "produced can't be null");

        final ProducedInternal<K, V> resolvedProduced = new ProducedInternal<>(produced);
        if (resolvedProduced.keySerde() == null) {
            resolvedProduced.withKeySerde(this.keySerde);
        }
        if (resolvedProduced.valueSerde() == null) {
            resolvedProduced.withValueSerde(this.valueSerde);
        }

        // Raw casts select the private (TopicNameExtractor, ProducedInternal) overload.
        to((TopicNameExtractor) topicNameExtractor, (ProducedInternal) resolvedProduced);
    }

    /**
     * Attaches a {@code StreamSinkNode} beneath this stream's graph node.
     *
     * @param topicNameExtractor extractor handed to the sink node
     * @param producedInternal   produce configuration handed to the sink node; its name is used
     *                           for the node, falling back to the {@code SINK_NAME} prefix
     */
    private void to(TopicNameExtractor<K, V> topicNameExtractor, ProducedInternal<K, V> producedInternal) {
        final NamedInternal sinkNamed = new NamedInternal(producedInternal.name());
        final String sinkName = sinkNamed.orElseGenerateWithPrefix(this.builder, SINK_NAME);
        final StreamSinkNode sinkNode = new StreamSinkNode(sinkName, topicNameExtractor, producedInternal);
        this.builder.addGraphNode(this.graphNode, sinkNode);
    }

    /**
     * Forwards to {@link #toTable(Named, Materialized)} with {@code NamedInternal.empty()} and a
     * {@code Materialized} built from this stream's serdes.
     *
     * @return whatever the two-argument overload returns
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public KTable<K, V> toTable() {
        final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized =
            Materialized.with(this.keySerde, this.valueSerde);
        return toTable(NamedInternal.empty(), materialized);
    }

    /**
     * Forwards to {@link #toTable(Named, Materialized)} with a {@code Materialized} built from
     * this stream's serdes. The null check on {@code named} happens in the delegate.
     *
     * @param named naming configuration, passed through unchanged
     * @return whatever the two-argument overload returns
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public KTable<K, V> toTable(Named named) {
        final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized =
            Materialized.with(this.keySerde, this.valueSerde);
        return toTable(named, materialized);
    }

    /**
     * Forwards to {@link #toTable(Named, Materialized)} with {@code NamedInternal.empty()}.
     *
     * @param materialized materialization configuration, passed through unchanged
     * @return whatever the two-argument overload returns
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public KTable<K, V> toTable(Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        final NamedInternal unnamed = NamedInternal.empty();
        return toTable(unnamed, materialized);
    }

    /**
     * Converts this stream into a table by attaching a {@code StreamToTableNode} that wraps a
     * {@code KTableSource}.
     *
     * <p>Serdes missing from {@code materialized} fall back to this stream's serdes. When
     * {@code repartitionRequired} is set, an {@code OptimizableRepartitionNode} is inserted first;
     * the table node then hangs off that node and the table's only source node is the repartition
     * source. Otherwise the table node hangs directly off this stream's graph node and inherits
     * its source nodes.
     *
     * @param named        naming configuration; must not be null
     * @param materialized materialization configuration; must not be null
     * @return a new {@code KTableImpl} backed by the stream-to-table node
     * @throws NullPointerException if either argument is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public KTable<K, V> toTable(Named named, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(named, "named can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");

        final String tableName = new NamedInternal(named).orElseGenerateWithPrefix(this.builder, TO_KTABLE_NAME);
        final MaterializedInternal materializedInternal = new MaterializedInternal(materialized, this.builder, TO_KTABLE_NAME);
        final Serde<K> resolvedKeySerde =
            materializedInternal.keySerde() == null ? this.keySerde : materializedInternal.keySerde();
        final Serde<V> resolvedValueSerde =
            materializedInternal.valueSerde() == null ? this.valueSerde : materializedInternal.valueSerde();

        final GraphNode tableParentNode;
        final Set<String> tableSourceNodes;
        if (!this.repartitionRequired) {
            tableParentNode = this.graphNode;
            tableSourceNodes = this.subTopologySourceNodes;
        } else {
            final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder repartitionNodeBuilder =
                OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
            final String repartitionSourceName = createRepartitionedSource(
                this.builder, resolvedKeySerde, resolvedValueSerde, tableName, null, repartitionNodeBuilder);
            tableParentNode = repartitionNodeBuilder.build();
            this.builder.addGraphNode(this.graphNode, tableParentNode);
            tableSourceNodes = Collections.singleton(repartitionSourceName);
        }

        final KTableSource tableSource =
            new KTableSource(materializedInternal.storeName(), materializedInternal.queryableStoreName());
        final StreamToTableNode toTableNode = new StreamToTableNode(
            tableName, new ProcessorParameters(tableSource, tableName), materializedInternal);
        this.builder.addGraphNode(tableParentNode, toTableNode);

        return new KTableImpl(
            tableName,
            resolvedKeySerde,
            resolvedValueSerde,
            tableSourceNodes,
            materializedInternal.queryableStoreName(),
            tableSource,
            toTableNode,
            this.builder);
    }

    /**
     * Forwards to {@link #groupBy(KeyValueMapper, Grouped)} with a {@code Grouped} built from a
     * {@code null} key serde and this stream's value serde.
     *
     * @param keyValueMapper key selector, passed through unchanged
     * @param <KR>           key type of the grouped stream
     * @return whatever the {@code Grouped} overload returns
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <KR> KGroupedStream<KR, V> groupBy(KeyValueMapper<? super K, ? super V, KR> keyValueMapper) {
        final Grouped<KR, V> grouped = Grouped.with(null, this.valueSerde);
        return groupBy(keyValueMapper, grouped);
    }

    /**
     * Deprecated: converts the {@code Serialized} configuration into a {@code Grouped} carrying
     * the same key and value serdes and forwards to {@link #groupBy(KeyValueMapper, Grouped)}.
     *
     * @param keyValueMapper key selector; must not be null
     * @param serialized     serde configuration; must not be null
     * @param <KR>           key type of the grouped stream
     * @return whatever the {@code Grouped} overload returns
     * @throws NullPointerException if either argument is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    @Deprecated
    public <KR> KGroupedStream<KR, V> groupBy(KeyValueMapper<? super K, ? super V, KR> keyValueMapper, Serialized<KR, V> serialized) {
        Objects.requireNonNull(keyValueMapper, "keySelector can't be null");
        Objects.requireNonNull(serialized, "serialized can't be null");

        final SerializedInternal serdes = new SerializedInternal(serialized);
        return groupBy(
            keyValueMapper,
            Grouped.with(serdes.keySerde(), serdes.valueSerde()));
    }

    /**
     * Builds a key-selecting node via {@code internalSelectKey} (named from the {@code Grouped}
     * configuration), flags it with {@code keyChangingOperation(true)}, attaches it beneath this
     * stream's graph node and wraps it in a {@code KGroupedStreamImpl} created with
     * {@code repartitionRequired = true}.
     *
     * <p>Decompiler note (JADX): multi-variable type inference failed for this method.
     *
     * @param keyValueMapper key selector; must not be null
     * @param grouped        grouping configuration; must not be null
     * @param <KR>           key type of the grouped stream
     * @return a new {@code KGroupedStreamImpl} backed by the key-selecting node
     * @throws NullPointerException if either argument is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <KR> KGroupedStream<KR, V> groupBy(KeyValueMapper<? super K, ? super V, KR> keyValueMapper, Grouped<KR, V> grouped) {
        Objects.requireNonNull(keyValueMapper, "keySelector can't be null");
        Objects.requireNonNull(grouped, "grouped can't be null");

        final GroupedInternal groupedInternal = new GroupedInternal(grouped);
        final ProcessorGraphNode<K, V> selectKeyNode =
            internalSelectKey(keyValueMapper, new NamedInternal(groupedInternal.name()));
        selectKeyNode.keyChangingOperation(true);
        this.builder.addGraphNode(this.graphNode, selectKeyNode);

        return new KGroupedStreamImpl(
            selectKeyNode.nodeName(),
            this.subTopologySourceNodes,
            groupedInternal,
            true,
            selectKeyNode,
            this.builder);
    }

    /**
     * Forwards to {@link #groupByKey(Grouped)} with a {@code Grouped} built from this stream's
     * key and value serdes.
     *
     * @return whatever the {@code Grouped} overload returns
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public KGroupedStream<K, V> groupByKey() {
        final Grouped<K, V> grouped = Grouped.with(this.keySerde, this.valueSerde);
        return groupByKey(grouped);
    }

    /**
     * Deprecated: converts the {@code Serialized} configuration into a {@code Grouped} carrying
     * the same key and value serdes and forwards to {@link #groupByKey(Grouped)}.
     *
     * @param serialized serde configuration; must not be null
     * @return whatever the {@code Grouped} overload returns
     * @throws NullPointerException if {@code serialized} is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    @Deprecated
    public KGroupedStream<K, V> groupByKey(Serialized<K, V> serialized) {
        Objects.requireNonNull(serialized, "serialized can't be null");

        final SerializedInternal serdes = new SerializedInternal(serialized);
        return groupByKey(
            Grouped.with(serdes.keySerde(), serdes.valueSerde()));
    }

    /**
     * Wraps this stream's current graph node in a {@code KGroupedStreamImpl}; no new graph node
     * is added. The grouped stream inherits this stream's name, source nodes and
     * {@code repartitionRequired} flag.
     *
     * @param grouped grouping configuration; must not be null
     * @return a new {@code KGroupedStreamImpl} over this stream's graph node
     * @throws NullPointerException if {@code grouped} is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public KGroupedStream<K, V> groupByKey(Grouped<K, V> grouped) {
        Objects.requireNonNull(grouped, "grouped can't be null");

        final GroupedInternal groupedInternal = new GroupedInternal(grouped);
        return new KGroupedStreamImpl(
            this.name,
            this.subTopologySourceNodes,
            groupedInternal,
            this.repartitionRequired,
            this.graphNode,
            this.builder);
    }

    /**
     * Forwards to the {@code Joined}-based {@code join} overload with
     * {@code Joined.with(null, null, null)}.
     *
     * @param kStream     other stream, passed through unchanged
     * @param valueJoiner joiner, passed through unchanged
     * @param joinWindows window specification, passed through unchanged
     * @param <VO>        value type of the other stream
     * @param <VR>        value type of the resulting stream
     * @return whatever the {@code Joined}-based overload returns
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VO, VR> KStream<K, VR> join(KStream<K, VO> kStream, ValueJoiner<? super V, ? super VO, ? extends VR> valueJoiner, JoinWindows joinWindows) {
        final Joined<K, V, VO> emptyJoined = Joined.with(null, null, null);
        return join(kStream, valueJoiner, joinWindows, emptyJoined);
    }

    /**
     * Legacy stream-stream join overload: copies the key, value and other-value serdes and
     * the name from {@code joined} into a {@link StreamJoined} and delegates to the
     * {@code StreamJoined}-based overload.
     *
     * @throws NullPointerException if {@code joined} is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    @Deprecated
    public <VO, VR> KStream<K, VR> join(KStream<K, VO> kStream, ValueJoiner<? super V, ? super VO, ? extends VR> valueJoiner, JoinWindows joinWindows, Joined<K, V, VO> joined) {
        Objects.requireNonNull(joined, "joined can't be null");
        final JoinedInternal legacyConfig = new JoinedInternal(joined);
        final StreamJoined<K, V, VO> converted = StreamJoined
            .with(legacyConfig.keySerde(), legacyConfig.valueSerde(), legacyConfig.otherValueSerde())
            .withName2(legacyConfig.name());
        return join(kStream, valueJoiner, joinWindows, converted);
    }

    /**
     * Stream-stream join configured by {@code streamJoined}. Argument validation and the
     * actual wiring happen in {@code doJoin}; the join helper is built with both boolean
     * flags set to {@code false} (presumably left-outer / right-outer; confirm against
     * KStreamImplJoin).
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VO, VR> KStream<K, VR> join(KStream<K, VO> kStream, ValueJoiner<? super V, ? super VO, ? extends VR> valueJoiner, JoinWindows joinWindows, StreamJoined<K, V, VO> streamJoined) {
        final KStreamImplJoin joinHelper = new KStreamImplJoin(this.builder, false, false);
        return doJoin(kStream, valueJoiner, joinWindows, streamJoined, joinHelper);
    }

    /**
     * Stream-stream left join without explicit serdes: delegates to the {@link Joined}-based
     * overload with a {@code Joined} whose three serdes are all null.
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VO, VR> KStream<K, VR> leftJoin(KStream<K, VO> kStream, ValueJoiner<? super V, ? super VO, ? extends VR> valueJoiner, JoinWindows joinWindows) {
        final Joined<K, V, VO> noSerdes = Joined.with(null, null, null);
        return leftJoin(kStream, valueJoiner, joinWindows, noSerdes);
    }

    /**
     * Legacy stream-stream left join overload: copies the key, value and other-value serdes
     * and the name from {@code joined} into a {@link StreamJoined} and delegates to the
     * {@code StreamJoined}-based overload.
     *
     * @throws NullPointerException if {@code joined} is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    @Deprecated
    public <VO, VR> KStream<K, VR> leftJoin(KStream<K, VO> kStream, ValueJoiner<? super V, ? super VO, ? extends VR> valueJoiner, JoinWindows joinWindows, Joined<K, V, VO> joined) {
        Objects.requireNonNull(joined, "joined can't be null");
        final JoinedInternal legacyConfig = new JoinedInternal(joined);
        final StreamJoined<K, V, VO> converted = StreamJoined
            .with(legacyConfig.keySerde(), legacyConfig.valueSerde(), legacyConfig.otherValueSerde())
            .withName2(legacyConfig.name());
        return leftJoin(kStream, valueJoiner, joinWindows, converted);
    }

    /**
     * Stream-stream left join configured by {@code streamJoined}. Validation and wiring
     * happen in {@code doJoin}; the join helper is built with flags {@code (true, false)}
     * (presumably left-outer / right-outer; confirm against KStreamImplJoin).
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VO, VR> KStream<K, VR> leftJoin(KStream<K, VO> kStream, ValueJoiner<? super V, ? super VO, ? extends VR> valueJoiner, JoinWindows joinWindows, StreamJoined<K, V, VO> streamJoined) {
        final KStreamImplJoin joinHelper = new KStreamImplJoin(this.builder, true, false);
        return doJoin(kStream, valueJoiner, joinWindows, streamJoined, joinHelper);
    }

    /**
     * Stream-stream outer join without explicit serdes: delegates to the {@link Joined}-based
     * overload with a {@code Joined} whose three serdes are all null.
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VO, VR> KStream<K, VR> outerJoin(KStream<K, VO> kStream, ValueJoiner<? super V, ? super VO, ? extends VR> valueJoiner, JoinWindows joinWindows) {
        final Joined<K, V, VO> noSerdes = Joined.with(null, null, null);
        return outerJoin(kStream, valueJoiner, joinWindows, noSerdes);
    }

    /**
     * Legacy stream-stream outer join overload: copies the key, value and other-value serdes
     * and the name from {@code joined} into a {@link StreamJoined} and delegates to the
     * {@code StreamJoined}-based overload.
     *
     * @throws NullPointerException if {@code joined} is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    @Deprecated
    public <VO, VR> KStream<K, VR> outerJoin(KStream<K, VO> kStream, ValueJoiner<? super V, ? super VO, ? extends VR> valueJoiner, JoinWindows joinWindows, Joined<K, V, VO> joined) {
        Objects.requireNonNull(joined, "joined can't be null");
        final JoinedInternal legacyConfig = new JoinedInternal(joined);
        final StreamJoined<K, V, VO> converted = StreamJoined
            .with(legacyConfig.keySerde(), legacyConfig.valueSerde(), legacyConfig.otherValueSerde())
            .withName2(legacyConfig.name());
        return outerJoin(kStream, valueJoiner, joinWindows, converted);
    }

    /**
     * Stream-stream outer join configured by {@code streamJoined}. Validation and wiring
     * happen in {@code doJoin}; the join helper is built with both boolean flags set to
     * {@code true} (presumably left-outer / right-outer; confirm against KStreamImplJoin).
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VO, VR> KStream<K, VR> outerJoin(KStream<K, VO> kStream, ValueJoiner<? super V, ? super VO, ? extends VR> valueJoiner, JoinWindows joinWindows, StreamJoined<K, V, VO> streamJoined) {
        final KStreamImplJoin joinHelper = new KStreamImplJoin(this.builder, true, true);
        return doJoin(kStream, valueJoiner, joinWindows, streamJoined, joinHelper);
    }

    /**
     * Shared implementation of the stream-stream join overloads.
     *
     * <p>Validates the arguments, repartitions whichever side has
     * {@code repartitionRequired} set, registers the two sides as co-partitioned and
     * hands both to {@code kStreamImplJoin}.
     *
     * @param kStream         the other stream; must not be null
     * @param valueJoiner     joiner applied to matching values; must not be null
     * @param joinWindows     join window specification; must not be null
     * @param streamJoined    serdes and naming for the join; must not be null
     * @param kStreamImplJoin helper that builds the actual join
     * @return the joined stream returned by {@code kStreamImplJoin}
     * @throws NullPointerException if any of the first four arguments is null
     */
    private <VO, VR> KStream<K, VR> doJoin(KStream<K, VO> kStream, ValueJoiner<? super V, ? super VO, ? extends VR> valueJoiner, JoinWindows joinWindows, StreamJoined<K, V, VO> streamJoined, KStreamImplJoin kStreamImplJoin) {
        Objects.requireNonNull(kStream, "otherStream can't be null");
        Objects.requireNonNull(valueJoiner, "joiner can't be null");
        Objects.requireNonNull(joinWindows, "windows can't be null");
        Objects.requireNonNull(streamJoined, "streamJoined can't be null");

        KStreamImpl<K, V> leftSide = this;
        KStreamImpl rightSide = (KStreamImpl) kStream;

        final StreamJoinedInternal joinConfig = new StreamJoinedInternal(streamJoined);
        final NamedInternal joinName = new NamedInternal(joinConfig.name());

        // Each side is repartitioned independently; the repartition name is derived from
        // the join name with a side-specific suffix, with the side's own name as fallback input.
        if (leftSide.repartitionRequired) {
            final String leftRepartitionName = joinName.suffixWithOrElseGet("-left", leftSide.name);
            leftSide = leftSide.repartitionForJoin(leftRepartitionName, joinConfig.keySerde(), joinConfig.valueSerde());
        }
        if (rightSide.repartitionRequired) {
            final String rightRepartitionName = joinName.suffixWithOrElseGet("-right", rightSide.name);
            rightSide = rightSide.repartitionForJoin(rightRepartitionName, joinConfig.keySerde(), joinConfig.otherValueSerde());
        }

        leftSide.ensureCopartitionWith(Collections.singleton(rightSide));
        return kStreamImplJoin.join(leftSide, rightSide, valueJoiner, joinWindows, streamJoined);
    }

    /**
     * Produces a repartitioned view of this stream for use in a join.
     *
     * @param str    repartition name handed to {@code createRepartitionedSource}
     * @param serde  key serde override; this stream's key serde is used when null
     * @param serde2 value serde override; this stream's value serde is used when null
     * @return a new stream rooted at the repartitioned source, with
     *         {@code repartitionRequired} set to {@code false}
     */
    private KStreamImpl<K, V> repartitionForJoin(String str, Serde<K> serde, Serde<V> serde2) {
        final Serde<K> effectiveKeySerde = serde == null ? this.keySerde : serde;
        final Serde<V> effectiveValueSerde = serde2 == null ? this.valueSerde : serde2;

        // The source is created on every call, even when the cached node below is reused.
        final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder nodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
        final String repartitionedSourceName = createRepartitionedSource(this.builder, effectiveKeySerde, effectiveValueSerde, str, null, nodeBuilder);

        // Reuse the cached repartition node only if one exists and the requested name
        // equals this stream's name; otherwise build a new node and attach it to the graph.
        final boolean canReuseCachedNode = this.repartitionNode != null && this.name.equals(str);
        if (!canReuseCachedNode) {
            this.repartitionNode = nodeBuilder.build();
            this.builder.addGraphNode(this.graphNode, this.repartitionNode);
        }

        return new KStreamImpl<>(
            repartitionedSourceName,
            effectiveKeySerde,
            effectiveValueSerde,
            Collections.singleton(repartitionedSourceName),
            false,
            this.repartitionNode,
            this.builder);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Configures {@code baseRepartitionNodeBuilder} for a repartition topic and returns the
     * name of its source node.
     *
     * <p>The topic name is {@code str} with {@code REPARTITION_TOPIC_SUFFIX} appended unless
     * it already ends with that suffix. Three processor names (sink, filter, source) are
     * always requested from the builder, in that order. They are used only when {@code str}
     * matches {@code KSTREAM.*-[0-9]{10}}; otherwise the node names are the topic name plus
     * {@code -sink}, {@code -source} and {@code -filter}. The filter installed on the node
     * keeps only records whose key is non-null.
     *
     * @param internalStreamsBuilder    builder used to allocate processor names
     * @param serde                     key serde for the repartition node
     * @param serde2                    value serde for the repartition node
     * @param str                       repartition topic name or name prefix
     * @param streamPartitioner         partitioner passed through to the node builder
     * @param baseRepartitionNodeBuilder node builder to populate
     * @return the source node name, which is also used as the graph node name
     */
    public static <K1, V1, RN extends BaseRepartitionNode<K1, V1>> String createRepartitionedSource(InternalStreamsBuilder internalStreamsBuilder, Serde<K1> serde, Serde<V1> serde2, String str, StreamPartitioner<K1, V1> streamPartitioner, BaseRepartitionNode.BaseRepartitionNodeBuilder<K1, V1, RN> baseRepartitionNodeBuilder) {
        final String topicName = str.endsWith(REPARTITION_TOPIC_SUFFIX) ? str : str + REPARTITION_TOPIC_SUFFIX;

        // Requested unconditionally and in this exact order, whether or not they end up being used.
        final String generatedSinkName = internalStreamsBuilder.newProcessorName(SINK_NAME);
        final String generatedFilterName = internalStreamsBuilder.newProcessorName(FILTER_NAME);
        final String generatedSourceName = internalStreamsBuilder.newProcessorName(SOURCE_NAME);

        final boolean useGeneratedNames = str.matches("KSTREAM.*-[0-9]{10}");
        final String sinkName = useGeneratedNames ? generatedSinkName : topicName + "-sink";
        final String sourceName = useGeneratedNames ? generatedSourceName : topicName + "-source";
        final String filterName = useGeneratedNames ? generatedFilterName : topicName + "-filter";

        baseRepartitionNodeBuilder
            .withKeySerde(serde)
            .withValueSerde(serde2)
            .withSourceName(sourceName)
            .withRepartitionTopic(topicName)
            .withSinkName(sinkName)
            .withProcessorParameters(new ProcessorParameters<>(new KStreamFilter((key, value) -> key != null, false), filterName))
            .withStreamPartitioner(streamPartitioner)
            .withNodeName(sourceName);
        return sourceName;
    }

    /**
     * Stream-table join without explicit serdes: delegates to the {@link Joined}-based
     * overload with a {@code Joined} whose three serdes are all null.
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VO, VR> KStream<K, VR> join(KTable<K, VO> kTable, ValueJoiner<? super V, ? super VO, ? extends VR> valueJoiner) {
        final Joined<K, V, VO> noSerdes = Joined.with(null, null, null);
        return join(kTable, valueJoiner, noSerdes);
    }

    /**
     * Stream-table join. If this stream has {@code repartitionRequired} set, it is first
     * repartitioned (named after {@code joined}'s name, or this stream's name when that is
     * null) and the join is performed on the repartitioned stream.
     *
     * @throws NullPointerException if {@code kTable}, {@code valueJoiner} or {@code joined} is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VO, VR> KStream<K, VR> join(KTable<K, VO> kTable, ValueJoiner<? super V, ? super VO, ? extends VR> valueJoiner, Joined<K, V, VO> joined) {
        Objects.requireNonNull(kTable, "table can't be null");
        Objects.requireNonNull(valueJoiner, "joiner can't be null");
        Objects.requireNonNull(joined, "joined can't be null");

        final String joinName = new JoinedInternal(joined).name();
        if (!this.repartitionRequired) {
            return doStreamTableJoin(kTable, valueJoiner, joined, false);
        }

        final String repartitionName = joinName != null ? joinName : this.name;
        final KStreamImpl<K, V> repartitioned = repartitionForJoin(repartitionName, joined.keySerde(), joined.valueSerde());
        return repartitioned.doStreamTableJoin(kTable, valueJoiner, joined, false);
    }

    /**
     * Stream-table left join without explicit serdes: delegates to the {@link Joined}-based
     * overload with a {@code Joined} whose three serdes are all null.
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VO, VR> KStream<K, VR> leftJoin(KTable<K, VO> kTable, ValueJoiner<? super V, ? super VO, ? extends VR> valueJoiner) {
        final Joined<K, V, VO> noSerdes = Joined.with(null, null, null);
        return leftJoin(kTable, valueJoiner, noSerdes);
    }

    /**
     * Stream-table left join. If this stream has {@code repartitionRequired} set, it is
     * first repartitioned (named after {@code joined}'s name, or this stream's name when
     * that is null) and the join is performed on the repartitioned stream.
     *
     * @throws NullPointerException if {@code kTable}, {@code valueJoiner} or {@code joined} is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VO, VR> KStream<K, VR> leftJoin(KTable<K, VO> kTable, ValueJoiner<? super V, ? super VO, ? extends VR> valueJoiner, Joined<K, V, VO> joined) {
        Objects.requireNonNull(kTable, "table can't be null");
        Objects.requireNonNull(valueJoiner, "joiner can't be null");
        Objects.requireNonNull(joined, "joined can't be null");

        final String joinName = new JoinedInternal(joined).name();
        if (!this.repartitionRequired) {
            return doStreamTableJoin(kTable, valueJoiner, joined, true);
        }

        final String repartitionName = joinName != null ? joinName : this.name;
        final KStreamImpl<K, V> repartitioned = repartitionForJoin(repartitionName, joined.keySerde(), joined.valueSerde());
        return repartitioned.doStreamTableJoin(kTable, valueJoiner, joined, true);
    }

    /**
     * Stream-global-table join with no caller-supplied name: delegates to
     * {@code globalTableJoin} with the boolean flag {@code false} and
     * {@code NamedInternal.empty()}.
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <KG, VG, VR> KStream<K, VR> join(GlobalKTable<KG, VG> globalKTable, KeyValueMapper<? super K, ? super V, ? extends KG> keyValueMapper, ValueJoiner<? super V, ? super VG, ? extends VR> valueJoiner) {
        final NamedInternal unnamed = NamedInternal.empty();
        return globalTableJoin(globalKTable, keyValueMapper, valueJoiner, false, unnamed);
    }

    /**
     * Stream-global-table join with a caller-supplied name: delegates to
     * {@code globalTableJoin} with the boolean flag {@code false}. All argument validation
     * happens in the delegate.
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <KG, VG, VR> KStream<K, VR> join(GlobalKTable<KG, VG> globalKTable, KeyValueMapper<? super K, ? super V, ? extends KG> keyValueMapper, ValueJoiner<? super V, ? super VG, ? extends VR> valueJoiner, Named named) {
        final boolean leftJoin = false;
        return globalTableJoin(globalKTable, keyValueMapper, valueJoiner, leftJoin, named);
    }

    /**
     * Stream-global-table left join with no caller-supplied name: delegates to
     * {@code globalTableJoin} with the boolean flag {@code true} and
     * {@code NamedInternal.empty()}.
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <KG, VG, VR> KStream<K, VR> leftJoin(GlobalKTable<KG, VG> globalKTable, KeyValueMapper<? super K, ? super V, ? extends KG> keyValueMapper, ValueJoiner<? super V, ? super VG, ? extends VR> valueJoiner) {
        final NamedInternal unnamed = NamedInternal.empty();
        return globalTableJoin(globalKTable, keyValueMapper, valueJoiner, true, unnamed);
    }

    /**
     * Stream-global-table left join with a caller-supplied name: delegates to
     * {@code globalTableJoin} with the boolean flag {@code true}. All argument validation
     * happens in the delegate.
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <KG, VG, VR> KStream<K, VR> leftJoin(GlobalKTable<KG, VG> globalKTable, KeyValueMapper<? super K, ? super V, ? extends KG> keyValueMapper, ValueJoiner<? super V, ? super VG, ? extends VR> valueJoiner, Named named) {
        final boolean leftJoin = true;
        return globalTableJoin(globalKTable, keyValueMapper, valueJoiner, leftJoin, named);
    }

    /**
     * Shared implementation of the stream-global-table join overloads.
     *
     * <p>Note that the processor name is resolved with the {@code LEFTJOIN_NAME} prefix
     * regardless of the value of {@code z}.
     *
     * @param globalKTable   the global table; must not be null
     * @param keyValueMapper maps a stream record to the table key; must not be null
     * @param valueJoiner    joiner applied to matching values; must not be null
     * @param z              flag forwarded to {@code KStreamGlobalKTableJoin}; the public
     *                       {@code leftJoin} overloads pass {@code true}, {@code join} passes {@code false}
     * @param named          naming configuration; must not be null
     * @return a stream that keeps this stream's key serde, source nodes and repartition flag
     * @throws NullPointerException if any reference argument is null
     */
    private <KG, VG, VR> KStream<K, VR> globalTableJoin(GlobalKTable<KG, VG> globalKTable, KeyValueMapper<? super K, ? super V, ? extends KG> keyValueMapper, ValueJoiner<? super V, ? super VG, ? extends VR> valueJoiner, boolean z, Named named) {
        Objects.requireNonNull(globalKTable, "globalTable can't be null");
        Objects.requireNonNull(keyValueMapper, "keySelector can't be null");
        Objects.requireNonNull(valueJoiner, "joiner can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final KTableValueGetterSupplier<K, V> tableValueGetter = ((GlobalKTableImpl) globalKTable).valueGetterSupplier();
        final String processorName = new NamedInternal(named).orElseGenerateWithPrefix(this.builder, LEFTJOIN_NAME);

        final ProcessorParameters joinParameters = new ProcessorParameters(
            new KStreamGlobalKTableJoin(tableValueGetter, valueJoiner, keyValueMapper, z),
            processorName);
        // No store names and no other-side node name are supplied for a global-table join.
        final StreamTableJoinNode joinNode = new StreamTableJoinNode(processorName, joinParameters, new String[0], null);
        this.builder.addGraphNode(this.graphNode, joinNode);

        return new KStreamImpl(
            processorName,
            this.keySerde,
            null,
            this.subTopologySourceNodes,
            this.repartitionRequired,
            joinNode,
            this.builder);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Shared implementation of the stream-table join overloads.
     *
     * @param kTable      the table to join against; must not be null
     * @param valueJoiner joiner applied to matching values; must not be null
     * @param joined      supplies the join name and an optional key serde; callers in this
     *                    class null-check it before delegating here
     * @param z           {@code true} selects the {@code LEFTJOIN_NAME} prefix, {@code false}
     *                    the {@code JOIN_NAME} prefix; also forwarded to {@code KStreamKTableJoin}
     * @return a stream whose source nodes are the co-partitioned set and whose
     *         {@code repartitionRequired} flag is {@code false}
     * @throws NullPointerException if {@code kTable} or {@code valueJoiner} is null
     */
    private <VO, VR> KStream<K, VR> doStreamTableJoin(KTable<K, VO> kTable, ValueJoiner<? super V, ? super VO, ? extends VR> valueJoiner, Joined<K, V, VO> joined, boolean z) {
        Objects.requireNonNull(kTable, "table can't be null");
        Objects.requireNonNull(valueJoiner, "joiner can't be null");

        final Set<String> copartitionedSources = ensureCopartitionWith(Collections.singleton((AbstractStream) kTable));

        final String namePrefix = z ? LEFTJOIN_NAME : JOIN_NAME;
        final String processorName = new NamedInternal(new JoinedInternal(joined).name()).orElseGenerateWithPrefix(this.builder, namePrefix);

        // The value getter supplier is requested twice, in this order: once for the join
        // processor and once to obtain the store names for the graph node.
        final ProcessorParameters joinParameters = new ProcessorParameters(
            new KStreamKTableJoin(((KTableImpl) kTable).valueGetterSupplier(), valueJoiner, z),
            processorName);
        final String[] tableStoreNames = ((KTableImpl) kTable).valueGetterSupplier().storeNames();
        final StreamTableJoinNode joinNode = new StreamTableJoinNode(processorName, joinParameters, tableStoreNames, this.name);
        this.builder.addGraphNode(this.graphNode, joinNode);

        // Prefer the key serde configured on `joined`; fall back to this stream's key serde.
        final Serde<K> resultKeySerde = joined.keySerde() != null ? joined.keySerde() : this.keySerde;
        return new KStreamImpl(processorName, resultKeySerde, null, copartitionedSources, false, joinNode, this.builder);
    }

    /**
     * Applies a transformer under a builder-generated name ({@code TRANSFORM_NAME} prefix).
     * The supplier is wrapped in a {@code TransformerSupplierAdapter} and handed to
     * {@code flatTransform}.
     *
     * @throws NullPointerException if {@code transformerSupplier} is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <KR, VR> KStream<KR, VR> transform(TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier, String... strArr) {
        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
        final TransformerSupplierAdapter flatAdapter = new TransformerSupplierAdapter(transformerSupplier);
        final Named generatedName = Named.as(this.builder.newProcessorName(TRANSFORM_NAME));
        return (KStream<KR, VR>) flatTransform(flatAdapter, generatedName, strArr);
    }

    /**
     * Applies a transformer under the supplied name. The supplier is wrapped in a
     * {@code TransformerSupplierAdapter} and handed to {@code flatTransform}, which
     * validates {@code named} and the store names.
     *
     * @throws NullPointerException if {@code transformerSupplier} is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <KR, VR> KStream<KR, VR> transform(TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier, Named named, String... strArr) {
        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
        final TransformerSupplierAdapter flatAdapter = new TransformerSupplierAdapter(transformerSupplier);
        return (KStream<KR, VR>) flatTransform(flatAdapter, named, strArr);
    }

    /**
     * Applies a flat transformer under a builder-generated name ({@code TRANSFORM_NAME}
     * prefix) by delegating to the {@link Named}-based overload.
     *
     * @throws NullPointerException if {@code transformerSupplier} is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <K1, V1> KStream<K1, V1> flatTransform(TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier, String... strArr) {
        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
        final Named generatedName = Named.as(this.builder.newProcessorName(TRANSFORM_NAME));
        return flatTransform(transformerSupplier, generatedName, strArr);
    }

    /**
     * Adds a stateful flat-transform processor node under the supplied name.
     *
     * <p>The node is marked as key-changing and the returned stream has no key or value
     * serde and {@code repartitionRequired} set to {@code true}.
     *
     * @param transformerSupplier supplier of the transformer; must not be null and must
     *                            pass {@code ApiUtils.checkSupplier}
     * @param named               naming configuration; must not be null
     * @param strArr              state store names; neither the array nor any element may be null
     * @return the transformed stream
     * @throws NullPointerException if any argument or store name is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <K1, V1> KStream<K1, V1> flatTransform(TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier, Named named, String... strArr) {
        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
        Objects.requireNonNull(named, "named can't be null");
        Objects.requireNonNull(strArr, "stateStoreNames can't be a null array");
        ApiUtils.checkSupplier(transformerSupplier);
        for (int i = 0; i < strArr.length; i++) {
            Objects.requireNonNull(strArr[i], "stateStoreNames can't contain `null` as store name");
        }

        final String processorName = new NamedInternal(named).name();
        final ProcessorParameters transformParameters = new ProcessorParameters(new KStreamFlatTransform(transformerSupplier), processorName);
        final StatefulProcessorNode transformNode = new StatefulProcessorNode(processorName, transformParameters, strArr);
        transformNode.keyChangingOperation(true);
        this.builder.addGraphNode(this.graphNode, transformNode);

        return new KStreamImpl(processorName, null, null, this.subTopologySourceNodes, true, transformNode, this.builder);
    }

    /**
     * Transforms values with a key-less transformer and no caller-supplied name. The
     * supplier is adapted via {@code toValueTransformerWithKeySupplier} and passed to
     * {@code doTransformValues} together with {@code NamedInternal.empty()}.
     *
     * @throws NullPointerException if {@code valueTransformerSupplier} is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VR> KStream<K, VR> transformValues(ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier, String... strArr) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
        final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> keyAwareSupplier =
            toValueTransformerWithKeySupplier(valueTransformerSupplier);
        final NamedInternal unnamed = NamedInternal.empty();
        return doTransformValues(keyAwareSupplier, unnamed, strArr);
    }

    /**
     * Transforms values with a key-less transformer under the supplied name. The supplier
     * is adapted via {@code toValueTransformerWithKeySupplier} and passed to
     * {@code doTransformValues}.
     *
     * @throws NullPointerException if {@code valueTransformerSupplier} or {@code named} is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VR> KStream<K, VR> transformValues(ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier, Named named, String... strArr) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
        Objects.requireNonNull(named, "named can't be null");
        final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> keyAwareSupplier =
            toValueTransformerWithKeySupplier(valueTransformerSupplier);
        final NamedInternal processorName = new NamedInternal(named);
        return doTransformValues(keyAwareSupplier, processorName, strArr);
    }

    /**
     * Transforms values with a key-aware transformer and no caller-supplied name:
     * delegates to {@code doTransformValues} with {@code NamedInternal.empty()}.
     *
     * @throws NullPointerException if {@code valueTransformerWithKeySupplier} is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VR> KStream<K, VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerWithKeySupplier, String... strArr) {
        Objects.requireNonNull(valueTransformerWithKeySupplier, "valueTransformerSupplier can't be null");
        final NamedInternal unnamed = NamedInternal.empty();
        return doTransformValues(valueTransformerWithKeySupplier, unnamed, strArr);
    }

    /**
     * Transforms values with a key-aware transformer under the supplied name: delegates to
     * {@code doTransformValues} with the name wrapped in a {@code NamedInternal}.
     *
     * @throws NullPointerException if {@code valueTransformerWithKeySupplier} or {@code named} is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VR> KStream<K, VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerWithKeySupplier, Named named, String... strArr) {
        Objects.requireNonNull(valueTransformerWithKeySupplier, "valueTransformerSupplier can't be null");
        Objects.requireNonNull(named, "named can't be null");
        final NamedInternal processorName = new NamedInternal(named);
        return doTransformValues(valueTransformerWithKeySupplier, processorName, strArr);
    }

    /**
     * Shared implementation of the {@code transformValues} overloads.
     *
     * <p>Validates the store names and the supplier, resolves the processor name with the
     * {@code TRANSFORMVALUES_NAME} prefix, and adds a stateful node flagged as
     * value-changing. The returned stream keeps this stream's key serde, source nodes and
     * repartition flag; its value serde is null.
     *
     * @param valueTransformerWithKeySupplier supplier checked with {@code ApiUtils.checkSupplier}
     * @param namedInternal                   name holder used to resolve the processor name
     * @param strArr                          state store names; neither the array nor any element may be null
     * @return the stream with transformed values
     * @throws NullPointerException if {@code strArr} or any of its elements is null
     */
    private <VR> KStream<K, VR> doTransformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerWithKeySupplier, NamedInternal namedInternal, String... strArr) {
        Objects.requireNonNull(strArr, "stateStoreNames can't be a null array");
        for (int i = 0; i < strArr.length; i++) {
            Objects.requireNonNull(strArr[i], "stateStoreNames can't contain `null` as store name");
        }
        ApiUtils.checkSupplier(valueTransformerWithKeySupplier);

        final String processorName = namedInternal.orElseGenerateWithPrefix(this.builder, TRANSFORMVALUES_NAME);
        final ProcessorParameters transformParameters = new ProcessorParameters(new KStreamTransformValues(valueTransformerWithKeySupplier), processorName);
        final StatefulProcessorNode transformNode = new StatefulProcessorNode(processorName, transformParameters, strArr);
        transformNode.setValueChangingOperation(true);
        this.builder.addGraphNode(this.graphNode, transformNode);

        return new KStreamImpl(
            processorName,
            this.keySerde,
            null,
            this.subTopologySourceNodes,
            this.repartitionRequired,
            transformNode,
            this.builder);
    }

    /**
     * Flat-transforms values with a key-less transformer and no caller-supplied name. The
     * supplier is adapted via {@code toValueTransformerWithKeySupplier} and passed to
     * {@code doFlatTransformValues} together with {@code NamedInternal.empty()}.
     *
     * @throws NullPointerException if {@code valueTransformerSupplier} is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VR> KStream<K, VR> flatTransformValues(ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier, String... strArr) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
        final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> keyAwareSupplier =
            toValueTransformerWithKeySupplier(valueTransformerSupplier);
        final NamedInternal unnamed = NamedInternal.empty();
        return doFlatTransformValues(keyAwareSupplier, unnamed, strArr);
    }

    /**
     * Flat-transforms values with a key-less transformer under the supplied name. The
     * supplier is adapted via {@code toValueTransformerWithKeySupplier} and passed to
     * {@code doFlatTransformValues}.
     *
     * @param valueTransformerSupplier supplier of the value transformer; must not be null
     * @param named                    naming configuration; must not be null
     * @param strArr                   state store names, validated by {@code doFlatTransformValues}
     * @return the stream with flat-transformed values
     * @throws NullPointerException if {@code valueTransformerSupplier} or {@code named} is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VR> KStream<K, VR> flatTransformValues(ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier, Named named, String... strArr) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
        // Reject a null name here with the same message the transformValues overloads use;
        // otherwise the null would only surface inside doFlatTransformValues when it is
        // wrapped in a NamedInternal, presumably without a descriptive message.
        Objects.requireNonNull(named, "named can't be null");
        return doFlatTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier), named, strArr);
    }

    /**
     * Flat-transforms values with a key-aware transformer and no caller-supplied name:
     * delegates to {@code doFlatTransformValues} with {@code NamedInternal.empty()}.
     *
     * @throws NullPointerException if {@code valueTransformerWithKeySupplier} is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VR> KStream<K, VR> flatTransformValues(ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerWithKeySupplier, String... strArr) {
        Objects.requireNonNull(valueTransformerWithKeySupplier, "valueTransformerSupplier can't be null");
        final NamedInternal unnamed = NamedInternal.empty();
        return doFlatTransformValues(valueTransformerWithKeySupplier, unnamed, strArr);
    }

    /**
     * Flat-transforms values with a key-aware transformer under the supplied name by
     * delegating to {@code doFlatTransformValues}.
     *
     * @param valueTransformerWithKeySupplier supplier of the value transformer; must not be null
     * @param named                           naming configuration; must not be null
     * @param strArr                          state store names, validated by {@code doFlatTransformValues}
     * @return the stream with flat-transformed values
     * @throws NullPointerException if {@code valueTransformerWithKeySupplier} or {@code named} is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public <VR> KStream<K, VR> flatTransformValues(ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerWithKeySupplier, Named named, String... strArr) {
        Objects.requireNonNull(valueTransformerWithKeySupplier, "valueTransformerSupplier can't be null");
        // Reject a null name here with the same message the transformValues overloads use;
        // otherwise the null would only surface inside doFlatTransformValues when it is
        // wrapped in a NamedInternal, presumably without a descriptive message.
        Objects.requireNonNull(named, "named can't be null");
        return doFlatTransformValues(valueTransformerWithKeySupplier, named, strArr);
    }

    /**
     * Shared implementation of the {@code flatTransformValues} overloads.
     *
     * <p>Validates the store names and the supplier, resolves the processor name with the
     * {@code TRANSFORMVALUES_NAME} prefix, and adds a stateful node flagged as
     * value-changing. The returned stream keeps this stream's key serde, source nodes and
     * repartition flag; its value serde is null.
     *
     * @param valueTransformerWithKeySupplier supplier checked with {@code ApiUtils.checkSupplier}
     * @param named                           naming configuration, wrapped in a {@code NamedInternal}
     * @param strArr                          state store names; neither the array nor any element may be null
     * @return the stream with flat-transformed values
     * @throws NullPointerException if {@code strArr} or any of its elements is null
     */
    private <VR> KStream<K, VR> doFlatTransformValues(ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerWithKeySupplier, Named named, String... strArr) {
        Objects.requireNonNull(strArr, "stateStoreNames can't be a null array");
        for (int i = 0; i < strArr.length; i++) {
            Objects.requireNonNull(strArr[i], "stateStoreNames can't contain `null` as store name");
        }
        ApiUtils.checkSupplier(valueTransformerWithKeySupplier);

        final String processorName = new NamedInternal(named).orElseGenerateWithPrefix(this.builder, TRANSFORMVALUES_NAME);
        final ProcessorParameters transformParameters = new ProcessorParameters(new KStreamFlatTransformValues(valueTransformerWithKeySupplier), processorName);
        final StatefulProcessorNode transformNode = new StatefulProcessorNode(processorName, transformParameters, strArr);
        transformNode.setValueChangingOperation(true);
        this.builder.addGraphNode(this.graphNode, transformNode);

        return new KStreamImpl(
            processorName,
            this.keySerde,
            null,
            this.subTopologySourceNodes,
            this.repartitionRequired,
            transformNode,
            this.builder);
    }

    /**
     * Attaches a terminal processor under a builder-generated name ({@code PROCESSOR_NAME}
     * prefix) by delegating to the {@link Named}-based overload, which performs all
     * argument validation. The name is requested from the builder before that validation runs.
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public void process(ProcessorSupplier<? super K, ? super V> processorSupplier, String... strArr) {
        final Named generatedName = Named.as(this.builder.newProcessorName(PROCESSOR_NAME));
        process(processorSupplier, generatedName, strArr);
    }

    /**
     * Attaches a terminal stateful processor node under the supplied name.
     *
     * @param processorSupplier supplier of the processor; must not be null and must pass
     *                          {@code ApiUtils.checkSupplier}
     * @param named             naming configuration; must not be null
     * @param strArr            state store names; neither the array nor any element may be null
     * @throws NullPointerException if any argument or store name is null
     */
    @Override // org.apache.kafka.streams.kstream.KStream
    public void process(ProcessorSupplier<? super K, ? super V> processorSupplier, Named named, String... strArr) {
        Objects.requireNonNull(processorSupplier, "processorSupplier can't be null");
        Objects.requireNonNull(named, "named can't be null");
        Objects.requireNonNull(strArr, "stateStoreNames can't be a null array");
        ApiUtils.checkSupplier(processorSupplier);
        for (int i = 0; i < strArr.length; i++) {
            Objects.requireNonNull(strArr[i], "stateStoreNames can't be null");
        }

        final String processorName = new NamedInternal(named).name();
        final ProcessorParameters processParameters = new ProcessorParameters(processorSupplier, processorName);
        final StatefulProcessorNode processNode = new StatefulProcessorNode(processorName, processParameters, strArr);
        this.builder.addGraphNode(this.graphNode, processNode);
    }
}
