package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableAggregate.class */
public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T> {
    private final String storeName;
    private final Initializer<T> initializer;
    private final Aggregator<? super K, ? super V, T> add;
    private final Aggregator<? super K, ? super V, T> remove;
    private boolean sendOldValues = false;

    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableAggregate$KTableAggregateProcessor.class */
    private class KTableAggregateProcessor extends AbstractProcessor<K, Change<V>> {
        private KeyValueStore<K, T> store;
        private TupleForwarder<K, T> tupleForwarder;

        private KTableAggregateProcessor() {
        }

        @Override // org.apache.kafka.streams.processor.AbstractProcessor, org.apache.kafka.streams.processor.Processor
        public void init(ProcessorContext processorContext) {
            super.init(processorContext);
            this.store = (KeyValueStore) processorContext.getStateStore(KTableAggregate.this.storeName);
            this.tupleForwarder = new TupleForwarder<>(this.store, processorContext, new ForwardingCacheFlushListener(processorContext, KTableAggregate.this.sendOldValues), KTableAggregate.this.sendOldValues);
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void process(K k, Change<V> change) {
            T t;
            if (k == null) {
                throw new StreamsException("Record key for KTable aggregate operator with state " + KTableAggregate.this.storeName + " should not be null.");
            }
            T t2 = this.store.get(k);
            T apply = (change.oldValue == null || t2 == null) ? t2 : KTableAggregate.this.remove.apply(k, change.oldValue, t2);
            if (change.newValue != null) {
                t = KTableAggregate.this.add.apply(k, change.newValue, apply == null ? KTableAggregate.this.initializer.apply() : apply);
            } else {
                t = apply;
            }
            this.store.put(k, t);
            this.tupleForwarder.maybeForward(k, t, t2);
        }

        @Override // org.apache.kafka.streams.processor.Processor
        public /* bridge */ /* synthetic */ void process(Object obj, Object obj2) {
            process((KTableAggregateProcessor) obj, (Change) obj2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KTableAggregate(String str, Initializer<T> initializer, Aggregator<? super K, ? super V, T> aggregator, Aggregator<? super K, ? super V, T> aggregator2) {
        this.storeName = str;
        this.initializer = initializer;
        this.add = aggregator;
        this.remove = aggregator2;
    }

    @Override // org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier
    public void enableSendingOldValues() {
        this.sendOldValues = true;
    }

    @Override // org.apache.kafka.streams.processor.ProcessorSupplier
    public Processor<K, Change<V>> get() {
        return new KTableAggregateProcessor();
    }

    @Override // org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier
    public KTableValueGetterSupplier<K, T> view() {
        return new KTableMaterializedValueGetterSupplier(this.storeName);
    }
}
