package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.class */
/**
 * Supplies processors that join a stream with itself over a windowed state store.
 *
 * <p>For every accepted input record the processor forwards, in this order:
 * <ol>
 *   <li>the join of the input record (as left value) with each stored record found in the
 *       "this" window {@code [ts - beforeMs, ts + afterMs]};</li>
 *   <li>the join of each stored record (as left value) with the input record found in the
 *       "other" window {@code [ts - afterMs, ts + beforeMs]};</li>
 *   <li>the join of the input record with itself, emitted either just before the first
 *       "other" result whose timestamp is later than the input timestamp, or after both
 *       passes if no such result exists.</li>
 * </ol>
 * The self-joined record is only forwarded while the input timestamp is strictly greater than
 * {@code streamTime - retentionPeriod + 1}.
 * NOTE(review): presumably this suppresses self-join output for records that have already
 * fallen out of the store's retention -- confirm against the store configuration.
 *
 * @param <K>    key type of the input and output records
 * @param <V1>   value type of the input records (left side of the joiner)
 * @param <V2>   value type held in the window store (right side of the joiner)
 * @param <VOut> value type produced by the joiner
 */
public class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
    private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);

    private final String windowName;
    private final long joinThisBeforeMs;
    private final long joinThisAfterMs;
    private final long joinOtherBeforeMs;
    private final long joinOtherAfterMs;
    private final long retentionPeriod;
    private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis;

    /**
     * Processor performing the self-join against the window store named by the enclosing
     * supplier. Each instance keeps its own stream-time tracker.
     */
    public class KStreamKStreamSelfJoinProcessor extends ContextualProcessor<K, V1, K, VOut> {
        private final KStreamImplJoin.TimeTracker timeTracker;
        private WindowStore<K, V2> windowStore;
        private Sensor droppedRecordsSensor;

        private KStreamKStreamSelfJoinProcessor() {
            this.timeTracker = new KStreamImplJoin.TimeTracker();
        }

        /**
         * Stores the context, registers the dropped-records sensor for the current thread and
         * task, and looks up the window store by name.
         *
         * @param processorContext the context supplied by the runtime; its metrics object must
         *                         be a {@link StreamsMetricsImpl}
         */
        @Override
        public void init(ProcessorContext<K, VOut> processorContext) {
            super.init(processorContext);
            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) processorContext.metrics();
            this.droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(
                Thread.currentThread().getName(),
                processorContext.taskId().toString(),
                metrics);
            this.windowStore = processorContext.getStateStore(windowName);
        }

        /**
         * Joins the given record against the window store and with itself, forwarding every
         * result downstream.
         *
         * <p>Both store iterators are opened with try-with-resources so they are closed even
         * when the joiner or a downstream processor throws.
         *
         * @param record the input record; it is ignored when
         *               {@code StreamStreamJoinUtil.skipRecord} rejects it
         */
        // The joiner is invoked with the same stream's values on both sides, so the casts
        // between V1 and V2 below cannot be checked by the compiler.
        @SuppressWarnings("unchecked")
        @Override
        public void process(Record<K, V1> record) {
            if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
                return;
            }

            final long inputTimestamp = record.timestamp();

            // The self-join result is computed up front (before any store lookup) but
            // forwarded later so that it lands at the right position in the output order.
            final VOut selfJoined = joinerThis.apply(record.key(), record.value(), (V2) record.value());
            final Record<K, VOut> selfRecord = record.withValue(selfJoined).withTimestamp(inputTimestamp);

            timeTracker.advanceStreamTime(inputTimestamp);
            final boolean selfJoinAllowed =
                inputTimestamp > timeTracker.streamTime - retentionPeriod + 1;
            boolean emittedJoinWithSelf = false;

            // Pass 1: input record on the left, stored records on the right.
            final long thisFrom = Math.max(0L, inputTimestamp - joinThisBeforeMs);
            final long thisTo = Math.max(0L, inputTimestamp + joinThisAfterMs);
            try (WindowStoreIterator<V2> iter = windowStore.fetch(record.key(), thisFrom, thisTo)) {
                while (iter.hasNext()) {
                    final KeyValue<Long, V2> other = iter.next();
                    final VOut joined = joinerThis.apply(record.key(), record.value(), other.value);
                    context().forward(
                        record.withValue(joined).withTimestamp(Math.max(inputTimestamp, other.key)));
                }
            }

            // Pass 2: stored records on the left, input record on the right. Kept as a
            // separate loop so that all pass-1 results are forwarded before any pass-2 result.
            final long otherFrom = Math.max(0L, inputTimestamp - joinOtherBeforeMs);
            final long otherTo = Math.max(0L, inputTimestamp + joinOtherAfterMs);
            try (WindowStoreIterator<V2> iter = windowStore.fetch(record.key(), otherFrom, otherTo)) {
                while (iter.hasNext()) {
                    final KeyValue<Long, V2> other = iter.next();
                    final long outputTimestamp = Math.max(inputTimestamp, other.key);

                    // Emit the self-join just before the first result that is stamped later
                    // than the input record, so the self-join precedes later-stamped results.
                    if (inputTimestamp < outputTimestamp && !emittedJoinWithSelf && selfJoinAllowed) {
                        emittedJoinWithSelf = true;
                        context().forward(selfRecord);
                    }

                    final VOut joined =
                        joinerThis.apply(record.key(), (V1) other.value, (V2) record.value());
                    context().forward(record.withValue(joined).withTimestamp(outputTimestamp));
                }
            }

            // No later-stamped result triggered the self-join: emit it last.
            if (!emittedJoinWithSelf && selfJoinAllowed) {
                context().forward(selfRecord);
            }
        }
    }

    /**
     * Creates the supplier.
     *
     * @param str                 name of the window store the processors read from
     * @param joinWindowsInternal join window definition; its {@code beforeMs}/{@code afterMs}
     *                            are used as-is for the "this" window and swapped for the
     *                            "other" window
     * @param valueJoinerWithKey  joiner applied to every matching pair of values
     * @param j                   retention period, in the same unit as record timestamps, used
     *                            to decide whether the self-joined record is forwarded
     */
    public KStreamKStreamSelfJoin(String str, JoinWindowsInternal joinWindowsInternal, ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> valueJoinerWithKey, long j) {
        this.windowName = str;
        this.joinThisBeforeMs = joinWindowsInternal.beforeMs;
        this.joinThisAfterMs = joinWindowsInternal.afterMs;
        // The "other" side looks at the window mirrored around the input timestamp.
        this.joinOtherBeforeMs = joinWindowsInternal.afterMs;
        this.joinOtherAfterMs = joinWindowsInternal.beforeMs;
        this.joinerThis = valueJoinerWithKey;
        this.retentionPeriod = j;
    }

    /**
     * Returns a new processor instance bound to this supplier's configuration.
     */
    @Override
    public Processor<K, V1, K, VOut> get() {
        return new KStreamKStreamSelfJoinProcessor();
    }
}
