package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKeySchema;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionJoinForeignProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResolverJoinProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResponseWrapperSerde;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde;
import org.apache.kafka.streams.kstream.internals.graph.GraphGraceSearchUtil;
import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamSinkNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode;
import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.suppress.NamedSuppressed;
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableImpl.class */
public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<K, V> {
    // Class-wide logger; used by buildSuppress() to report the derived suppress duration.
    private static final Logger LOG = LoggerFactory.getLogger(KTableImpl.class);
    // Name prefixes. The ones referenced in the visible part of this class are handed to the builder
    // (or to NamedInternal) whenever a processor/store name has to be generated.
    // SOURCE_NAME and STATE_STORE_NAME are package-visible; NOTE(review): presumably used by sibling classes — confirm.
    static final String SOURCE_NAME = "KTABLE-SOURCE-";
    static final String STATE_STORE_NAME = "STATE-STORE-";
    private static final String FILTER_NAME = "KTABLE-FILTER-";
    private static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-";
    private static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-";
    private static final String MAPVALUES_NAME = "KTABLE-MAPVALUES-";
    // Used both for the join's merge node name and as the store-name prefix of materialized joins.
    private static final String MERGE_NAME = "KTABLE-MERGE-";
    private static final String SELECT_NAME = "KTABLE-SELECT-";
    private static final String SUPPRESS_NAME = "KTABLE-SUPPRESS-";
    private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
    private static final String TRANSFORMVALUES_NAME = "KTABLE-TRANSFORMVALUES-";
    // Foreign-key join names. NOTE(review): their use sites are outside the visible part of this file.
    private static final String FK_JOIN = "KTABLE-FK-JOIN-";
    private static final String FK_JOIN_STATE_STORE_NAME = "KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-";
    private static final String SUBSCRIPTION_REGISTRATION = "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-";
    private static final String SUBSCRIPTION_RESPONSE = "KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-";
    private static final String SUBSCRIPTION_PROCESSOR = "KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-";
    private static final String SUBSCRIPTION_RESPONSE_RESOLVER_PROCESSOR = "KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-";
    private static final String FK_JOIN_OUTPUT_NAME = "KTABLE-FK-JOIN-OUTPUT-";
    private static final String TOPIC_SUFFIX = "-topic";
    private static final String SINK_NAME = "KTABLE-SINK-";
    // Supplier of the processor that produces this table; inspected by valueGetterSupplier()
    // and enableSendingOldValues() to decide which concrete supplier type to talk to.
    private final ProcessorSupplier<?, ?> processorSupplier;
    // Name returned by queryableStoreName(); may be null (suppress() constructs its result with null).
    private final String queryableStoreName;
    // Flipped to true by the first call to enableSendingOldValues().
    private boolean sendOldValues;

    /**
     * Creates a table backed by the given processor supplier.
     *
     * @param str                    forwarded to the {@code AbstractStream} constructor (callers in this class pass the processor name)
     * @param serde                  key serde, forwarded to the superclass
     * @param serde2                 value serde, forwarded to the superclass
     * @param set                    forwarded to the superclass (callers in this class pass the source node names)
     * @param str2                   queryable store name, later returned by {@link #queryableStoreName()}; may be {@code null}
     * @param processorSupplier      supplier of the processor producing this table
     * @param streamsGraphNode       graph node of this table, forwarded to the superclass
     * @param internalStreamsBuilder builder, forwarded to the superclass
     */
    public KTableImpl(String str, Serde<K> serde, Serde<V> serde2, Set<String> set, String str2, ProcessorSupplier<?, ?> processorSupplier, StreamsGraphNode streamsGraphNode, InternalStreamsBuilder internalStreamsBuilder) {
        super(str, serde, serde2, set, streamsGraphNode, internalStreamsBuilder);
        this.queryableStoreName = str2;
        this.processorSupplier = processorSupplier;
        // Old values are not sent until enableSendingOldValues() is invoked.
        this.sendOldValues = false;
    }

    /**
     * Returns the queryable store name this table was constructed with.
     *
     * @return the store name, or {@code null} if the table was created without one
     */
    @Override
    public String queryableStoreName() {
        return queryableStoreName;
    }

    /**
     * Shared implementation behind every {@code filter}/{@code filterNot} overload: adds a
     * {@link KTableFilter} processor node below this table's graph node and wraps it in a new table.
     *
     * @param predicate            predicate handed to the {@link KTableFilter}
     * @param named                processor naming; resolved through {@code NamedInternal#orElseGenerateWithPrefix}
     *                             with the {@code KTABLE-FILTER-} prefix
     * @param materializedInternal materialization settings, or {@code null} when the caller supplied none
     * @param z                    flag forwarded to {@link KTableFilter}; the {@code filterNot} overloads pass
     *                             {@code true}, the {@code filter} overloads pass {@code false}
     * @return the table backed by the newly added filter node
     */
    private KTable<K, V> doFilter(Predicate<? super K, ? super V> predicate, Named named, MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal, boolean z) {
        // Defaults for the non-materialized case: inherit this table's serdes, no store.
        Serde<K> resultKeySerde = this.keySerde;
        Serde<V> resultValueSerde = this.valSerde;
        String queryableName = null;
        StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder = null;

        if (materializedInternal != null) {
            // The generated name is deliberately discarded.
            // NOTE(review): presumably called only to advance the builder's name index before the
            // processor name is generated below — confirm against InternalStreamsBuilder.
            if (materializedInternal.storeName() == null) {
                this.builder.newStoreName(FILTER_NAME);
            }
            if (materializedInternal.keySerde() != null) {
                resultKeySerde = materializedInternal.keySerde();
            }
            if (materializedInternal.valueSerde() != null) {
                resultValueSerde = materializedInternal.valueSerde();
            }
            queryableName = materializedInternal.queryableStoreName();
            // A store is only built when the materialization is queryable.
            if (queryableName != null) {
                storeBuilder = new TimestampedKeyValueStoreMaterializer(materializedInternal).materialize();
            }
        }

        final String processorName = new NamedInternal(named).orElseGenerateWithPrefix(this.builder, FILTER_NAME);
        final KTableFilter filterSupplier = new KTableFilter(this, predicate, z, queryableName);
        final TableProcessorNode filterNode = new TableProcessorNode(
            processorName,
            unsafeCastProcessorParametersToCompletelyDifferentType(new ProcessorParameters<>(filterSupplier, processorName)),
            storeBuilder);
        this.builder.addGraphNode(this.streamsGraphNode, filterNode);
        return new KTableImpl(processorName, resultKeySerde, resultValueSerde, this.sourceNodes, queryableName, filterSupplier, filterNode, this.builder);
    }

    /**
     * Filters this table with an unnamed, non-materialized filter node.
     *
     * @param predicate the filter predicate; must not be {@code null}
     * @return the filtered table
     * @throws NullPointerException if {@code predicate} is {@code null}
     */
    @Override
    public KTable<K, V> filter(Predicate<? super K, ? super V> predicate) {
        return doFilter(Objects.requireNonNull(predicate, "predicate can't be null"), NamedInternal.empty(), null, false);
    }

    /**
     * Filters this table with a non-materialized filter node using the given naming.
     *
     * @param predicate the filter predicate; must not be {@code null}
     * @param named     processor naming passed on to {@code doFilter}
     * @return the filtered table
     * @throws NullPointerException if {@code predicate} is {@code null}
     */
    @Override
    public KTable<K, V> filter(Predicate<? super K, ? super V> predicate, Named named) {
        return doFilter(Objects.requireNonNull(predicate, "predicate can't be null"), named, null, false);
    }

    /**
     * Filters this table using the given naming and materialization settings.
     *
     * @param predicate    the filter predicate; must not be {@code null}
     * @param named        processor naming passed on to {@code doFilter}
     * @param materialized materialization settings; must not be {@code null}
     * @return the filtered table
     * @throws NullPointerException if {@code predicate} or {@code materialized} is {@code null}
     */
    @Override
    public KTable<K, V> filter(Predicate<? super K, ? super V> predicate, Named named, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> internal = new MaterializedInternal<>(materialized);
        return doFilter(predicate, named, internal, false);
    }

    /**
     * Filters this table with an unnamed filter node and the given materialization settings.
     * Argument validation happens in the three-argument overload this delegates to.
     *
     * @param predicate    the filter predicate
     * @param materialized materialization settings
     * @return the filtered table
     */
    @Override
    public KTable<K, V> filter(Predicate<? super K, ? super V> predicate, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        final NamedInternal unnamed = NamedInternal.empty();
        return filter(predicate, unnamed, materialized);
    }

    /**
     * Inverse-filters this table with an unnamed, non-materialized filter node.
     *
     * @param predicate the predicate; must not be {@code null}
     * @return the filtered table
     * @throws NullPointerException if {@code predicate} is {@code null}
     */
    @Override
    public KTable<K, V> filterNot(Predicate<? super K, ? super V> predicate) {
        return doFilter(Objects.requireNonNull(predicate, "predicate can't be null"), NamedInternal.empty(), null, true);
    }

    /**
     * Inverse-filters this table with a non-materialized filter node using the given naming.
     *
     * @param predicate the predicate; must not be {@code null}
     * @param named     processor naming passed on to {@code doFilter}
     * @return the filtered table
     * @throws NullPointerException if {@code predicate} is {@code null}
     */
    @Override
    public KTable<K, V> filterNot(Predicate<? super K, ? super V> predicate, Named named) {
        return doFilter(Objects.requireNonNull(predicate, "predicate can't be null"), named, null, true);
    }

    /**
     * Inverse-filters this table with an unnamed filter node and the given materialization settings.
     * Argument validation happens in the three-argument overload this delegates to.
     *
     * @param predicate    the predicate
     * @param materialized materialization settings
     * @return the filtered table
     */
    @Override
    public KTable<K, V> filterNot(Predicate<? super K, ? super V> predicate, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        final NamedInternal unnamed = NamedInternal.empty();
        return filterNot(predicate, unnamed, materialized);
    }

    /**
     * Inverse-filters this table using the given naming and materialization settings.
     *
     * @param predicate    the predicate; must not be {@code null}
     * @param named        processor naming; wrapped in a {@link NamedInternal} before being passed on
     * @param materialized materialization settings; must not be {@code null}
     * @return the filtered table
     * @throws NullPointerException if {@code predicate} or {@code materialized} is {@code null}
     */
    @Override
    public KTable<K, V> filterNot(Predicate<? super K, ? super V> predicate, Named named, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        final NamedInternal namedInternal = new NamedInternal(named);
        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> internal = new MaterializedInternal<>(materialized);
        return doFilter(predicate, namedInternal, internal, true);
    }

    /**
     * Shared implementation behind every {@code mapValues} overload: adds a {@link KTableMapValues}
     * processor node below this table's graph node and wraps it in a new table.
     *
     * @param valueMapperWithKey   mapper handed to the {@link KTableMapValues}
     * @param named                processor naming; resolved through {@code NamedInternal#orElseGenerateWithPrefix}
     *                             with the {@code KTABLE-MAPVALUES-} prefix
     * @param materializedInternal materialization settings, or {@code null} when the caller supplied none
     * @param <VR>                 value type of the resulting table
     * @return the table backed by the newly added map-values node
     */
    private <VR> KTable<K, VR> doMapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> valueMapperWithKey, Named named, MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal) {
        final Serde<K> resultKeySerde;
        final Serde<VR> resultValueSerde;
        final String queryableName;
        // The store holds mapped values, so it is typed on VR (the result value type), not on V.
        final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder;
        if (materializedInternal != null) {
            // The generated name is deliberately discarded.
            // NOTE(review): presumably called only to advance the builder's name index before the
            // processor name is generated below — confirm against InternalStreamsBuilder.
            if (materializedInternal.storeName() == null) {
                this.builder.newStoreName(MAPVALUES_NAME);
            }
            resultKeySerde = materializedInternal.keySerde() != null ? materializedInternal.keySerde() : this.keySerde;
            // Unlike the key serde there is no fallback: the value type may differ from this table's.
            resultValueSerde = materializedInternal.valueSerde();
            queryableName = materializedInternal.queryableStoreName();
            // A store is only built when the materialization is queryable.
            storeBuilder = queryableName != null ? new TimestampedKeyValueStoreMaterializer(materializedInternal).materialize() : null;
        } else {
            resultKeySerde = this.keySerde;
            resultValueSerde = null;
            queryableName = null;
            storeBuilder = null;
        }
        final String processorName = new NamedInternal(named).orElseGenerateWithPrefix(this.builder, MAPVALUES_NAME);
        final KTableMapValues mapValuesSupplier = new KTableMapValues(this, valueMapperWithKey, queryableName);
        final TableProcessorNode mapValuesNode = new TableProcessorNode(
            processorName,
            unsafeCastProcessorParametersToCompletelyDifferentType(new ProcessorParameters<>(mapValuesSupplier, processorName)),
            storeBuilder);
        this.builder.addGraphNode(this.streamsGraphNode, mapValuesNode);
        return new KTableImpl(processorName, resultKeySerde, resultValueSerde, this.sourceNodes, queryableName, mapValuesSupplier, mapValuesNode, this.builder);
    }

    /**
     * Maps the values of this table with an unnamed, non-materialized node.
     *
     * @param valueMapper the value mapper; must not be {@code null}
     * @param <VR>        value type of the resulting table
     * @return the mapped table
     * @throws NullPointerException if {@code valueMapper} is {@code null}
     */
    @Override
    public <VR> KTable<K, VR> mapValues(ValueMapper<? super V, ? extends VR> valueMapper) {
        Objects.requireNonNull(valueMapper, "mapper can't be null");
        // withKey(...) adapts the key-less mapper to the key-aware form doMapValues expects.
        return this.<VR>doMapValues(withKey(valueMapper), NamedInternal.empty(), null);
    }

    /**
     * Maps the values of this table with a non-materialized node using the given naming.
     *
     * @param valueMapper the value mapper; must not be {@code null}
     * @param named       processor naming passed on to {@code doMapValues}
     * @param <VR>        value type of the resulting table
     * @return the mapped table
     * @throws NullPointerException if {@code valueMapper} is {@code null}
     */
    @Override
    public <VR> KTable<K, VR> mapValues(ValueMapper<? super V, ? extends VR> valueMapper, Named named) {
        Objects.requireNonNull(valueMapper, "mapper can't be null");
        // withKey(...) adapts the key-less mapper to the key-aware form doMapValues expects.
        return this.<VR>doMapValues(withKey(valueMapper), named, null);
    }

    /**
     * Maps the values of this table with a key-aware mapper and an unnamed, non-materialized node.
     *
     * @param valueMapperWithKey the key-aware value mapper; must not be {@code null}
     * @param <VR>               value type of the resulting table
     * @return the mapped table
     * @throws NullPointerException if {@code valueMapperWithKey} is {@code null}
     */
    @Override
    public <VR> KTable<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> valueMapperWithKey) {
        Objects.requireNonNull(valueMapperWithKey, "mapper can't be null");
        return this.<VR>doMapValues(valueMapperWithKey, NamedInternal.empty(), null);
    }

    /**
     * Maps the values of this table with a key-aware mapper and a non-materialized node using the given naming.
     *
     * @param valueMapperWithKey the key-aware value mapper; must not be {@code null}
     * @param named              processor naming passed on to {@code doMapValues}
     * @param <VR>               value type of the resulting table
     * @return the mapped table
     * @throws NullPointerException if {@code valueMapperWithKey} is {@code null}
     */
    @Override
    public <VR> KTable<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> valueMapperWithKey, Named named) {
        Objects.requireNonNull(valueMapperWithKey, "mapper can't be null");
        return this.<VR>doMapValues(valueMapperWithKey, named, null);
    }

    /**
     * Maps the values of this table with an unnamed node and the given materialization settings.
     * Argument validation happens in the three-argument overload this delegates to.
     *
     * @param valueMapper  the value mapper
     * @param materialized materialization settings
     * @param <VR>         value type of the resulting table
     * @return the mapped table
     */
    @Override
    public <VR> KTable<K, VR> mapValues(ValueMapper<? super V, ? extends VR> valueMapper, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        final NamedInternal unnamed = NamedInternal.empty();
        return mapValues(valueMapper, unnamed, materialized);
    }

    /**
     * Maps the values of this table using the given naming and materialization settings.
     *
     * @param valueMapper  the value mapper; must not be {@code null}
     * @param named        processor naming passed on to {@code doMapValues}
     * @param materialized materialization settings; must not be {@code null}
     * @param <VR>         value type of the resulting table
     * @return the mapped table
     * @throws NullPointerException if {@code valueMapper} or {@code materialized} is {@code null}
     */
    @Override
    public <VR> KTable<K, VR> mapValues(ValueMapper<? super V, ? extends VR> valueMapper, Named named, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(valueMapper, "mapper can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        // withKey(...) adapts the key-less mapper to the key-aware form doMapValues expects.
        return this.<VR>doMapValues(withKey(valueMapper), named, new MaterializedInternal<>(materialized));
    }

    /**
     * Maps the values of this table with a key-aware mapper, an unnamed node and the given
     * materialization settings. Argument validation happens in the three-argument overload
     * this delegates to.
     *
     * @param valueMapperWithKey the key-aware value mapper
     * @param materialized       materialization settings
     * @param <VR>               value type of the resulting table
     * @return the mapped table
     */
    @Override
    public <VR> KTable<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> valueMapperWithKey, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        final NamedInternal unnamed = NamedInternal.empty();
        return mapValues(valueMapperWithKey, unnamed, materialized);
    }

    /**
     * Maps the values of this table with a key-aware mapper using the given naming and
     * materialization settings.
     *
     * @param valueMapperWithKey the key-aware value mapper; must not be {@code null}
     * @param named              processor naming passed on to {@code doMapValues}
     * @param materialized       materialization settings; must not be {@code null}
     * @param <VR>               value type of the resulting table
     * @return the mapped table
     * @throws NullPointerException if {@code valueMapperWithKey} or {@code materialized} is {@code null}
     */
    @Override
    public <VR> KTable<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> valueMapperWithKey, Named named, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(valueMapperWithKey, "mapper can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> internal = new MaterializedInternal<>(materialized);
        return doMapValues(valueMapperWithKey, named, internal);
    }

    /**
     * Transforms the values of this table with an unnamed, non-materialized node.
     *
     * @param valueTransformerWithKeySupplier supplier of the value transformer
     * @param strArr                          names of the state stores passed on to {@code doTransformValues}
     * @param <VR>                            value type of the resulting table
     * @return the transformed table
     */
    @Override
    public <VR> KTable<K, VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerWithKeySupplier, String... strArr) {
        final NamedInternal unnamed = NamedInternal.empty();
        return doTransformValues(valueTransformerWithKeySupplier, null, unnamed, strArr);
    }

    /**
     * Transforms the values of this table with a non-materialized node using the given naming.
     *
     * @param valueTransformerWithKeySupplier supplier of the value transformer
     * @param named                           processor naming; must not be {@code null}
     * @param strArr                          names of the state stores passed on to {@code doTransformValues}
     * @param <VR>                            value type of the resulting table
     * @return the transformed table
     * @throws NullPointerException if {@code named} is {@code null}
     */
    @Override
    public <VR> KTable<K, VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerWithKeySupplier, Named named, String... strArr) {
        Objects.requireNonNull(named, "processorName can't be null");
        final NamedInternal namedInternal = new NamedInternal(named);
        return doTransformValues(valueTransformerWithKeySupplier, null, namedInternal, strArr);
    }

    /**
     * Transforms the values of this table with an unnamed node and the given materialization
     * settings. Argument validation happens in the named overload this delegates to.
     *
     * @param valueTransformerWithKeySupplier supplier of the value transformer
     * @param materialized                    materialization settings
     * @param strArr                          names of the state stores passed on to {@code doTransformValues}
     * @param <VR>                            value type of the resulting table
     * @return the transformed table
     */
    @Override
    public <VR> KTable<K, VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerWithKeySupplier, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized, String... strArr) {
        final NamedInternal unnamed = NamedInternal.empty();
        return transformValues(valueTransformerWithKeySupplier, materialized, unnamed, strArr);
    }

    /**
     * Transforms the values of this table using the given materialization settings and naming.
     *
     * @param valueTransformerWithKeySupplier supplier of the value transformer
     * @param materialized                    materialization settings; must not be {@code null}
     * @param named                           processor naming; must not be {@code null}
     * @param strArr                          names of the state stores passed on to {@code doTransformValues}
     * @param <VR>                            value type of the resulting table
     * @return the transformed table
     * @throws NullPointerException if {@code materialized} or {@code named} is {@code null}
     */
    @Override
    public <VR> KTable<K, VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerWithKeySupplier, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized, Named named, String... strArr) {
        Objects.requireNonNull(materialized, "materialized can't be null");
        Objects.requireNonNull(named, "named can't be null");
        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
        final NamedInternal namedInternal = new NamedInternal(named);
        return doTransformValues(valueTransformerWithKeySupplier, materializedInternal, namedInternal, strArr);
    }

    /**
     * Shared implementation behind every {@code transformValues} overload: adds a
     * {@link KTableTransformValues} processor node below this table's graph node and wraps it in a
     * new table.
     *
     * @param valueTransformerWithKeySupplier supplier handed to the {@link KTableTransformValues}
     * @param materializedInternal            materialization settings, or {@code null} when the caller supplied none
     * @param namedInternal                   processor naming; resolved with the {@code KTABLE-TRANSFORMVALUES-} prefix
     * @param strArr                          state store names handed to the {@link TableProcessorNode}; must not be {@code null}
     * @param <VR>                            value type of the resulting table
     * @return the table backed by the newly added transform-values node
     * @throws NullPointerException if {@code strArr} is {@code null}
     */
    private <VR> KTable<K, VR> doTransformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerWithKeySupplier, MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal, NamedInternal namedInternal, String... strArr) {
        Objects.requireNonNull(strArr, "stateStoreNames");
        final Serde<K> resultKeySerde;
        final Serde<VR> resultValueSerde;
        final String queryableName;
        // The store holds transformed values, so it is typed on VR (the result value type), not on V.
        final StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder;
        if (materializedInternal != null) {
            resultKeySerde = materializedInternal.keySerde() != null ? materializedInternal.keySerde() : this.keySerde;
            // Unlike the key serde there is no fallback: the value type may differ from this table's.
            resultValueSerde = materializedInternal.valueSerde();
            queryableName = materializedInternal.queryableStoreName();
            // A store is only built when the materialization is queryable.
            storeBuilder = queryableName != null ? new TimestampedKeyValueStoreMaterializer(materializedInternal).materialize() : null;
        } else {
            resultKeySerde = this.keySerde;
            resultValueSerde = null;
            queryableName = null;
            storeBuilder = null;
        }
        final String processorName = namedInternal.orElseGenerateWithPrefix(this.builder, TRANSFORMVALUES_NAME);
        final KTableTransformValues transformSupplier = new KTableTransformValues(this, valueTransformerWithKeySupplier, queryableName);
        final TableProcessorNode transformNode = new TableProcessorNode(
            processorName,
            unsafeCastProcessorParametersToCompletelyDifferentType(new ProcessorParameters<>(transformSupplier, processorName)),
            storeBuilder,
            strArr);
        this.builder.addGraphNode(this.streamsGraphNode, transformNode);
        return new KTableImpl(processorName, resultKeySerde, resultValueSerde, this.sourceNodes, queryableName, transformSupplier, transformNode, this.builder);
    }

    /**
     * Converts this table to a stream using an unnamed processor node.
     *
     * @return the resulting stream
     */
    @Override
    public KStream<K, V> toStream() {
        final NamedInternal unnamed = NamedInternal.empty();
        return toStream(unnamed);
    }

    /**
     * Converts this table to a stream by adding a {@link KStreamMapValues} node that forwards the
     * {@code newValue} of every change record.
     *
     * @param named processor naming; must not be {@code null}
     * @return the resulting stream
     * @throws NullPointerException if {@code named} is {@code null}
     */
    @Override
    public KStream<K, V> toStream(Named named) {
        Objects.requireNonNull(named, "named can't be null");
        final String processorName = new NamedInternal(named).orElseGenerateWithPrefix(this.builder, TOSTREAM_NAME);
        final ProcessorGraphNode toStreamNode = new ProcessorGraphNode(
            processorName,
            unsafeCastProcessorParametersToCompletelyDifferentType(
                new ProcessorParameters<>(new KStreamMapValues((key, change) -> change.newValue), processorName)));
        this.builder.addGraphNode(this.streamsGraphNode, toStreamNode);
        return new KStreamImpl(processorName, this.keySerde, this.valSerde, this.sourceNodes, false, toStreamNode, this.builder);
    }

    /**
     * Converts this table to a stream and re-keys it with the given mapper.
     *
     * @param keyValueMapper mapper passed to {@code selectKey} on the converted stream
     * @param <K1>           key type of the resulting stream
     * @return the re-keyed stream
     */
    @Override
    public <K1> KStream<K1, V> toStream(KeyValueMapper<? super K, ? super V, ? extends K1> keyValueMapper) {
        final KStream<K, V> stream = toStream();
        return (KStream<K1, V>) stream.selectKey(keyValueMapper);
    }

    /**
     * Converts this table to a stream using the given naming and re-keys it with the given mapper.
     *
     * @param keyValueMapper mapper passed to {@code selectKey} on the converted stream
     * @param named          naming passed to {@link #toStream(Named)}
     * @param <K1>           key type of the resulting stream
     * @return the re-keyed stream
     */
    @Override
    public <K1> KStream<K1, V> toStream(KeyValueMapper<? super K, ? super V, ? extends K1> keyValueMapper, Named named) {
        final KStream<K, V> stream = toStream(named);
        return (KStream<K1, V>) stream.selectKey(keyValueMapper);
    }

    /**
     * Adds a suppression node, backed by an in-memory time-ordered buffer, below this table.
     *
     * @param suppressed suppression configuration; must be a {@link NamedSuppressed}
     * @return the suppressed table (constructed without a queryable store name)
     * @throws IllegalArgumentException if {@code suppressed} is not a {@link NamedSuppressed}
     */
    @Override
    public KTable<K, V> suppress(Suppressed<? super K> suppressed) {
        if (!(suppressed instanceof NamedSuppressed)) {
            throw new IllegalArgumentException("Custom subclasses of Suppressed are not supported.");
        }

        // Processor name: the user-supplied one if present, otherwise a generated one.
        final String userName = ((NamedSuppressed) suppressed).name();
        final String processorName;
        if (userName == null) {
            processorName = this.builder.newProcessorName(SUPPRESS_NAME);
        } else {
            processorName = userName;
        }

        final SuppressedInternal<K> suppressedInternal = buildSuppress(suppressed, processorName);

        // Store name: derived from the suppression's own name if present, otherwise generated.
        final String storeName;
        if (suppressedInternal.name() != null) {
            storeName = suppressedInternal.name() + "-store";
        } else {
            storeName = this.builder.newStoreName(SUPPRESS_NAME);
        }

        final KTableSuppressProcessorSupplier suppressSupplier = new KTableSuppressProcessorSupplier(suppressedInternal, storeName, this);
        final ProcessorParameters suppressParameters = new ProcessorParameters(suppressSupplier, processorName);
        final InMemoryTimeOrderedKeyValueBuffer.Builder bufferBuilder = new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName, this.keySerde, this.valSerde);
        final StatefulProcessorNode suppressNode = new StatefulProcessorNode(processorName, suppressParameters, bufferBuilder);
        this.builder.addGraphNode(this.streamsGraphNode, suppressNode);
        return new KTableImpl(processorName, this.keySerde, this.valSerde, Collections.singleton(this.name), null, suppressSupplier, suppressNode, this.builder);
    }

    /**
     * Resolves the user-facing suppression configuration into its internal representation.
     * For a {@link FinalResultsSuppressionBuilder} the suppress duration is the window grace period
     * found via {@link GraphGraceSearchUtil#findAndVerifyWindowGrace} on this table's graph node.
     *
     * @param suppressed the suppression configuration
     * @param str        node name, used only in the log message
     * @return the internal suppression configuration
     * @throws IllegalArgumentException if {@code suppressed} is neither a
     *                                  {@link FinalResultsSuppressionBuilder} nor a {@link SuppressedInternal}
     */
    private SuppressedInternal<K> buildSuppress(Suppressed<? super K> suppressed, String str) {
        if (suppressed instanceof FinalResultsSuppressionBuilder) {
            final long graceMs = GraphGraceSearchUtil.findAndVerifyWindowGrace(this.streamsGraphNode);
            final Duration grace = Duration.ofMillis(graceMs);
            LOG.info("Using grace period of [{}] as the suppress duration for node [{}].", grace, str);
            return ((FinalResultsSuppressionBuilder) suppressed).buildFinalResultsSuppression(grace);
        } else if (suppressed instanceof SuppressedInternal) {
            return (SuppressedInternal) suppressed;
        } else {
            throw new IllegalArgumentException("Custom subclasses of Suppressed are not allowed.");
        }
    }

    /**
     * Inner-joins this table with another table using an unnamed, non-materialized join.
     *
     * @param kTable      the other table
     * @param valueJoiner joiner combining the values of both tables
     * @param <V1>        value type of the other table
     * @param <R>         value type of the resulting table
     * @return the joined table
     */
    @Override
    public <V1, R> KTable<K, R> join(KTable<K, V1> kTable, ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner) {
        final NamedInternal unnamed = NamedInternal.empty();
        // Both outer flags false selects the inner join in doJoin.
        return (KTable<K, R>) doJoin(kTable, valueJoiner, unnamed, null, false, false);
    }

    /**
     * Inner-joins this table with another table using the given naming and no materialization.
     *
     * @param kTable      the other table
     * @param valueJoiner joiner combining the values of both tables
     * @param named       naming passed on to {@code doJoin}
     * @param <V1>        value type of the other table
     * @param <R>         value type of the resulting table
     * @return the joined table
     */
    @Override
    public <V1, R> KTable<K, R> join(KTable<K, V1> kTable, ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner, Named named) {
        // Both outer flags false selects the inner join in doJoin.
        final KTable<K, R> joined = (KTable<K, R>) doJoin(kTable, valueJoiner, named, null, false, false);
        return joined;
    }

    /**
     * Inner-joins this table with another table using an unnamed join and the given
     * materialization settings. Validation of {@code materialized} happens in the named overload.
     *
     * @param kTable       the other table
     * @param valueJoiner  joiner combining the values of both tables
     * @param materialized materialization settings
     * @param <VO>         value type of the other table
     * @param <VR>         value type of the resulting table
     * @return the joined table
     */
    @Override
    public <VO, VR> KTable<K, VR> join(KTable<K, VO> kTable, ValueJoiner<? super V, ? super VO, ? extends VR> valueJoiner, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        final NamedInternal unnamed = NamedInternal.empty();
        return join(kTable, valueJoiner, unnamed, materialized);
    }

    /**
     * Inner-joins this table with another table using the given naming and materialization settings.
     *
     * @param kTable       the other table
     * @param valueJoiner  joiner combining the values of both tables
     * @param named        naming passed on to {@code doJoin}
     * @param materialized materialization settings; must not be {@code null}
     * @param <VO>         value type of the other table
     * @param <VR>         value type of the resulting table
     * @return the joined table
     * @throws NullPointerException if {@code materialized} is {@code null}
     */
    @Override
    public <VO, VR> KTable<K, VR> join(KTable<K, VO> kTable, ValueJoiner<? super V, ? super VO, ? extends VR> valueJoiner, Named named, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(materialized, "materialized can't be null");
        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
            new MaterializedInternal<>(materialized, this.builder, MERGE_NAME);
        // Both outer flags false selects the inner join in doJoin.
        return doJoin(kTable, valueJoiner, named, materializedInternal, false, false);
    }

    /**
     * Outer-joins this table with another table using an unnamed, non-materialized join.
     *
     * @param kTable      the other table
     * @param valueJoiner joiner combining the values of both tables
     * @param <V1>        value type of the other table
     * @param <R>         value type of the resulting table
     * @return the joined table
     */
    @Override
    public <V1, R> KTable<K, R> outerJoin(KTable<K, V1> kTable, ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner) {
        final NamedInternal unnamed = NamedInternal.empty();
        return outerJoin(kTable, valueJoiner, unnamed);
    }

    /**
     * Outer-joins this table with another table using the given naming and no materialization.
     *
     * @param kTable      the other table
     * @param valueJoiner joiner combining the values of both tables
     * @param named       naming passed on to {@code doJoin}
     * @param <V1>        value type of the other table
     * @param <R>         value type of the resulting table
     * @return the joined table
     */
    @Override
    public <V1, R> KTable<K, R> outerJoin(KTable<K, V1> kTable, ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner, Named named) {
        // Both outer flags true selects the outer join in doJoin.
        final KTable<K, R> joined = (KTable<K, R>) doJoin(kTable, valueJoiner, named, null, true, true);
        return joined;
    }

    /**
     * Outer-joins this table with another table using an unnamed join and the given
     * materialization settings. Validation of {@code materialized} happens in the named overload.
     *
     * @param kTable       the other table
     * @param valueJoiner  joiner combining the values of both tables
     * @param materialized materialization settings
     * @param <VO>         value type of the other table
     * @param <VR>         value type of the resulting table
     * @return the joined table
     */
    @Override
    public <VO, VR> KTable<K, VR> outerJoin(KTable<K, VO> kTable, ValueJoiner<? super V, ? super VO, ? extends VR> valueJoiner, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        final NamedInternal unnamed = NamedInternal.empty();
        return outerJoin(kTable, valueJoiner, unnamed, materialized);
    }

    /**
     * Outer-joins this table with another table using the given naming and materialization settings.
     *
     * @param kTable       the other table
     * @param valueJoiner  joiner combining the values of both tables
     * @param named        naming passed on to {@code doJoin}
     * @param materialized materialization settings; must not be {@code null}
     * @param <VO>         value type of the other table
     * @param <VR>         value type of the resulting table
     * @return the joined table
     * @throws NullPointerException if {@code materialized} is {@code null}
     */
    @Override
    public <VO, VR> KTable<K, VR> outerJoin(KTable<K, VO> kTable, ValueJoiner<? super V, ? super VO, ? extends VR> valueJoiner, Named named, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(materialized, "materialized can't be null");
        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
            new MaterializedInternal<>(materialized, this.builder, MERGE_NAME);
        // Both outer flags true selects the outer join in doJoin.
        return doJoin(kTable, valueJoiner, named, materializedInternal, true, true);
    }

    /**
     * Left-joins this table with another table using an unnamed, non-materialized join.
     *
     * @param kTable      the other table
     * @param valueJoiner joiner combining the values of both tables
     * @param <V1>        value type of the other table
     * @param <R>         value type of the resulting table
     * @return the joined table
     */
    @Override
    public <V1, R> KTable<K, R> leftJoin(KTable<K, V1> kTable, ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner) {
        final NamedInternal unnamed = NamedInternal.empty();
        return leftJoin(kTable, valueJoiner, unnamed);
    }

    /**
     * Left-joins this table with another table using the given naming and no materialization.
     *
     * @param kTable      the other table
     * @param valueJoiner joiner combining the values of both tables
     * @param named       naming passed on to {@code doJoin}
     * @param <V1>        value type of the other table
     * @param <R>         value type of the resulting table
     * @return the joined table
     */
    @Override
    public <V1, R> KTable<K, R> leftJoin(KTable<K, V1> kTable, ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner, Named named) {
        // Only the first outer flag set selects the left join in doJoin.
        final KTable<K, R> joined = (KTable<K, R>) doJoin(kTable, valueJoiner, named, null, true, false);
        return joined;
    }

    /**
     * Left-joins this table with another table using an unnamed join and the given
     * materialization settings. Validation of {@code materialized} happens in the named overload.
     *
     * @param kTable       the other table
     * @param valueJoiner  joiner combining the values of both tables
     * @param materialized materialization settings
     * @param <VO>         value type of the other table
     * @param <VR>         value type of the resulting table
     * @return the joined table
     */
    @Override
    public <VO, VR> KTable<K, VR> leftJoin(KTable<K, VO> kTable, ValueJoiner<? super V, ? super VO, ? extends VR> valueJoiner, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        final NamedInternal unnamed = NamedInternal.empty();
        return leftJoin(kTable, valueJoiner, unnamed, materialized);
    }

    /**
     * Left-joins this table with another table using the given naming and materialization settings.
     *
     * @param kTable       the other table
     * @param valueJoiner  joiner combining the values of both tables
     * @param named        naming passed on to {@code doJoin}
     * @param materialized materialization settings; must not be {@code null}
     * @param <VO>         value type of the other table
     * @param <VR>         value type of the resulting table
     * @return the joined table
     * @throws NullPointerException if {@code materialized} is {@code null}
     */
    @Override
    public <VO, VR> KTable<K, VR> leftJoin(KTable<K, VO> kTable, ValueJoiner<? super V, ? super VO, ? extends VR> valueJoiner, Named named, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(materialized, "materialized can't be null");
        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
            new MaterializedInternal<>(materialized, this.builder, MERGE_NAME);
        // Only the first outer flag set selects the left join in doJoin.
        return doJoin(kTable, valueJoiner, named, materializedInternal, true, false);
    }

    /**
     * Shared implementation behind every primary-key {@code join}/{@code leftJoin}/{@code outerJoin}
     * overload. Builds the two directional join processors, assembles a {@link KTableKTableJoinNode}
     * and wraps it in a new table.
     *
     * @param kTable               the other table; must not be {@code null}
     * @param valueJoiner          joiner combining the values of both tables; must not be {@code null}
     * @param named                naming for the merge and join processors; must not be {@code null}
     * @param materializedInternal materialization settings, or {@code null} when the caller supplied none
     * @param z                    {@code true} for left and outer joins; also enables old-value sending on this table
     * @param z2                   {@code true} for outer joins; also enables old-value sending on the other table
     * @param <VO>                 value type of the other table
     * @param <VR>                 value type of the resulting table
     * @return the table backed by the newly added join node
     * @throws NullPointerException if {@code kTable}, {@code valueJoiner} or {@code named} is {@code null}
     */
    private <VO, VR> KTable<K, VR> doJoin(KTable<K, VO> kTable, ValueJoiner<? super V, ? super VO, ? extends VR> valueJoiner, Named named, MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal, boolean z, boolean z2) {
        Objects.requireNonNull(kTable, "other can't be null");
        Objects.requireNonNull(valueJoiner, "joiner can't be null");
        Objects.requireNonNull(named, "joinName can't be null");

        // The merge name is resolved first; the order of name generation below is kept as-is.
        final NamedInternal joinName = new NamedInternal(named);
        final String mergeProcessorName = joinName.orElseGenerateWithPrefix(this.builder, MERGE_NAME);
        final Set<String> joinedSourceNodes = ensureJoinableWith((AbstractStream) kTable);

        if (z) {
            enableSendingOldValues();
        }
        final KTableImpl otherTable = (KTableImpl) kTable;
        if (z2) {
            otherTable.enableSendingOldValues();
        }

        // Pick the processor pair: outer (both flags), left/right (first flag only), inner otherwise.
        final KTableKTableAbstractJoin joinThis;
        final KTableKTableAbstractJoin joinOther;
        if (z && z2) {
            joinThis = new KTableKTableOuterJoin(this, otherTable, valueJoiner);
            joinOther = new KTableKTableOuterJoin(otherTable, this, reverseJoiner(valueJoiner));
        } else if (z) {
            joinThis = new KTableKTableLeftJoin(this, otherTable, valueJoiner);
            joinOther = new KTableKTableRightJoin(otherTable, this, reverseJoiner(valueJoiner));
        } else {
            joinThis = new KTableKTableInnerJoin(this, otherTable, valueJoiner);
            joinOther = new KTableKTableInnerJoin(otherTable, this, reverseJoiner(valueJoiner));
        }

        final String joinThisName = joinName.suffixWithOrElseGet("-join-this", this.builder, JOINTHIS_NAME);
        final String joinOtherName = joinName.suffixWithOrElseGet("-join-other", this.builder, JOINOTHER_NAME);
        final ProcessorParameters joinThisParameters = new ProcessorParameters(joinThis, joinThisName);
        final ProcessorParameters joinOtherParameters = new ProcessorParameters(joinOther, joinOtherName);

        // Defaults for the non-materialized case: inherit the key serde, no value serde, no store.
        Serde<K> resultKeySerde = this.keySerde;
        Serde<VR> resultValueSerde = null;
        String queryableName = null;
        StoreBuilder<TimestampedKeyValueStore<K, VR>> storeBuilder = null;
        if (materializedInternal != null) {
            // Fall back to this table's key serde before the store is materialized.
            if (materializedInternal.keySerde() == null) {
                materializedInternal.withKeySerde(this.keySerde);
            }
            resultKeySerde = materializedInternal.keySerde();
            resultValueSerde = materializedInternal.valueSerde();
            queryableName = materializedInternal.storeName();
            storeBuilder = new TimestampedKeyValueStoreMaterializer(materializedInternal).materialize();
        }

        final KTableKTableJoinNode joinNode = KTableKTableJoinNode.kTableKTableJoinNodeBuilder()
            .withNodeName(mergeProcessorName)
            .withJoinThisProcessorParameters(joinThisParameters)
            .withJoinOtherProcessorParameters(joinOtherParameters)
            .withThisJoinSideNodeName(this.name)
            .withOtherJoinSideNodeName(otherTable.name)
            .withJoinThisStoreNames(valueGetterSupplier().storeNames())
            .withJoinOtherStoreNames(otherTable.valueGetterSupplier().storeNames())
            .withKeySerde(resultKeySerde)
            .withValueSerde(resultValueSerde)
            .withQueryableStoreName(queryableName)
            .withStoreBuilder(storeBuilder)
            .build();
        this.builder.addGraphNode(this.streamsGraphNode, joinNode);
        return new KTableImpl(joinNode.nodeName(), joinNode.keySerde(), joinNode.valueSerde(), joinedSourceNodes, joinNode.queryableStoreName(), joinNode.joinMerger(), joinNode, this.builder);
    }

    /**
     * Re-groups this table with the given selector, passing {@code null} for both serdes of the
     * {@link Grouped} configuration.
     *
     * @param keyValueMapper selector producing the new key/value pair
     * @param <K1>           key type of the grouped table
     * @param <V1>           value type of the grouped table
     * @return the grouped table
     */
    @Override
    public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> keyValueMapper) {
        final Grouped<K1, V1> defaultGrouped = Grouped.with(null, null);
        return groupBy(keyValueMapper, defaultGrouped);
    }

    /**
     * Re-groups this table with the given selector, taking the serdes from a {@link Serialized}
     * and delegating to the {@link Grouped}-based overload.
     *
     * @param keyValueMapper selector producing the new key/value pair; must not be {@code null}
     * @param serialized     serde configuration; must not be {@code null}
     * @param <K1>           key type of the grouped table
     * @param <V1>           value type of the grouped table
     * @return the grouped table
     * @throws NullPointerException if {@code keyValueMapper} or {@code serialized} is {@code null}
     */
    @Override
    @Deprecated
    public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> keyValueMapper, Serialized<K1, V1> serialized) {
        Objects.requireNonNull(keyValueMapper, "selector can't be null");
        Objects.requireNonNull(serialized, "serialized can't be null");
        final SerializedInternal internal = new SerializedInternal(serialized);
        final Grouped<K1, V1> grouped = Grouped.with(internal.keySerde(), internal.valueSerde());
        return groupBy(keyValueMapper, grouped);
    }

    /**
     * Re-groups this table: adds a {@link KTableRepartitionMap} node below this table, enables
     * old-value sending on this table and returns the grouped view.
     *
     * @param keyValueMapper selector producing the new key/value pair; must not be {@code null}
     * @param grouped        grouping configuration; must not be {@code null}
     * @param <K1>           key type of the grouped table
     * @param <V1>           value type of the grouped table
     * @return the grouped table
     * @throws NullPointerException if {@code keyValueMapper} or {@code grouped} is {@code null}
     */
    @Override
    public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> keyValueMapper, Grouped<K1, V1> grouped) {
        Objects.requireNonNull(keyValueMapper, "selector can't be null");
        Objects.requireNonNull(grouped, "grouped can't be null");

        final GroupedInternal groupedInternal = new GroupedInternal(grouped);
        final NamedInternal selectName = new NamedInternal(groupedInternal.name());
        final String selectProcessorName = selectName.orElseGenerateWithPrefix(this.builder, SELECT_NAME);

        final KTableRepartitionMap repartitionMap = new KTableRepartitionMap(this, keyValueMapper);
        final ProcessorGraphNode repartitionNode = new ProcessorGraphNode(selectProcessorName, new ProcessorParameters(repartitionMap, selectProcessorName));
        this.builder.addGraphNode(this.streamsGraphNode, repartitionNode);

        // Old-value sending is switched on only after the node has been added to the graph.
        enableSendingOldValues();
        return new KGroupedTableImpl(this.builder, selectProcessorName, this.sourceNodes, groupedInternal, repartitionNode);
    }

    /**
     * Returns a value-getter supplier for this table, chosen by the kind of processor supplier
     * backing it.
     *
     * <p>For a {@code KTableSource} the source is asked to materialize and a supplier over its
     * queryable name is returned; for any other supplier its {@code view()} is returned.
     *
     * @return the value-getter supplier for this table
     */
    public KTableValueGetterSupplier<K, V> valueGetterSupplier() {
        if (this.processorSupplier instanceof KTableSource) {
            KTableSource tableSource = (KTableSource) this.processorSupplier;
            // Materialize first, then read the queryable name used by the getter.
            tableSource.materialize();
            return new KTableSourceValueGetterSupplier(tableSource.queryableName());
        }
        if (this.processorSupplier instanceof KStreamAggProcessorSupplier) {
            return ((KStreamAggProcessorSupplier) this.processorSupplier).view();
        }
        return ((KTableProcessorSupplier) this.processorSupplier).view();
    }

    /**
     * Turns on forwarding of old values for this table.
     *
     * <p>Does nothing if the flag is already set. Otherwise the request is forwarded to the
     * backing processor supplier (cast to whichever of the three supported supplier types it
     * is) and the flag is recorded on this instance.
     */
    public void enableSendingOldValues() {
        if (!this.sendOldValues) {
            if (this.processorSupplier instanceof KTableSource) {
                ((KTableSource) this.processorSupplier).enableSendingOldValues();
            } else if (this.processorSupplier instanceof KStreamAggProcessorSupplier) {
                ((KStreamAggProcessorSupplier) this.processorSupplier).enableSendingOldValues();
            } else {
                ((KTableProcessorSupplier) this.processorSupplier).enableSendingOldValues();
            }
            // Remember the setting so later calls return immediately.
            this.sendOldValues = true;
        }
    }

    /**
     * Reports whether old-value forwarding has been enabled on this table.
     *
     * @return the current value of the {@code sendOldValues} flag
     */
    boolean sendingOldValueEnabled() {
        return sendOldValues;
    }

    /**
     * Re-types the given processor parameters from {@code Change<V>} values to an arbitrary
     * value type {@code VR}.
     *
     * <p>No conversion takes place: the same instance is returned under a different static
     * type. The cast is unchecked, so the caller is responsible for ensuring the parameters are
     * only used where the re-typing is valid.
     *
     * @param processorParameters the parameters to re-type
     * @param <VR>                the value type the caller wants to see
     * @return {@code processorParameters} itself, viewed as {@code ProcessorParameters<K, VR>}
     */
    @SuppressWarnings("unchecked") // deliberate re-typing; erased at run time, so no check is possible
    private <VR> ProcessorParameters<K, VR> unsafeCastProcessorParametersToCompletelyDifferentType(ProcessorParameters<K, Change<V>> processorParameters) {
        // An explicit cast is required: without it the return statement is an
        // incompatible-types error, since Change<V> and VR are unrelated type arguments.
        return (ProcessorParameters<K, VR>) processorParameters;
    }

    /**
     * Foreign-key join with no explicit name and a default {@link Materialized}.
     *
     * @param kTable      the table to join against
     * @param function    extracts the other table's key from this table's value
     * @param valueJoiner combines the two values into the result value
     * @return the joined table built by {@code doJoinOnForeignKey} with the left-join flag off
     */
    @Override // org.apache.kafka.streams.kstream.KTable
    public <VR, KO, VO> KTable<K, VR> join(KTable<KO, VO> kTable, Function<V, KO> function, ValueJoiner<V, VO, VR> valueJoiner) {
        Named noName = NamedInternal.empty();
        Materialized<K, VR, KeyValueStore<Bytes, byte[]>> defaultMaterialized = Materialized.with(null, null);
        return doJoinOnForeignKey(kTable, function, valueJoiner, noName, defaultMaterialized, false);
    }

    /**
     * Foreign-key join with a caller-supplied name and a default {@link Materialized}.
     *
     * @param kTable      the table to join against
     * @param function    extracts the other table's key from this table's value
     * @param valueJoiner combines the two values into the result value
     * @param named       naming configuration for the join
     * @return the joined table built by {@code doJoinOnForeignKey} with the left-join flag off
     */
    @Override // org.apache.kafka.streams.kstream.KTable
    public <VR, KO, VO> KTable<K, VR> join(KTable<KO, VO> kTable, Function<V, KO> function, ValueJoiner<V, VO, VR> valueJoiner, Named named) {
        Materialized<K, VR, KeyValueStore<Bytes, byte[]>> defaultMaterialized = Materialized.with(null, null);
        return doJoinOnForeignKey(kTable, function, valueJoiner, named, defaultMaterialized, false);
    }

    /**
     * Foreign-key join with no explicit name and a caller-supplied {@link Materialized}.
     *
     * @param kTable       the table to join against
     * @param function     extracts the other table's key from this table's value
     * @param valueJoiner  combines the two values into the result value
     * @param materialized materialization settings for the result table
     * @return the joined table built by {@code doJoinOnForeignKey} with the left-join flag off
     */
    @Override // org.apache.kafka.streams.kstream.KTable
    public <VR, KO, VO> KTable<K, VR> join(KTable<KO, VO> kTable, Function<V, KO> function, ValueJoiner<V, VO, VR> valueJoiner, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        Named noName = NamedInternal.empty();
        return doJoinOnForeignKey(kTable, function, valueJoiner, noName, materialized, false);
    }

    /**
     * Foreign-key join with caller-supplied name and {@link Materialized}.
     *
     * @param kTable       the table to join against
     * @param function     extracts the other table's key from this table's value
     * @param valueJoiner  combines the two values into the result value
     * @param named        naming configuration for the join
     * @param materialized materialization settings for the result table
     * @return the joined table built by {@code doJoinOnForeignKey} with the left-join flag off
     */
    @Override // org.apache.kafka.streams.kstream.KTable
    public <VR, KO, VO> KTable<K, VR> join(KTable<KO, VO> kTable, Function<V, KO> function, ValueJoiner<V, VO, VR> valueJoiner, Named named, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        final boolean leftJoin = false;
        return doJoinOnForeignKey(kTable, function, valueJoiner, named, materialized, leftJoin);
    }

    /**
     * Foreign-key left join with no explicit name and a default {@link Materialized}.
     *
     * @param kTable      the table to join against
     * @param function    extracts the other table's key from this table's value
     * @param valueJoiner combines the two values into the result value
     * @return the joined table built by {@code doJoinOnForeignKey} with the left-join flag on
     */
    @Override // org.apache.kafka.streams.kstream.KTable
    public <VR, KO, VO> KTable<K, VR> leftJoin(KTable<KO, VO> kTable, Function<V, KO> function, ValueJoiner<V, VO, VR> valueJoiner) {
        Named noName = NamedInternal.empty();
        Materialized<K, VR, KeyValueStore<Bytes, byte[]>> defaultMaterialized = Materialized.with(null, null);
        return doJoinOnForeignKey(kTable, function, valueJoiner, noName, defaultMaterialized, true);
    }

    /**
     * Foreign-key left join with a caller-supplied name and a default {@link Materialized}.
     *
     * @param kTable      the table to join against
     * @param function    extracts the other table's key from this table's value
     * @param valueJoiner combines the two values into the result value
     * @param named       naming configuration for the join
     * @return the joined table built by {@code doJoinOnForeignKey} with the left-join flag on
     */
    @Override // org.apache.kafka.streams.kstream.KTable
    public <VR, KO, VO> KTable<K, VR> leftJoin(KTable<KO, VO> kTable, Function<V, KO> function, ValueJoiner<V, VO, VR> valueJoiner, Named named) {
        Materialized<K, VR, KeyValueStore<Bytes, byte[]>> defaultMaterialized = Materialized.with(null, null);
        return doJoinOnForeignKey(kTable, function, valueJoiner, named, defaultMaterialized, true);
    }

    /**
     * Foreign-key left join with caller-supplied name and {@link Materialized}.
     *
     * @param kTable       the table to join against
     * @param function     extracts the other table's key from this table's value
     * @param valueJoiner  combines the two values into the result value
     * @param named        naming configuration for the join
     * @param materialized materialization settings for the result table
     * @return the joined table built by {@code doJoinOnForeignKey} with the left-join flag on
     */
    @Override // org.apache.kafka.streams.kstream.KTable
    public <VR, KO, VO> KTable<K, VR> leftJoin(KTable<KO, VO> kTable, Function<V, KO> function, ValueJoiner<V, VO, VR> valueJoiner, Named named, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        final boolean leftJoin = true;
        return doJoinOnForeignKey(kTable, function, valueJoiner, named, materialized, leftJoin);
    }

    /**
     * Foreign-key left join with no explicit name and a caller-supplied {@link Materialized}.
     *
     * @param kTable       the table to join against
     * @param function     extracts the other table's key from this table's value
     * @param valueJoiner  combines the two values into the result value
     * @param materialized materialization settings for the result table
     * @return the joined table built by {@code doJoinOnForeignKey} with the left-join flag on
     */
    @Override // org.apache.kafka.streams.kstream.KTable
    public <VR, KO, VO> KTable<K, VR> leftJoin(KTable<KO, VO> kTable, Function<V, KO> function, ValueJoiner<V, VO, VR> valueJoiner, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        Named noName = NamedInternal.empty();
        return doJoinOnForeignKey(kTable, function, valueJoiner, noName, materialized, true);
    }

    /**
     * Shared implementation behind every foreign-key {@code join}/{@code leftJoin} overload.
     *
     * <p>Wires the following graph through {@code this.builder}:
     * <ol>
     *   <li>a "subscription registration" processor under this table, feeding a sink/source
     *       pair over an internal subscription topic;</li>
     *   <li>a subscription store plus two processors on the other table's side: one chained
     *       after the subscription source, one attached to the other table's own graph node;</li>
     *   <li>a sink/source pair over an internal response topic fed by both of those
     *       processors;</li>
     *   <li>a resolver processor and finally a table node that holds the join result.</li>
     * </ol>
     *
     * <p>NOTE(review): the node, topic and store names are produced by successive
     * {@code suffixWithOrElseGet} calls against the shared builder; generated names presumably
     * depend on call order, so do not reorder statements without confirming.
     *
     * @param kTable       the other (foreign-key) table; cast to {@code KTableImpl} below
     * @param function     extracts the other table's key from this table's value
     * @param valueJoiner  combines the two values into the result value
     * @param named        naming configuration; used as the base for all generated names
     * @param materialized materialization settings for the result table
     * @param z            left-join flag: the {@code leftJoin} overloads pass {@code true},
     *                     the {@code join} overloads pass {@code false}
     * @return a new {@code KTableImpl} rooted at the result table node
     * @throws NullPointerException if any reference argument is {@code null}
     * @throws ClassCastException   if {@code kTable} is not a {@code KTableImpl}
     */
    private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(KTable<KO, VO> kTable, Function<V, KO> function, ValueJoiner<V, VO, VR> valueJoiner, Named named, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized, boolean z) {
        Objects.requireNonNull(kTable, "foreignKeyTable can't be null");
        Objects.requireNonNull(function, "foreignKeyExtractor can't be null");
        Objects.requireNonNull(valueJoiner, "joiner can't be null");
        Objects.requireNonNull(named, "joinName can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        // Both sides of the join are switched to forwarding old values.
        // NOTE(review): presumably the downstream processors compare old and new values - confirm.
        ((KTableImpl) kTable).enableSendingOldValues();
        enableSendingOldValues();
        NamedInternal namedInternal = new NamedInternal(named);
        // Name of the internal subscription topic.
        String str = namedInternal.suffixWithOrElseGet("-subscription-registration", this.builder, SUBSCRIPTION_REGISTRATION) + TOPIC_SUFFIX;
        // Three pseudo-topic names derived from the subscription topic ("-pk", "-fk", "-vh").
        // They are wrapped in suppliers, so decoratePseudoTopic only runs when a consumer calls get().
        Supplier supplier = () -> {
            return internalTopologyBuilder().decoratePseudoTopic(str + "-pk");
        };
        Supplier supplier2 = () -> {
            return internalTopologyBuilder().decoratePseudoTopic(str + "-fk");
        };
        Supplier supplier3 = () -> {
            return internalTopologyBuilder().decoratePseudoTopic(str + "-vh");
        };
        this.builder.internalTopologyBuilder.addInternalTopic(str);
        // Key serde of the OTHER table.
        // NOTE(review): declared as Serde<K> although kTable is KTable<KO, VO>; looks like a decompiler artifact.
        Serde<K> serde = ((KTableImpl) kTable).keySerde;
        // Subscription values use this table's key serde together with the "-pk" pseudo topic.
        SubscriptionWrapperSerde subscriptionWrapperSerde = new SubscriptionWrapperSerde(supplier, this.keySerde);
        // Responses are serialized with the other table's value serde.
        SubscriptionResponseWrapperSerde subscriptionResponseWrapperSerde = new SubscriptionResponseWrapperSerde(((KTableImpl) kTable).valSerde);
        // Combined key schema: ("-fk" topic, other table's key serde) + ("-pk" topic, this table's key serde).
        CombinedKeySchema combinedKeySchema = new CombinedKeySchema(supplier2, serde, supplier, this.keySerde);
        // Step 1: subscription-send processor under this table; the value serializer is null when no value serde is set.
        ProcessorGraphNode processorGraphNode = new ProcessorGraphNode(new ProcessorParameters(new ForeignJoinSubscriptionSendProcessorSupplier(function, supplier2, supplier3, serde, this.valSerde == null ? null : this.valSerde.serializer(), z), namedInternal.suffixWithOrElseGet("-subscription-registration-processor", this.builder, SUBSCRIPTION_REGISTRATION)));
        this.builder.addGraphNode(this.streamsGraphNode, processorGraphNode);
        // Sink that writes subscriptions to the subscription topic, keyed with the other table's key serde.
        StreamSinkNode streamSinkNode = new StreamSinkNode(namedInternal.suffixWithOrElseGet("-subscription-registration-sink", this.builder, SINK_NAME), new StaticTopicNameExtractor(str), new ProducedInternal(Produced.with(serde, subscriptionWrapperSerde)));
        this.builder.addGraphNode(processorGraphNode, streamSinkNode);
        // Source that reads the same topic back, using the same serdes.
        StreamSourceNode streamSourceNode = new StreamSourceNode(namedInternal.suffixWithOrElseGet("-subscription-registration-source", this.builder, SOURCE_NAME), Collections.singleton(str), new ConsumedInternal(Consumed.with(serde, subscriptionWrapperSerde)));
        this.builder.addGraphNode(streamSinkNode, streamSourceNode);
        // Co-partition the other table's source nodes with the subscription source.
        HashSet hashSet = new HashSet(((KTableImpl) kTable).sourceNodes);
        hashSet.add(streamSourceNode.nodeName());
        this.builder.internalTopologyBuilder.copartitionSources(hashSet);
        // Step 2: subscription store (Bytes keys, subscription-wrapper values) registered with the builder.
        StoreBuilder timestampedKeyValueStoreBuilder = Stores.timestampedKeyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(namedInternal.suffixWithOrElseGet("-subscription-store", this.builder, FK_JOIN_STATE_STORE_NAME)), new Serdes.BytesSerde(), subscriptionWrapperSerde);
        this.builder.addStateStore(timestampedKeyValueStoreBuilder);
        // Receive processor: sits under the subscription source and is given the subscription store.
        StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode(new ProcessorParameters(new SubscriptionStoreReceiveProcessorSupplier(timestampedKeyValueStoreBuilder, combinedKeySchema), namedInternal.suffixWithOrElseGet("-subscription-receive", this.builder, SUBSCRIPTION_PROCESSOR)), (Set<StoreBuilder<? extends StateStore>>) Collections.singleton(timestampedKeyValueStoreBuilder), (Set<KTableValueGetterSupplier<?, ?>>) Collections.emptySet());
        this.builder.addGraphNode(streamSourceNode, statefulProcessorNode);
        // Join-foreign processor: chained after the receive processor; uses the other table's value getter, no store.
        StatefulProcessorNode statefulProcessorNode2 = new StatefulProcessorNode(new ProcessorParameters(new SubscriptionJoinForeignProcessorSupplier(((KTableImpl) kTable).valueGetterSupplier()), namedInternal.suffixWithOrElseGet("-subscription-join-foreign", this.builder, SUBSCRIPTION_PROCESSOR)), (Set<StoreBuilder<? extends StateStore>>) Collections.emptySet(), (Set<KTableValueGetterSupplier<?, ?>>) Collections.singleton(((KTableImpl) kTable).valueGetterSupplier()));
        this.builder.addGraphNode(statefulProcessorNode, statefulProcessorNode2);
        // Foreign-join-subscription processor: attached to the OTHER table's graph node, sharing the subscription store.
        StatefulProcessorNode statefulProcessorNode3 = new StatefulProcessorNode(new ProcessorParameters(new ForeignJoinSubscriptionProcessorSupplier(timestampedKeyValueStoreBuilder, combinedKeySchema), namedInternal.suffixWithOrElseGet("-foreign-join-subscription", this.builder, SUBSCRIPTION_PROCESSOR)), (Set<StoreBuilder<? extends StateStore>>) Collections.singleton(timestampedKeyValueStoreBuilder), (Set<KTableValueGetterSupplier<?, ?>>) Collections.emptySet());
        this.builder.addGraphNode(((KTableImpl) kTable).streamsGraphNode, statefulProcessorNode3);
        // Step 3: internal response topic, keyed with this table's key serde.
        String str2 = namedInternal.suffixWithOrElseGet("-subscription-response", this.builder, SUBSCRIPTION_RESPONSE) + TOPIC_SUFFIX;
        this.builder.internalTopologyBuilder.addInternalTopic(str2);
        // A single response sink is fed by both processors from step 2.
        StreamSinkNode streamSinkNode2 = new StreamSinkNode(namedInternal.suffixWithOrElseGet("-subscription-response-sink", this.builder, SINK_NAME), new StaticTopicNameExtractor(str2), new ProducedInternal(Produced.with(this.keySerde, subscriptionResponseWrapperSerde)));
        this.builder.addGraphNode(statefulProcessorNode2, streamSinkNode2);
        this.builder.addGraphNode(statefulProcessorNode3, streamSinkNode2);
        StreamSourceNode streamSourceNode2 = new StreamSourceNode(namedInternal.suffixWithOrElseGet("-subscription-response-source", this.builder, SOURCE_NAME), Collections.singleton(str2), new ConsumedInternal(Consumed.with(this.keySerde, subscriptionResponseWrapperSerde)));
        this.builder.addGraphNode(streamSinkNode2, streamSourceNode2);
        // Co-partition this table's source nodes with the response source; the same set becomes the result table's sources.
        HashSet hashSet2 = new HashSet(this.sourceNodes);
        hashSet2.add(streamSourceNode2.nodeName());
        this.builder.internalTopologyBuilder.copartitionSources(hashSet2);
        // Step 4: resolver processor under the response source; it receives this table's value getter,
        // the "-vh" pseudo topic, the joiner and the left-join flag.
        KTableValueGetterSupplier<K, V> valueGetterSupplier = valueGetterSupplier();
        StatefulProcessorNode statefulProcessorNode4 = new StatefulProcessorNode(new ProcessorParameters(new SubscriptionResolverJoinProcessorSupplier(valueGetterSupplier, this.valSerde == null ? null : this.valSerde.serializer(), supplier3, valueJoiner, z), namedInternal.suffixWithOrElseGet("-subscription-response-resolver", this.builder, SUBSCRIPTION_RESPONSE_RESOLVER_PROCESSOR)), (Set<StoreBuilder<? extends StateStore>>) Collections.emptySet(), (Set<KTableValueGetterSupplier<?, ?>>) Collections.singleton(valueGetterSupplier));
        this.builder.addGraphNode(streamSourceNode2, statefulProcessorNode4);
        String suffixWithOrElseGet = namedInternal.suffixWithOrElseGet("-result", this.builder, FK_JOIN_OUTPUT_NAME);
        MaterializedInternal materializedInternal = new MaterializedInternal(materialized, this.builder, FK_JOIN_OUTPUT_NAME);
        // Fall back to this table's key serde when the caller's Materialized has none; the value serde is left as supplied.
        if (materializedInternal.keySerde() == null) {
            materializedInternal.withKeySerde(this.keySerde);
        }
        // Result table: a KTableSource over the materialized store, attached under the resolver.
        KTableSource kTableSource = new KTableSource(materializedInternal.storeName(), materializedInternal.queryableStoreName());
        TableProcessorNode tableProcessorNode = new TableProcessorNode(suffixWithOrElseGet, new ProcessorParameters(kTableSource, suffixWithOrElseGet), new TimestampedKeyValueStoreMaterializer(materializedInternal).materialize());
        this.builder.addGraphNode(statefulProcessorNode4, tableProcessorNode);
        return new KTableImpl(suffixWithOrElseGet, this.keySerde, materializedInternal.valueSerde(), hashSet2, materializedInternal.storeName(), kTableSource, tableProcessorNode, this.builder);
    }
}
