package org.apache.kafka.streams.kstream.internals;

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableImpl.class */
public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> {
    private static final String FILTER_NAME = "KTABLE-FILTER-";
    private static final String FOREACH_NAME = "KTABLE-FOREACH-";
    public static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-";
    public static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-";
    public static final String LEFTTHIS_NAME = "KTABLE-LEFTTHIS-";
    public static final String LEFTOTHER_NAME = "KTABLE-LEFTOTHER-";
    private static final String MAPVALUES_NAME = "KTABLE-MAPVALUES-";
    public static final String MERGE_NAME = "KTABLE-MERGE-";
    public static final String OUTERTHIS_NAME = "KTABLE-OUTERTHIS-";
    public static final String OUTEROTHER_NAME = "KTABLE-OUTEROTHER-";
    private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
    private static final String SELECT_NAME = "KTABLE-SELECT-";
    public static final String SOURCE_NAME = "KTABLE-SOURCE-";
    private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
    public final ProcessorSupplier<?, ?> processorSupplier;
    private final Serde<K> keySerde;
    private final Serde<V> valSerde;
    private final String storeName;
    private boolean sendOldValues;

    public KTableImpl(KStreamBuilder kStreamBuilder, String str, ProcessorSupplier<?, ?> processorSupplier, Set<String> set, String str2) {
        this(kStreamBuilder, str, processorSupplier, set, null, null, str2);
    }

    public KTableImpl(KStreamBuilder kStreamBuilder, String str, ProcessorSupplier<?, ?> processorSupplier, Set<String> set, Serde<K> serde, Serde<V> serde2, String str2) {
        super(kStreamBuilder, str, set);
        this.sendOldValues = false;
        this.processorSupplier = processorSupplier;
        this.keySerde = serde;
        this.valSerde = serde2;
        this.storeName = str2;
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public String getStoreName() {
        return this.storeName;
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public KTable<K, V> filter(Predicate<K, V> predicate) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        String newName = this.topology.newName(FILTER_NAME);
        KTableFilter kTableFilter = new KTableFilter(this, predicate, false);
        this.topology.addProcessor(newName, kTableFilter, this.name);
        return new KTableImpl(this.topology, newName, kTableFilter, this.sourceNodes, this.storeName);
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public KTable<K, V> filterNot(Predicate<K, V> predicate) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        String newName = this.topology.newName(FILTER_NAME);
        KTableFilter kTableFilter = new KTableFilter(this, predicate, true);
        this.topology.addProcessor(newName, kTableFilter, this.name);
        return new KTableImpl(this.topology, newName, kTableFilter, this.sourceNodes, this.storeName);
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public <V1> KTable<K, V1> mapValues(ValueMapper<V, V1> valueMapper) {
        Objects.requireNonNull(valueMapper);
        String newName = this.topology.newName(MAPVALUES_NAME);
        KTableMapValues kTableMapValues = new KTableMapValues(this, valueMapper);
        this.topology.addProcessor(newName, kTableMapValues, this.name);
        return new KTableImpl(this.topology, newName, kTableMapValues, this.sourceNodes, this.storeName);
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public void print() {
        print(null, null, null);
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public void print(String str) {
        print(null, null, str);
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public void print(Serde<K> serde, Serde<V> serde2) {
        print(serde, serde2, null);
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public void print(Serde<K> serde, Serde<V> serde2, String str) {
        this.topology.addProcessor(this.topology.newName(PRINTING_NAME), new KeyValuePrinter(serde, serde2, str == null ? this.name : str), this.name);
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public void writeAsText(String str) {
        writeAsText(str, null, null, null);
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public void writeAsText(String str, String str2) {
        writeAsText(str, str2, null, null);
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public void writeAsText(String str, Serde<K> serde, Serde<V> serde2) {
        writeAsText(str, null, serde, serde2);
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public void writeAsText(String str, String str2, Serde<K> serde, Serde<V> serde2) {
        try {
            this.topology.addProcessor(this.topology.newName(PRINTING_NAME), new KeyValuePrinter(new PrintStream(new FileOutputStream(str)), serde, serde2, str2 == null ? this.name : str2), this.name);
        } catch (FileNotFoundException e) {
            throw new TopologyBuilderException("Unable to write stream to file at [" + str + "] " + e.getMessage());
        }
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public void foreach(final ForeachAction<K, V> foreachAction) {
        Objects.requireNonNull(foreachAction, "action can't be null");
        this.topology.addProcessor(this.topology.newName(FOREACH_NAME), new KStreamForeach(new ForeachAction<K, Change<V>>() { // from class: org.apache.kafka.streams.kstream.internals.KTableImpl.1
            public void apply(K k, Change<V> change) {
                foreachAction.apply(k, change.newValue);
            }

            @Override // org.apache.kafka.streams.kstream.ForeachAction
            public /* bridge */ /* synthetic */ void apply(Object obj, Object obj2) {
                apply((AnonymousClass1) obj, (Change) obj2);
            }
        }), this.name);
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public KTable<K, V> through(Serde<K> serde, Serde<V> serde2, StreamPartitioner<K, V> streamPartitioner, String str, String str2) {
        Objects.requireNonNull(str2, "storeName can't be null");
        to(serde, serde2, streamPartitioner, str);
        return this.topology.table(serde, serde2, str, str2);
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public KTable<K, V> through(Serde<K> serde, Serde<V> serde2, String str, String str2) {
        return through(serde, serde2, null, str, str2);
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public KTable<K, V> through(StreamPartitioner<K, V> streamPartitioner, String str, String str2) {
        return through(null, null, streamPartitioner, str, str2);
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public KTable<K, V> through(String str, String str2) {
        return through(null, null, null, str, str2);
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public void to(String str) {
        to(null, null, null, str);
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public void to(StreamPartitioner<K, V> streamPartitioner, String str) {
        to(null, null, streamPartitioner, str);
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public void to(Serde<K> serde, Serde<V> serde2, String str) {
        toStream().to(serde, serde2, null, str);
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public void to(Serde<K> serde, Serde<V> serde2, StreamPartitioner<K, V> streamPartitioner, String str) {
        toStream().to(serde, serde2, streamPartitioner, str);
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public KStream<K, V> toStream() {
        String newName = this.topology.newName(TOSTREAM_NAME);
        this.topology.addProcessor(newName, new KStreamMapValues(new ValueMapper<Change<V>, V>() { // from class: org.apache.kafka.streams.kstream.internals.KTableImpl.2
            @Override // org.apache.kafka.streams.kstream.ValueMapper
            public V apply(Change<V> change) {
                return change.newValue;
            }
        }), this.name);
        return new KStreamImpl(this.topology, newName, this.sourceNodes, false);
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public <K1> KStream<K1, V> toStream(KeyValueMapper<K, V, K1> keyValueMapper) {
        return toStream().selectKey(keyValueMapper);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.kafka.streams.kstream.KTable
    public <V1, R> KTable<K, R> join(KTable<K, V1> kTable, ValueJoiner<V, V1, R> valueJoiner) {
        Objects.requireNonNull(kTable, "other can't be null");
        Objects.requireNonNull(valueJoiner, "joiner can't be null");
        Set<String> ensureJoinableWith = ensureJoinableWith((AbstractStream) kTable);
        String newName = this.topology.newName(JOINTHIS_NAME);
        String newName2 = this.topology.newName(JOINOTHER_NAME);
        String newName3 = this.topology.newName(MERGE_NAME);
        KTableKTableJoin kTableKTableJoin = new KTableKTableJoin(this, (KTableImpl) kTable, valueJoiner);
        KTableKTableJoin kTableKTableJoin2 = new KTableKTableJoin((KTableImpl) kTable, this, reverseJoiner(valueJoiner));
        KTableKTableJoinMerger kTableKTableJoinMerger = new KTableKTableJoinMerger(new KTableImpl(this.topology, newName, kTableKTableJoin, this.sourceNodes, this.storeName), new KTableImpl(this.topology, newName2, kTableKTableJoin2, ((KTableImpl) kTable).sourceNodes, kTable.getStoreName()));
        this.topology.addProcessor(newName, kTableKTableJoin, this.name);
        this.topology.addProcessor(newName2, kTableKTableJoin2, ((KTableImpl) kTable).name);
        this.topology.addProcessor(newName3, kTableKTableJoinMerger, newName, newName2);
        this.topology.connectProcessorAndStateStores(newName, ((KTableImpl) kTable).valueGetterSupplier().storeNames());
        this.topology.connectProcessorAndStateStores(newName2, valueGetterSupplier().storeNames());
        return new KTableImpl(this.topology, newName3, kTableKTableJoinMerger, ensureJoinableWith, null);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.kafka.streams.kstream.KTable
    public <V1, R> KTable<K, R> outerJoin(KTable<K, V1> kTable, ValueJoiner<V, V1, R> valueJoiner) {
        Objects.requireNonNull(kTable, "other can't be null");
        Objects.requireNonNull(valueJoiner, "joiner can't be null");
        Set<String> ensureJoinableWith = ensureJoinableWith((AbstractStream) kTable);
        String newName = this.topology.newName(OUTERTHIS_NAME);
        String newName2 = this.topology.newName(OUTEROTHER_NAME);
        String newName3 = this.topology.newName(MERGE_NAME);
        KTableKTableOuterJoin kTableKTableOuterJoin = new KTableKTableOuterJoin(this, (KTableImpl) kTable, valueJoiner);
        KTableKTableOuterJoin kTableKTableOuterJoin2 = new KTableKTableOuterJoin((KTableImpl) kTable, this, reverseJoiner(valueJoiner));
        KTableKTableJoinMerger kTableKTableJoinMerger = new KTableKTableJoinMerger(new KTableImpl(this.topology, newName, kTableKTableOuterJoin, this.sourceNodes, this.storeName), new KTableImpl(this.topology, newName2, kTableKTableOuterJoin2, ((KTableImpl) kTable).sourceNodes, kTable.getStoreName()));
        this.topology.addProcessor(newName, kTableKTableOuterJoin, this.name);
        this.topology.addProcessor(newName2, kTableKTableOuterJoin2, ((KTableImpl) kTable).name);
        this.topology.addProcessor(newName3, kTableKTableJoinMerger, newName, newName2);
        this.topology.connectProcessorAndStateStores(newName, ((KTableImpl) kTable).valueGetterSupplier().storeNames());
        this.topology.connectProcessorAndStateStores(newName2, valueGetterSupplier().storeNames());
        return new KTableImpl(this.topology, newName3, kTableKTableJoinMerger, ensureJoinableWith, null);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.kafka.streams.kstream.KTable
    public <V1, R> KTable<K, R> leftJoin(KTable<K, V1> kTable, ValueJoiner<V, V1, R> valueJoiner) {
        Objects.requireNonNull(kTable, "other can't be null");
        Objects.requireNonNull(valueJoiner, "joiner can't be null");
        Set<String> ensureJoinableWith = ensureJoinableWith((AbstractStream) kTable);
        String newName = this.topology.newName(LEFTTHIS_NAME);
        String newName2 = this.topology.newName(LEFTOTHER_NAME);
        String newName3 = this.topology.newName(MERGE_NAME);
        KTableKTableLeftJoin kTableKTableLeftJoin = new KTableKTableLeftJoin(this, (KTableImpl) kTable, valueJoiner);
        KTableKTableRightJoin kTableKTableRightJoin = new KTableKTableRightJoin((KTableImpl) kTable, this, reverseJoiner(valueJoiner));
        KTableKTableJoinMerger kTableKTableJoinMerger = new KTableKTableJoinMerger(new KTableImpl(this.topology, newName, kTableKTableLeftJoin, this.sourceNodes, this.storeName), new KTableImpl(this.topology, newName2, kTableKTableRightJoin, ((KTableImpl) kTable).sourceNodes, kTable.getStoreName()));
        this.topology.addProcessor(newName, kTableKTableLeftJoin, this.name);
        this.topology.addProcessor(newName2, kTableKTableRightJoin, ((KTableImpl) kTable).name);
        this.topology.addProcessor(newName3, kTableKTableJoinMerger, newName, newName2);
        this.topology.connectProcessorAndStateStores(newName, ((KTableImpl) kTable).valueGetterSupplier().storeNames());
        this.topology.connectProcessorAndStateStores(newName2, valueGetterSupplier().storeNames());
        return new KTableImpl(this.topology, newName3, kTableKTableJoinMerger, ensureJoinableWith, null);
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> keyValueMapper, Serde<K1> serde, Serde<V1> serde2) {
        Objects.requireNonNull(keyValueMapper, "selector can't be null");
        String newName = this.topology.newName(SELECT_NAME);
        this.topology.addProcessor(newName, new KTableRepartitionMap(this, keyValueMapper), this.name);
        enableSendingOldValues();
        return new KGroupedTableImpl(this.topology, newName, this.name, serde, serde2);
    }

    @Override // org.apache.kafka.streams.kstream.KTable
    public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> keyValueMapper) {
        return groupBy(keyValueMapper, null, null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KTableValueGetterSupplier<K, V> valueGetterSupplier() {
        return this.processorSupplier instanceof KTableSource ? new KTableSourceValueGetterSupplier(((KTableSource) this.processorSupplier).storeName) : this.processorSupplier instanceof KStreamAggProcessorSupplier ? ((KStreamAggProcessorSupplier) this.processorSupplier).view() : ((KTableProcessorSupplier) this.processorSupplier).view();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void enableSendingOldValues() {
        if (this.sendOldValues) {
            return;
        }
        if (this.processorSupplier instanceof KTableSource) {
            ((KTableSource) this.processorSupplier).enableSendingOldValues();
        } else if (this.processorSupplier instanceof KStreamAggProcessorSupplier) {
            ((KStreamAggProcessorSupplier) this.processorSupplier).enableSendingOldValues();
        } else {
            ((KTableProcessorSupplier) this.processorSupplier).enableSendingOldValues();
        }
        this.sendOldValues = true;
    }

    boolean sendingOldValueEnabled() {
        return this.sendOldValues;
    }
}
