package org.apache.kafka.streams.kstream.internals;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.class */
/**
 * Processor supplier that sits downstream of two parent {@link KTableProcessorSupplier}s and
 * re-emits the {@link Change} records it receives as a single result.
 *
 * <p>When a queryable store name is configured, every record is written to that store and
 * forwarded through a {@link TimestampedTupleForwarder}; otherwise records are forwarded
 * directly to the downstream processors.
 *
 * @param <K> key type
 * @param <V> value type of the merged result
 */
public class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> {
    private final KTableProcessorSupplier<K, ?, V> parent1;
    private final KTableProcessorSupplier<K, ?, V> parent2;
    // Name of the state store holding the result; null means the result is not materialized.
    private final String queryableName;
    // Flipped to true (and never back) by enableSendingOldValues.
    private boolean sendOldValues = false;

    /**
     * Creates a merger over the two given parents.
     *
     * @param kTableProcessorSupplier  first parent supplier
     * @param kTableProcessorSupplier2 second parent supplier
     * @param str                      name of the state store to write results to, or
     *                                 {@code null} to forward records without storing them
     */
    KTableKTableJoinMerger(KTableProcessorSupplier<K, ?, V> kTableProcessorSupplier, KTableProcessorSupplier<K, ?, V> kTableProcessorSupplier2, String str) {
        this.parent1 = kTableProcessorSupplier;
        this.parent2 = kTableProcessorSupplier2;
        this.queryableName = str;
    }

    /**
     * Builds a merger that does not write to a state store.
     *
     * @param kTableProcessorSupplier  first parent supplier
     * @param kTableProcessorSupplier2 second parent supplier
     * @return a merger whose queryable name is {@code null}
     */
    public static <K, V> KTableKTableJoinMerger<K, V> of(KTableProcessorSupplier<K, ?, V> kTableProcessorSupplier, KTableProcessorSupplier<K, ?, V> kTableProcessorSupplier2) {
        return of(kTableProcessorSupplier, kTableProcessorSupplier2, null);
    }

    /**
     * Builds a merger, optionally backed by a named state store.
     *
     * @param kTableProcessorSupplier  first parent supplier
     * @param kTableProcessorSupplier2 second parent supplier
     * @param str                      state store name, or {@code null} for no store
     * @return the new merger
     */
    public static <K, V> KTableKTableJoinMerger<K, V> of(KTableProcessorSupplier<K, ?, V> kTableProcessorSupplier, KTableProcessorSupplier<K, ?, V> kTableProcessorSupplier2, String str) {
        return new KTableKTableJoinMerger<>(kTableProcessorSupplier, kTableProcessorSupplier2, str);
    }

    /**
     * @return the state store name given at construction; may be {@code null}
     */
    public String getQueryableName() {
        return queryableName;
    }

    /**
     * @return a new merge processor bound to this supplier's configuration
     */
    @Override
    public Processor<K, Change<V>> get() {
        return new KTableKTableJoinMergeProcessor();
    }

    /**
     * Returns the value-getter supplier for the merged result.
     *
     * <p>With a queryable name, the getter is backed by that store. Without one, the getter
     * is taken from {@code parent1}'s view, while the reported store names are the
     * de-duplicated union of both parents' store names.
     */
    @Override
    public KTableValueGetterSupplier<K, V> view() {
        if (queryableName != null) {
            return new KTableMaterializedValueGetterSupplier<>(queryableName);
        }
        return new KTableValueGetterSupplier<K, V>() {
            @Override
            public KTableValueGetter<K, V> get() {
                // Only the first parent's getter is used here.
                return parent1.view().get();
            }

            @Override
            public String[] storeNames() {
                final String[] storeNames1 = parent1.view().storeNames();
                final String[] storeNames2 = parent2.view().storeNames();
                // A set drops names that both parents report.
                final Set<String> stores = new HashSet<>(storeNames1.length + storeNames2.length);
                Collections.addAll(stores, storeNames1);
                Collections.addAll(stores, storeNames2);
                return stores.toArray(new String[0]);
            }
        };
    }

    /**
     * Turns on forwarding of old values for this merger and for both parents.
     *
     * @param z ignored; both parents are always called with {@code true}
     * @return always {@code true}
     */
    @Override
    public boolean enableSendingOldValues(boolean z) {
        parent1.enableSendingOldValues(true);
        parent2.enableSendingOldValues(true);
        sendOldValues = true;
        return true;
    }

    /**
     * Processor produced by {@link #get()}. Its constructor is private, so instances are
     * only created by the enclosing supplier.
     */
    public class KTableKTableJoinMergeProcessor extends AbstractProcessor<K, Change<V>> {
        // Both fields stay null when no queryable name is configured.
        private TimestampedKeyValueStore<K, V> store;
        private TimestampedTupleForwarder<K, V> tupleForwarder;

        private KTableKTableJoinMergeProcessor() {
        }

        /**
         * Initializes the processor and, when a queryable name is configured, looks up the
         * state store of that name and creates the forwarder that wraps it.
         *
         * @param processorContext context supplied by the runtime
         */
        @SuppressWarnings("unchecked") // getStateStore is untyped; the store is assumed to hold <K, V>
        @Override
        public void init(ProcessorContext processorContext) {
            super.init(processorContext);
            if (queryableName != null) {
                store = (TimestampedKeyValueStore<K, V>) processorContext.getStateStore(queryableName);
                tupleForwarder = new TimestampedTupleForwarder<>(
                        store,
                        processorContext,
                        new TimestampedCacheFlushListener<>(processorContext),
                        sendOldValues);
            }
        }

        /**
         * Handles one change record.
         *
         * <p>The compiler derives the erased {@code process(Object, Object)} bridge from this
         * override, so no hand-written bridge method is needed.
         *
         * @param k      record key
         * @param change new/old value pair for the key
         */
        @Override
        public void process(K k, Change<V> change) {
            if (queryableName != null) {
                // Store the new value with the current record timestamp, then let the
                // forwarder decide whether to emit downstream.
                store.put(k, ValueAndTimestamp.make(change.newValue, context().timestamp()));
                tupleForwarder.maybeForward(k, change.newValue, sendOldValues ? change.oldValue : null);
            } else if (sendOldValues) {
                context().forward(k, change);
            } else {
                // Old values were not requested: strip them before forwarding.
                context().forward(k, new Change<>(change.newValue, null));
            }
        }
    }
}
