package org.apache.kafka.streams.kstream.internals;

import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.class */
public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg> implements KStreamAggProcessorSupplier<KIn, VIn, Windowed<KIn>, VAgg> {
    private final String storeName;
    private final SlidingWindows windows;
    private final Initializer<VAgg> initializer;
    private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
    private final Logger log = LoggerFactory.getLogger(getClass());
    private boolean sendOldValues = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate$KStreamSlidingWindowAggregateProcessor.class */
    public class KStreamSlidingWindowAggregateProcessor extends ContextualProcessor<KIn, VIn, Windowed<KIn>, Change<VAgg>> {
        private TimestampedWindowStore<KIn, VAgg> windowStore;
        private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder;
        private Sensor droppedRecordsSensor;
        private long observedStreamTime;
        private Boolean reverseIteratorPossible;

        private KStreamSlidingWindowAggregateProcessor() {
            this.observedStreamTime = -1L;
            this.reverseIteratorPossible = null;
        }

        @Override // org.apache.kafka.streams.processor.api.ContextualProcessor, org.apache.kafka.streams.processor.api.Processor
        public void init(ProcessorContext<Windowed<KIn>, Change<VAgg>> processorContext) {
            super.init(processorContext);
            this.droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(Thread.currentThread().getName(), processorContext.taskId().toString(), ((InternalProcessorContext) processorContext).metrics());
            this.windowStore = (TimestampedWindowStore) processorContext.getStateStore(KStreamSlidingWindowAggregate.this.storeName);
            this.tupleForwarder = new TimestampedTupleForwarder<>(this.windowStore, processorContext, new TimestampedCacheFlushListener(processorContext), KStreamSlidingWindowAggregate.this.sendOldValues);
        }

        @Override // org.apache.kafka.streams.processor.api.Processor
        public void process(Record<KIn, VIn> record) {
            if (record.key() == null || record.value() == null) {
                if (context().recordMetadata().isPresent()) {
                    RecordMetadata recordMetadata = context().recordMetadata().get();
                    KStreamSlidingWindowAggregate.this.log.warn("Skipping record due to null key or value. topic=[{}] partition=[{}] offset=[{}]", new Object[]{recordMetadata.topic(), Integer.valueOf(recordMetadata.partition()), Long.valueOf(recordMetadata.offset())});
                } else {
                    KStreamSlidingWindowAggregate.this.log.warn("Skipping record due to null key or value. Topic, partition, and offset not known.");
                }
                this.droppedRecordsSensor.record();
                return;
            }
            long timestamp = record.timestamp();
            this.observedStreamTime = Math.max(this.observedStreamTime, timestamp);
            long gracePeriodMs = this.observedStreamTime - KStreamSlidingWindowAggregate.this.windows.gracePeriodMs();
            if (timestamp + 1 + KStreamSlidingWindowAggregate.this.windows.timeDifferenceMs() <= gracePeriodMs) {
                if (context().recordMetadata().isPresent()) {
                    RecordMetadata recordMetadata2 = context().recordMetadata().get();
                    KStreamSlidingWindowAggregate.this.log.warn("Skipping record for expired window. topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{},{}] expiration=[{}] streamTime=[{}]", new Object[]{recordMetadata2.topic(), Integer.valueOf(recordMetadata2.partition()), Long.valueOf(recordMetadata2.offset()), Long.valueOf(record.timestamp()), Long.valueOf(timestamp - KStreamSlidingWindowAggregate.this.windows.timeDifferenceMs()), Long.valueOf(timestamp), Long.valueOf(gracePeriodMs), Long.valueOf(this.observedStreamTime)});
                } else {
                    KStreamSlidingWindowAggregate.this.log.warn("Skipping record for expired window. Topic, partition, and offset not known. timestamp=[{}] window=[{},{}] expiration=[{}] streamTime=[{}]", new Object[]{Long.valueOf(record.timestamp()), Long.valueOf(timestamp - KStreamSlidingWindowAggregate.this.windows.timeDifferenceMs()), Long.valueOf(timestamp), Long.valueOf(gracePeriodMs), Long.valueOf(this.observedStreamTime)});
                }
                this.droppedRecordsSensor.record();
                return;
            }
            if (timestamp < KStreamSlidingWindowAggregate.this.windows.timeDifferenceMs()) {
                processEarly(record.key(), record.value(), timestamp, gracePeriodMs);
                return;
            }
            if (this.reverseIteratorPossible == null) {
                try {
                    this.windowStore.backwardFetch((TimestampedWindowStore<KIn, VAgg>) record.key(), 0L, 0L);
                    this.reverseIteratorPossible = true;
                    KStreamSlidingWindowAggregate.this.log.debug("Sliding Windows aggregate using a reverse iterator");
                } catch (UnsupportedOperationException e) {
                    this.reverseIteratorPossible = false;
                    KStreamSlidingWindowAggregate.this.log.debug("Sliding Windows aggregate using a forward iterator");
                }
            }
            if (this.reverseIteratorPossible.booleanValue()) {
                processReverse(record.key(), record.value(), timestamp, gracePeriodMs);
            } else {
                processInOrder(record.key(), record.value(), timestamp, gracePeriodMs);
            }
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void processInOrder(KIn kin, VIn vin, long j, long j2) {
            HashSet hashSet = new HashSet();
            ValueAndTimestamp<VAgg> valueAndTimestamp = null;
            ValueAndTimestamp<VAgg> valueAndTimestamp2 = null;
            boolean z = false;
            boolean z2 = false;
            Long l = null;
            KeyValueIterator<Windowed<KIn>, VAgg> fetch = this.windowStore.fetch(kin, kin, Math.max(0L, j - (2 * KStreamSlidingWindowAggregate.this.windows.timeDifferenceMs())), j + 1);
            Throwable th = null;
            while (fetch.hasNext()) {
                try {
                    try {
                        KeyValue next = fetch.next();
                        long start = ((Windowed) next.key).window().start();
                        hashSet.add(Long.valueOf(start));
                        long timeDifferenceMs = start + KStreamSlidingWindowAggregate.this.windows.timeDifferenceMs();
                        long timestamp = ((ValueAndTimestamp) next.value).timestamp();
                        if (timeDifferenceMs < j) {
                            valueAndTimestamp = (ValueAndTimestamp) next.value;
                            l = Long.valueOf(timestamp);
                        } else if (timeDifferenceMs == j) {
                            z = true;
                            if (timestamp < j) {
                                l = Long.valueOf(timestamp);
                            }
                            updateWindowAndForward(((Windowed) next.key).window(), (ValueAndTimestamp) next.value, kin, vin, j2, j);
                        } else if (timeDifferenceMs > j && start <= j) {
                            valueAndTimestamp2 = (ValueAndTimestamp) next.value;
                            updateWindowAndForward(((Windowed) next.key).window(), (ValueAndTimestamp) next.value, kin, vin, j2, j);
                        } else {
                            if (start != j + 1) {
                                KStreamSlidingWindowAggregate.this.log.error("Unexpected window with start {} found when processing record at {} in `KStreamSlidingWindowAggregate`.", Long.valueOf(start), Long.valueOf(j));
                                throw new IllegalStateException("Unexpected window found when processing sliding windows");
                            }
                            z2 = true;
                        }
                    } finally {
                    }
                } catch (Throwable th2) {
                    if (fetch != null) {
                        if (th != null) {
                            try {
                                fetch.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            fetch.close();
                        }
                    }
                    throw th2;
                }
            }
            if (fetch != null) {
                if (0 != 0) {
                    try {
                        fetch.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    fetch.close();
                }
            }
            createWindows(kin, vin, j, j2, hashSet, valueAndTimestamp2, valueAndTimestamp, z, z2, l);
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void processReverse(KIn kin, VIn vin, long j, long j2) {
            HashSet hashSet = new HashSet();
            ValueAndTimestamp<VAgg> valueAndTimestamp = null;
            ValueAndTimestamp<VAgg> valueAndTimestamp2 = null;
            boolean z = false;
            boolean z2 = false;
            Long l = null;
            KeyValueIterator<Windowed<KIn>, VAgg> backwardFetch = this.windowStore.backwardFetch(kin, kin, Math.max(0L, j - (2 * KStreamSlidingWindowAggregate.this.windows.timeDifferenceMs())), j + 1);
            Throwable th = null;
            while (true) {
                try {
                    if (!backwardFetch.hasNext()) {
                        break;
                    }
                    KeyValue next = backwardFetch.next();
                    long start = ((Windowed) next.key).window().start();
                    hashSet.add(Long.valueOf(start));
                    long timeDifferenceMs = start + KStreamSlidingWindowAggregate.this.windows.timeDifferenceMs();
                    long timestamp = ((ValueAndTimestamp) next.value).timestamp();
                    if (start == j + 1) {
                        z2 = true;
                    } else if (timeDifferenceMs > j) {
                        if (valueAndTimestamp2 == null) {
                            valueAndTimestamp2 = (ValueAndTimestamp) next.value;
                        }
                        updateWindowAndForward(((Windowed) next.key).window(), (ValueAndTimestamp) next.value, kin, vin, j2, j);
                    } else if (timeDifferenceMs == j) {
                        z = true;
                        updateWindowAndForward(((Windowed) next.key).window(), (ValueAndTimestamp) next.value, kin, vin, j2, j);
                        if (timestamp >= j) {
                            if (backwardFetch != null) {
                                if (0 == 0) {
                                    backwardFetch.close();
                                    return;
                                }
                                try {
                                    backwardFetch.close();
                                    return;
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                    return;
                                }
                            }
                            return;
                        }
                        l = Long.valueOf(timestamp);
                    } else {
                        if (timeDifferenceMs >= j) {
                            KStreamSlidingWindowAggregate.this.log.error("Unexpected window with start {} found when processing record at {} in `KStreamSlidingWindowAggregate`.", Long.valueOf(start), Long.valueOf(j));
                            throw new IllegalStateException("Unexpected window found when processing sliding windows");
                        }
                        valueAndTimestamp = (ValueAndTimestamp) next.value;
                        l = Long.valueOf(timestamp);
                    }
                } finally {
                    if (backwardFetch != null) {
                        if (0 != 0) {
                            try {
                                backwardFetch.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            backwardFetch.close();
                        }
                    }
                }
            }
            createWindows(kin, vin, j, j2, hashSet, valueAndTimestamp2, valueAndTimestamp, z, z2, l);
        }

        /* JADX WARN: Multi-variable type inference failed */
        private void processEarly(KIn kin, VIn vin, long j, long j2) {
            if (j < 0 || j >= KStreamSlidingWindowAggregate.this.windows.timeDifferenceMs()) {
                KStreamSlidingWindowAggregate.this.log.error("Early record for sliding windows must fall between fall between 0 <= inputRecordTimestamp. Timestamp {} does not fall between 0 <= {}", Long.valueOf(j), Long.valueOf(KStreamSlidingWindowAggregate.this.windows.timeDifferenceMs()));
                throw new IllegalArgumentException("Early record for sliding windows must fall between fall between 0 <= inputRecordTimestamp");
            }
            KeyValue keyValue = null;
            ValueAndTimestamp<VAgg> valueAndTimestamp = null;
            boolean z = false;
            HashSet hashSet = new HashSet();
            Long l = null;
            KeyValueIterator<Windowed<KIn>, VAgg> fetch = this.windowStore.fetch(kin, kin, 0L, j + 1);
            Throwable th = null;
            while (fetch.hasNext()) {
                try {
                    try {
                        KeyValue next = fetch.next();
                        long start = ((Windowed) next.key).window().start();
                        hashSet.add(Long.valueOf(start));
                        long timestamp = ((ValueAndTimestamp) next.value).timestamp();
                        if (start == 0) {
                            keyValue = next;
                            if (timestamp < j) {
                                l = Long.valueOf(timestamp);
                            }
                        } else if (start <= j) {
                            valueAndTimestamp = (ValueAndTimestamp) next.value;
                            updateWindowAndForward(((Windowed) next.key).window(), (ValueAndTimestamp) next.value, kin, vin, j2, j);
                        } else {
                            if (start != j + 1) {
                                KStreamSlidingWindowAggregate.this.log.error("Unexpected window with start {} found when processing record at {} in `KStreamSlidingWindowAggregate`.", Long.valueOf(start), Long.valueOf(j));
                                throw new IllegalStateException("Unexpected window found when processing sliding windows");
                            }
                            z = true;
                        }
                    } catch (Throwable th2) {
                        th = th2;
                        throw th2;
                    }
                } catch (Throwable th3) {
                    if (fetch != null) {
                        if (th != null) {
                            try {
                                fetch.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            fetch.close();
                        }
                    }
                    throw th3;
                }
            }
            if (fetch != null) {
                if (0 != 0) {
                    try {
                        fetch.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    fetch.close();
                }
            }
            if (valueAndTimestamp == null && keyValue != null && ((ValueAndTimestamp) keyValue.value).timestamp() > j) {
                valueAndTimestamp = (ValueAndTimestamp) keyValue.value;
            }
            if (!z && rightWindowIsNotEmpty(valueAndTimestamp, j)) {
                createCurrentRecordRightWindow(j, valueAndTimestamp, kin);
            }
            if (l != null && !hashSet.contains(Long.valueOf(l.longValue() + 1))) {
                createPreviousRecordRightWindow(l.longValue() + 1, j, kin, vin, j2);
            }
            if (keyValue == null) {
                updateWindowAndForward(new TimeWindow(0L, KStreamSlidingWindowAggregate.this.windows.timeDifferenceMs()), ValueAndTimestamp.make(KStreamSlidingWindowAggregate.this.initializer.apply(), j), kin, vin, j2, j);
            } else {
                updateWindowAndForward(((Windowed) keyValue.key).window(), (ValueAndTimestamp) keyValue.value, kin, vin, j2, j);
            }
        }

        private void createWindows(KIn kin, VIn vin, long j, long j2, Set<Long> set, ValueAndTimestamp<VAgg> valueAndTimestamp, ValueAndTimestamp<VAgg> valueAndTimestamp2, boolean z, boolean z2, Long l) {
            if (l != null) {
                long longValue = l.longValue() + 1;
                if (previousRecordRightWindowDoesNotExistAndIsNotEmpty(set, longValue, j)) {
                    createPreviousRecordRightWindow(longValue, j, kin, vin, j2);
                }
            }
            if (!z) {
                updateWindowAndForward(new TimeWindow(j - KStreamSlidingWindowAggregate.this.windows.timeDifferenceMs(), j), leftWindowNotEmpty(l, j) ? ValueAndTimestamp.make(valueAndTimestamp2.value(), j) : ValueAndTimestamp.make(KStreamSlidingWindowAggregate.this.initializer.apply(), j), kin, vin, j2, j);
            }
            if (z2 || !rightWindowIsNotEmpty(valueAndTimestamp, j)) {
                return;
            }
            createCurrentRecordRightWindow(j, valueAndTimestamp, kin);
        }

        private void createCurrentRecordRightWindow(long j, ValueAndTimestamp<VAgg> valueAndTimestamp, KIn kin) {
            TimeWindow timeWindow = new TimeWindow(j + 1, j + 1 + KStreamSlidingWindowAggregate.this.windows.timeDifferenceMs());
            this.windowStore.put(kin, valueAndTimestamp, timeWindow.start());
            this.tupleForwarder.maybeForward(new Windowed<>(kin, timeWindow), valueAndTimestamp.value(), null, valueAndTimestamp.timestamp());
        }

        private void createPreviousRecordRightWindow(long j, long j2, KIn kin, VIn vin, long j3) {
            updateWindowAndForward(new TimeWindow(j, j + KStreamSlidingWindowAggregate.this.windows.timeDifferenceMs()), ValueAndTimestamp.make(KStreamSlidingWindowAggregate.this.initializer.apply(), j2), kin, vin, j3, j2);
        }

        private boolean leftWindowNotEmpty(Long l, long j) {
            return l != null && j - KStreamSlidingWindowAggregate.this.windows.timeDifferenceMs() <= l.longValue();
        }

        private boolean previousRecordRightWindowDoesNotExistAndIsNotEmpty(Set<Long> set, long j, long j2) {
            return !set.contains(Long.valueOf(j)) && j + KStreamSlidingWindowAggregate.this.windows.timeDifferenceMs() >= j2;
        }

        private boolean rightWindowIsNotEmpty(ValueAndTimestamp<VAgg> valueAndTimestamp, long j) {
            return valueAndTimestamp != null && valueAndTimestamp.timestamp() > j;
        }

        private void updateWindowAndForward(Window window, ValueAndTimestamp<VAgg> valueAndTimestamp, KIn kin, VIn vin, long j, long j2) {
            long start = window.start();
            long end = window.end();
            if (end > j) {
                Object valueOrNull = ValueAndTimestamp.getValueOrNull(valueAndTimestamp);
                Object apply = KStreamSlidingWindowAggregate.this.aggregator.apply(kin, vin, valueOrNull);
                long max = valueOrNull == null ? j2 : Math.max(j2, valueAndTimestamp.timestamp());
                this.windowStore.put(kin, ValueAndTimestamp.make(apply, max), start);
                this.tupleForwarder.maybeForward(new Windowed(kin, window), apply, KStreamSlidingWindowAggregate.this.sendOldValues ? valueOrNull : null, max);
                return;
            }
            if (context().recordMetadata().isPresent()) {
                RecordMetadata recordMetadata = context().recordMetadata().get();
                KStreamSlidingWindowAggregate.this.log.warn("Skipping record for expired window. topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{},{}] expiration=[{}] streamTime=[{}]", new Object[]{recordMetadata.topic(), Integer.valueOf(recordMetadata.partition()), Long.valueOf(recordMetadata.offset()), Long.valueOf(j2), Long.valueOf(start), Long.valueOf(end), Long.valueOf(j), Long.valueOf(this.observedStreamTime)});
            } else {
                KStreamSlidingWindowAggregate.this.log.warn("Skipping record for expired window. Topic, partition, and offset not known. timestamp=[{}] window=[{},{}] expiration=[{}] streamTime=[{}]", new Object[]{Long.valueOf(j2), Long.valueOf(start), Long.valueOf(end), Long.valueOf(j), Long.valueOf(this.observedStreamTime)});
            }
            this.droppedRecordsSensor.record();
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate$KStreamWindowAggregateValueGetter.class */
    private class KStreamWindowAggregateValueGetter implements KTableValueGetter<Windowed<KIn>, VAgg> {
        private TimestampedWindowStore<KIn, VAgg> windowStore;

        private KStreamWindowAggregateValueGetter() {
        }

        @Override // org.apache.kafka.streams.kstream.internals.KTableValueGetter
        public void init(org.apache.kafka.streams.processor.ProcessorContext processorContext) {
            this.windowStore = (TimestampedWindowStore) processorContext.getStateStore(KStreamSlidingWindowAggregate.this.storeName);
        }

        @Override // org.apache.kafka.streams.kstream.internals.KTableValueGetter
        public ValueAndTimestamp<VAgg> get(Windowed<KIn> windowed) {
            return (ValueAndTimestamp) this.windowStore.fetch(windowed.key(), windowed.window().start());
        }

        @Override // org.apache.kafka.streams.kstream.internals.KTableValueGetter
        public void close() {
        }
    }

    public KStreamSlidingWindowAggregate(SlidingWindows slidingWindows, String str, Initializer<VAgg> initializer, Aggregator<? super KIn, ? super VIn, VAgg> aggregator) {
        this.windows = slidingWindows;
        this.storeName = str;
        this.initializer = initializer;
        this.aggregator = aggregator;
    }

    @Override // org.apache.kafka.streams.processor.api.ProcessorSupplier, java.util.function.Supplier
    public Processor<KIn, VIn, Windowed<KIn>, Change<VAgg>> get() {
        return new KStreamSlidingWindowAggregateProcessor();
    }

    public SlidingWindows windows() {
        return this.windows;
    }

    @Override // org.apache.kafka.streams.kstream.internals.KStreamAggProcessorSupplier
    public void enableSendingOldValues() {
        this.sendOldValues = true;
    }

    @Override // org.apache.kafka.streams.kstream.internals.KStreamAggProcessorSupplier
    public KTableValueGetterSupplier<Windowed<KIn>, VAgg> view() {
        return new KTableValueGetterSupplier<Windowed<KIn>, VAgg>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamSlidingWindowAggregate.1
            @Override // org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier
            public KTableValueGetter<Windowed<KIn>, VAgg> get() {
                return new KStreamWindowAggregateValueGetter();
            }

            @Override // org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier
            public String[] storeNames() {
                return new String[]{KStreamSlidingWindowAggregate.this.storeName};
            }
        };
    }
}
