package org.apache.kafka.streams.state.internals;

import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/InMemoryWindowStore.class */
public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
    private static final Logger LOG = LoggerFactory.getLogger(InMemoryWindowStore.class);
    /** Width in bytes of the int sequence number appended to keys when duplicates are retained. */
    private static final int SEQNUM_SIZE = 4;
    /** Store name, returned by {@link #name()}. */
    private final String name;
    /** Metric scope supplied at construction time. */
    private final String metricScope;
    /** A record whose timestamp is {@code <= observedStreamTime - retentionPeriod} is treated as expired. */
    private final long retentionPeriod;
    /** Added to a window's start timestamp to derive its end timestamp in windowed iterators. */
    private final long windowSize;
    /** When true, every put stores the key with a sequence-number suffix and null-value puts do not delete. */
    private final boolean retainDuplicates;
    /** Processor context captured by {@code init}; supplies the system time used when recording dropped records. */
    private ProcessorContext context;
    /** Sensor that {@code put} records to whenever it skips an expired record. */
    private Sensor expiredRecordSensor;
    /** Set only by {@code init(StateStoreContext, StateStore)}; handed to {@code StoreQueryUtils}. */
    private StateStoreContext stateStoreContext;
    /** Window start timestamp -> (stored key -> value). */
    private final ConcurrentNavigableMap<Long, ConcurrentNavigableMap<Bytes, byte[]>> segmentMap = new ConcurrentSkipListMap<>();
    /** Iterators handed out and not yet closed; consulted before expired segments are dropped. */
    private final Set<InMemoryWindowStoreIteratorWrapper> openIterators = ConcurrentHashMap.newKeySet();
    /** Last sequence number used to disambiguate duplicate keys. */
    private int seqnum = 0;
    /** Largest timestamp passed to {@code put} so far; -1 until the first put. */
    private long observedStreamTime = -1;
    /** True between {@code init} and {@code close}. */
    private volatile boolean open = false;
    /** Position object updated on every put and on changelog restoration. */
    private final Position position = Position.emptyPosition();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/streams/state/internals/InMemoryWindowStore$ClosingCallback.class */
    /**
     * Hook invoked by an iterator when it is closed so that its owner can stop tracking it.
     */
    @FunctionalInterface
    public interface ClosingCallback {
        /**
         * Called from the iterator's {@code close()} with the iterator itself as the argument.
         *
         * @param inMemoryWindowStoreIteratorWrapper the iterator being closed
         */
        void deregisterIterator(InMemoryWindowStoreIteratorWrapper inMemoryWindowStoreIteratorWrapper);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/state/internals/InMemoryWindowStore$InMemoryWindowStoreIteratorWrapper.class */
    /**
     * Shared iteration logic for the store's iterators: walks a sequence of segments (one per
     * window start timestamp) and, inside each segment, the records whose stored key falls in the
     * requested key range. Subclasses decide how a record is exposed to the caller.
     */
    public static abstract class InMemoryWindowStoreIteratorWrapper {
        /** Records of the segment currently being read; null when there is nothing to read or after close. */
        private Iterator<Map.Entry<Bytes, byte[]>> recordIterator;
        /** Record fetched by {@link #hasNext()} and not yet consumed by the subclass. */
        private KeyValue<Bytes, byte[]> next;
        /** Timestamp key of the segment most recently taken from {@code segmentIterator}. */
        private long currentTime;
        /** True when neither bound was supplied, i.e. no key filtering is needed. */
        private final boolean allKeys;
        private final Bytes keyFrom;
        private final Bytes keyTo;
        private final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator;
        private final ClosingCallback callback;
        private final boolean retainDuplicates;
        private final boolean forward;

        /**
         * @param bytes           inclusive lower key bound as stored in the segments, or null for unbounded
         * @param bytes2          inclusive upper key bound as stored in the segments, or null for unbounded
         * @param it              iterator over the segments to visit, or null for an empty iterator
         * @param closingCallback notified when {@link #close()} is called
         * @param z               whether stored keys carry a sequence-number suffix (duplicates retained)
         * @param z2              true to read each segment in ascending key order, false for descending
         */
        InMemoryWindowStoreIteratorWrapper(Bytes bytes, Bytes bytes2, Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> it, ClosingCallback closingCallback, boolean z, boolean z2) {
            this.keyFrom = bytes;
            this.keyTo = bytes2;
            this.allKeys = bytes == null && bytes2 == null;
            this.retainDuplicates = z;
            this.forward = z2;
            this.segmentIterator = it;
            this.callback = closingCallback;
            // Must come last: setRecordIterator() reads the fields assigned above.
            this.recordIterator = it == null ? null : setRecordIterator();
        }

        /**
         * Reports whether another in-range record is available, fetching and buffering it if needed.
         * Written as a loop so that a long run of out-of-range records cannot exhaust the stack.
         *
         * @return true if a record is buffered and ready to be consumed
         */
        public boolean hasNext() {
            while (next == null) {
                if (recordIterator == null) {
                    return false;
                }
                if (!recordIterator.hasNext() && !segmentIterator.hasNext()) {
                    return false;
                }
                next = getNext();
                if (next == null) {
                    return false;
                }
                // Only stored keys carrying a sequence-number suffix get the extra range check.
                if (allKeys || !retainDuplicates || isKeyWithinRange(InMemoryWindowStore.getKey(next.key))) {
                    return true;
                }
                next = null;
            }
            return true;
        }

        /**
         * Checks a key (suffix already stripped) against the bounds, which are themselves stripped
         * of their suffix before comparing. Both bounds are inclusive; a null bound is unbounded.
         */
        private boolean isKeyWithinRange(Bytes bytes) {
            if (keyFrom != null && bytes.compareTo(InMemoryWindowStore.getKey(keyFrom)) < 0) {
                return false;
            }
            return keyTo == null || bytes.compareTo(InMemoryWindowStore.getKey(keyTo)) <= 0;
        }

        /** Drops any buffered state and notifies the closing callback. */
        public void close() {
            next = null;
            recordIterator = null;
            callback.deregisterIterator(this);
        }

        /**
         * Advances to the next record, moving on to following segments while the current one is
         * exhausted.
         *
         * @return the next record, or null when no segment is left
         */
        protected KeyValue<Bytes, byte[]> getNext() {
            while (!recordIterator.hasNext()) {
                recordIterator = setRecordIterator();
                if (recordIterator == null) {
                    return null;
                }
            }
            final Map.Entry<Bytes, byte[]> entry = recordIterator.next();
            return new KeyValue<>(entry.getKey(), entry.getValue());
        }

        /**
         * Takes the next segment, records its timestamp in {@code currentTime}, and returns an
         * iterator over the part of it that lies inside the key bounds.
         *
         * @return iterator over the next segment's in-range records, or null if no segment is left
         */
        Iterator<Map.Entry<Bytes, byte[]>> setRecordIterator() {
            if (!segmentIterator.hasNext()) {
                return null;
            }
            final Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>> segment = segmentIterator.next();
            currentTime = segment.getKey();

            final ConcurrentNavigableMap<Bytes, byte[]> records = segment.getValue();
            final ConcurrentNavigableMap<Bytes, byte[]> inRange;
            if (allKeys) {
                inRange = records;
            } else if (keyFrom == null) {
                inRange = records.headMap(keyTo, true);
            } else if (keyTo == null) {
                inRange = records.tailMap(keyFrom, true);
            } else {
                inRange = records.subMap(keyFrom, true, keyTo, true);
            }
            return forward ? inRange.entrySet().iterator() : inRange.descendingMap().entrySet().iterator();
        }

        /** @return the timestamp of the segment this iterator is currently positioned on */
        Long minTime() {
            return currentTime;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/state/internals/InMemoryWindowStore$WrappedInMemoryWindowStoreIterator.class */
    public static class WrappedInMemoryWindowStoreIterator extends InMemoryWindowStoreIteratorWrapper implements WindowStoreIterator<byte[]> {
        WrappedInMemoryWindowStoreIterator(Bytes bytes, Bytes bytes2, Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> it, ClosingCallback closingCallback, boolean z, boolean z2) {
            super(bytes, bytes2, it, closingCallback, z, z2);
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.kafka.streams.state.KeyValueIterator
        public Long peekNextKey() {
            if (hasNext()) {
                return Long.valueOf(((InMemoryWindowStoreIteratorWrapper) this).currentTime);
            }
            throw new NoSuchElementException();
        }

        @Override // java.util.Iterator
        public KeyValue<Long, byte[]> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            KeyValue<Long, byte[]> keyValue = new KeyValue<>(Long.valueOf(((InMemoryWindowStoreIteratorWrapper) this).currentTime), ((InMemoryWindowStoreIteratorWrapper) this).next.value);
            ((InMemoryWindowStoreIteratorWrapper) this).next = null;
            return keyValue;
        }

        public static WrappedInMemoryWindowStoreIterator emptyIterator() {
            return new WrappedInMemoryWindowStoreIterator(null, null, null, inMemoryWindowStoreIteratorWrapper -> {
            }, false, true);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/state/internals/InMemoryWindowStore$WrappedWindowedKeyValueIterator.class */
    public static class WrappedWindowedKeyValueIterator extends InMemoryWindowStoreIteratorWrapper implements KeyValueIterator<Windowed<Bytes>, byte[]> {
        private final long windowSize;

        WrappedWindowedKeyValueIterator(Bytes bytes, Bytes bytes2, Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> it, ClosingCallback closingCallback, boolean z, long j, boolean z2) {
            super(bytes, bytes2, it, closingCallback, z, z2);
            this.windowSize = j;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.kafka.streams.state.KeyValueIterator
        public Windowed<Bytes> peekNextKey() {
            if (hasNext()) {
                return getWindowedKey();
            }
            throw new NoSuchElementException();
        }

        @Override // java.util.Iterator
        public KeyValue<Windowed<Bytes>, byte[]> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            KeyValue<Windowed<Bytes>, byte[]> keyValue = new KeyValue<>(getWindowedKey(), ((InMemoryWindowStoreIteratorWrapper) this).next.value);
            ((InMemoryWindowStoreIteratorWrapper) this).next = null;
            return keyValue;
        }

        /* JADX WARN: Multi-variable type inference failed */
        private Windowed<Bytes> getWindowedKey() {
            Bytes key = ((InMemoryWindowStoreIteratorWrapper) this).retainDuplicates ? InMemoryWindowStore.getKey((Bytes) ((InMemoryWindowStoreIteratorWrapper) this).next.key) : (Bytes) ((InMemoryWindowStoreIteratorWrapper) this).next.key;
            long j = ((InMemoryWindowStoreIteratorWrapper) this).currentTime + this.windowSize;
            if (j < 0) {
                InMemoryWindowStore.LOG.warn("Warning: window end time was truncated to Long.MAX");
                j = Long.MAX_VALUE;
            }
            return new Windowed<>(key, new TimeWindow(((InMemoryWindowStoreIteratorWrapper) this).currentTime, j));
        }
    }

    /**
     * Creates a closed, empty store; it becomes open once one of the {@code init} methods runs.
     *
     * @param str  store name
     * @param j    retention period
     * @param j2   window size
     * @param z    whether duplicate keys are retained
     * @param str2 metric scope
     */
    public InMemoryWindowStore(String str, long j, long j2, boolean z, String str2) {
        name = str;
        metricScope = str2;
        retentionPeriod = j;
        windowSize = j2;
        retainDuplicates = z;
    }

    /** @return the store name given at construction time */
    @Override
    public String name() {
        return name;
    }

    /**
     * Captures the processor context, creates the dropped-records sensor, registers a restore
     * callback when a root store is given, and marks the store open.
     *
     * @param processorContext context used for metrics, configuration and store registration
     * @param stateStore       root store to register; when null no restore callback is registered
     */
    @Override
    @Deprecated
    public void init(ProcessorContext processorContext, StateStore stateStore) {
        context = processorContext;
        final String threadName = Thread.currentThread().getName();
        final String taskName = processorContext.taskId().toString();
        expiredRecordSensor = TaskMetrics.droppedRecordsSensor(threadName, taskName, ProcessorContextUtils.getMetricsImpl(processorContext));

        if (stateStore != null) {
            final boolean consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
                processorContext.appConfigs(),
                StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED,
                false);
            processorContext.register(stateStore, collection -> {
                // Replay each restored record through put(), then fold it into the position.
                for (Iterator records = collection.iterator(); records.hasNext(); ) {
                    final ConsumerRecord restored = (ConsumerRecord) records.next();
                    put(
                        Bytes.wrap(WindowKeySchema.extractStoreKeyBytes((byte[]) restored.key())),
                        (byte[]) restored.value(),
                        WindowKeySchema.extractStoreTimestamp((byte[]) restored.key()));
                    ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(restored, consistencyEnabled, this.position);
                }
            });
        }
        open = true;
    }

    /**
     * Remembers the state store context, then delegates to
     * {@link #init(ProcessorContext, StateStore)} with an adapted context.
     *
     * @param stateStoreContext context to remember and adapt
     * @param stateStore        root store, forwarded unchanged
     */
    @Override
    public void init(StateStoreContext stateStoreContext, StateStore stateStore) {
        this.stateStoreContext = stateStoreContext;
        final ProcessorContext adapted = StoreToProcessorContextAdapter.adapt(stateStoreContext);
        init(adapted, stateStore);
    }

    /** @return this store's position object (the live instance, not a copy) */
    @Override
    public Position getPosition() {
        return position;
    }

    /**
     * Stores, or for a null value deletes, the record for {@code bytes} in the window starting at
     * {@code j}.
     *
     * <p>Expired segments are purged first and the observed stream time is advanced. A record
     * whose timestamp is {@code <= observedStreamTime - retentionPeriod} is counted on the
     * dropped-records sensor and skipped. A null value deletes the key unless duplicates are
     * retained, in which case it is ignored. The position is updated in every case.
     *
     * @param bytes record key
     * @param bArr  record value, or null to delete
     * @param j     window start timestamp
     */
    @Override
    public void put(Bytes bytes, byte[] bArr, long j) {
        removeExpiredSegments();
        observedStreamTime = Math.max(observedStreamTime, j);

        if (j <= observedStreamTime - retentionPeriod) {
            expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
            LOG.warn("Skipping record for expired segment.");
        } else if (bArr != null) {
            maybeUpdateSeqnumForDups();
            final Bytes storedKey = retainDuplicates ? wrapForDups(bytes, seqnum) : bytes;
            // Use the segment returned by computeIfAbsent rather than looking it up a second time.
            segmentMap.computeIfAbsent(j, timestamp -> new ConcurrentSkipListMap<>()).put(storedKey, bArr);
        } else if (!retainDuplicates) {
            segmentMap.computeIfPresent(j, (timestamp, segment) -> {
                segment.remove(bytes);
                // Returning null removes the mapping, so an emptied segment is dropped.
                return segment.isEmpty() ? null : segment;
            });
        }
        StoreQueryUtils.updatePosition(position, stateStoreContext);
    }

    /**
     * Looks up the value stored under exactly {@code bytes} in the window starting at {@code j}.
     *
     * @param bytes key to look up; must not be null
     * @param j     window start timestamp
     * @return the stored value, or null if the window is expired, absent, or has no such key
     * @throws NullPointerException if {@code bytes} is null
     */
    @Override
    public byte[] fetch(Bytes bytes, long j) {
        Objects.requireNonNull(bytes, "key cannot be null");
        removeExpiredSegments();

        if (j <= observedStreamTime - retentionPeriod) {
            return null;
        }
        final ConcurrentNavigableMap<Bytes, byte[]> segment = segmentMap.get(j);
        if (segment == null) {
            return null;
        }
        return segment.get(bytes);
    }

    /**
     * Forward (ascending time) fetch of one key over the inclusive time range [{@code j}, {@code j2}].
     */
    @Override
    @Deprecated
    public WindowStoreIterator<byte[]> fetch(Bytes bytes, long j, long j2) {
        final boolean forward = true;
        return fetch(bytes, j, j2, forward);
    }

    /**
     * Backward (descending time) fetch of one key over the inclusive time range [{@code j}, {@code j2}].
     */
    @Override
    public WindowStoreIterator<byte[]> backwardFetch(Bytes bytes, long j, long j2) {
        final boolean forward = false;
        return fetch(bytes, j, j2, forward);
    }

    /**
     * Fetches one key over the inclusive time range [{@code j}, {@code j2}], with the lower bound
     * raised to the first non-expired timestamp.
     *
     * @param bytes key to fetch; must not be null
     * @param j     earliest window start timestamp, inclusive
     * @param j2    latest window start timestamp, inclusive
     * @param z     true for ascending time order, false for descending
     * @return a registered iterator, or an unregistered empty one if the range is empty
     * @throws NullPointerException if {@code bytes} is null
     */
    WindowStoreIterator<byte[]> fetch(Bytes bytes, long j, long j2, boolean z) {
        Objects.requireNonNull(bytes, "key cannot be null");
        removeExpiredSegments();

        final long minTime = Math.max(j, observedStreamTime - retentionPeriod + 1);
        if (j2 < minTime) {
            return WrappedInMemoryWindowStoreIterator.emptyIterator();
        }
        final ConcurrentNavigableMap<Long, ConcurrentNavigableMap<Bytes, byte[]>> segments =
            segmentMap.subMap(minTime, true, j2, true);
        return registerNewWindowStoreIterator(
            bytes,
            (z ? segments : segments.descendingMap()).entrySet().iterator(),
            z);
    }

    /**
     * Forward (ascending time) fetch of a key range over the inclusive time range [{@code j}, {@code j2}].
     */
    @Override
    @Deprecated
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes bytes, Bytes bytes2, long j, long j2) {
        final boolean forward = true;
        return fetch(bytes, bytes2, j, j2, forward);
    }

    /**
     * Backward (descending time) fetch of a key range over the inclusive time range [{@code j}, {@code j2}].
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(Bytes bytes, Bytes bytes2, long j, long j2) {
        final boolean forward = false;
        return fetch(bytes, bytes2, j, j2, forward);
    }

    /**
     * Fetches the key range [{@code bytes}, {@code bytes2}] over the inclusive time range
     * [{@code j}, {@code j2}], with the lower time bound raised to the first non-expired timestamp.
     * A null key bound means unbounded on that side.
     *
     * @param bytes  lower key bound, inclusive, or null
     * @param bytes2 upper key bound, inclusive, or null
     * @param j      earliest window start timestamp, inclusive
     * @param j2     latest window start timestamp, inclusive
     * @param z      true for ascending order, false for descending
     * @return a registered iterator, or an empty iterator if the key range is inverted or the
     *         time range is empty
     */
    KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes bytes, Bytes bytes2, long j, long j2, boolean z) {
        removeExpiredSegments();

        if (bytes != null && bytes2 != null && bytes.compareTo(bytes2) > 0) {
            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. This may be due to range arguments set in the wrong order, or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers");
            return KeyValueIterators.emptyIterator();
        }

        final long minTime = Math.max(j, observedStreamTime - retentionPeriod + 1);
        if (j2 < minTime) {
            return KeyValueIterators.emptyIterator();
        }
        final ConcurrentNavigableMap<Long, ConcurrentNavigableMap<Bytes, byte[]>> segments =
            segmentMap.subMap(minTime, true, j2, true);
        return registerNewWindowedKeyValueIterator(
            bytes,
            bytes2,
            (z ? segments : segments.descendingMap()).entrySet().iterator(),
            z);
    }

    /**
     * Forward (ascending time) fetch of all keys over the inclusive time range [{@code j}, {@code j2}].
     */
    @Override
    @Deprecated
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(long j, long j2) {
        final boolean forward = true;
        return fetchAll(j, j2, forward);
    }

    /**
     * Backward (descending time) fetch of all keys over the inclusive time range [{@code j}, {@code j2}].
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(long j, long j2) {
        final boolean forward = false;
        return fetchAll(j, j2, forward);
    }

    /**
     * Fetches all keys over the inclusive time range [{@code j}, {@code j2}], with the lower bound
     * raised to the first non-expired timestamp.
     *
     * @param j  earliest window start timestamp, inclusive
     * @param j2 latest window start timestamp, inclusive
     * @param z  true for ascending order, false for descending
     * @return a registered iterator, or an empty iterator if the time range is empty
     */
    KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(long j, long j2, boolean z) {
        removeExpiredSegments();

        final long minTime = Math.max(j, observedStreamTime - retentionPeriod + 1);
        if (j2 < minTime) {
            return KeyValueIterators.emptyIterator();
        }
        final ConcurrentNavigableMap<Long, ConcurrentNavigableMap<Bytes, byte[]>> segments =
            segmentMap.subMap(minTime, true, j2, true);
        return registerNewWindowedKeyValueIterator(
            null,
            null,
            (z ? segments : segments.descendingMap()).entrySet().iterator(),
            z);
    }

    /**
     * Iterates in ascending order over every record whose window start timestamp is strictly
     * greater than {@code observedStreamTime - retentionPeriod}.
     *
     * @return a registered iterator over all non-expired records
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
        removeExpiredSegments();
        final long expiryBoundary = observedStreamTime - retentionPeriod;
        return registerNewWindowedKeyValueIterator(
            null,
            null,
            segmentMap.tailMap(expiryBoundary, false).entrySet().iterator(),
            true);
    }

    /**
     * Iterates in descending order over every record whose window start timestamp is strictly
     * greater than {@code observedStreamTime - retentionPeriod}.
     *
     * @return a registered iterator over all non-expired records, newest segment first
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardAll() {
        removeExpiredSegments();
        final long expiryBoundary = observedStreamTime - retentionPeriod;
        return registerNewWindowedKeyValueIterator(
            null,
            null,
            segmentMap.tailMap(expiryBoundary, false).descendingMap().entrySet().iterator(),
            false);
    }

    /** @return always false */
    @Override
    public boolean persistent() {
        return false;
    }

    /** @return true after {@code init} has run and until {@code close} is called */
    @Override
    public boolean isOpen() {
        return open;
    }

    /**
     * Delegates the query to {@code StoreQueryUtils.handleBasicQueries}, passing this store, its
     * position and its state store context.
     */
    @Override
    public <R> QueryResult<R> query(Query<R> query, PositionBound positionBound, QueryConfig queryConfig) {
        return StoreQueryUtils.handleBasicQueries(
            query,
            positionBound,
            queryConfig,
            this,
            position,
            stateStoreContext);
    }

    /** Does nothing. */
    @Override
    public void flush() {
        // intentionally empty
    }

    /**
     * Closes any iterators still open (logging a warning if there are some), discards all
     * segments, and marks the store closed.
     */
    @Override
    public void close() {
        if (!openIterators.isEmpty()) {
            LOG.warn("Closing {} open iterators for store {}", openIterators.size(), name);
            // Each close() deregisters the iterator from openIterators while we iterate over it.
            for (final InMemoryWindowStoreIteratorWrapper iterator : openIterators) {
                iterator.close();
            }
        }
        segmentMap.clear();
        open = false;
    }

    /**
     * Removes every segment older than the first non-expired timestamp, but never a segment at or
     * after the timestamp an open iterator is currently positioned on.
     */
    private void removeExpiredSegments() {
        long minLiveTime = Math.max(0L, observedStreamTime - retentionPeriod + 1);
        for (final InMemoryWindowStoreIteratorWrapper iterator : openIterators) {
            minLiveTime = Math.min(minLiveTime, iterator.minTime());
        }
        // Exclusive bound: the segment at minLiveTime itself is kept.
        segmentMap.headMap(minLiveTime, false).clear();
    }

    /**
     * Advances the sequence number when duplicates are retained; the mask keeps it non-negative
     * by wrapping to 0 after Integer.MAX_VALUE.
     */
    private void maybeUpdateSeqnumForDups() {
        if (!retainDuplicates) {
            return;
        }
        seqnum = (seqnum + 1) & Integer.MAX_VALUE;
    }

    /**
     * Builds a stored key made of the original key bytes followed by the sequence number as a
     * 4-byte int.
     *
     * @param key    original key
     * @param seqnum sequence number to append
     * @return the key with the sequence number appended
     */
    private static Bytes wrapForDups(Bytes key, int seqnum) {
        final byte[] rawKey = key.get();
        final ByteBuffer buffer = ByteBuffer.allocate(rawKey.length + Integer.BYTES);
        buffer.put(rawKey);
        buffer.putInt(seqnum);
        return Bytes.wrap(buffer.array());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Strips the trailing 4-byte sequence number from a stored key.
     *
     * @param bytes stored key ending in a 4-byte sequence number
     * @return a new {@code Bytes} holding everything except the last 4 bytes
     */
    public static Bytes getKey(Bytes bytes) {
        final byte[] stored = bytes.get();
        final byte[] original = new byte[stored.length - Integer.BYTES];
        System.arraycopy(stored, 0, original, 0, original.length);
        return Bytes.wrap(original);
    }

    /**
     * Creates a single-key iterator over the given segments and adds it to the open iterators.
     * When duplicates are retained, the key bounds span every sequence number from 0 to
     * Integer.MAX_VALUE for that key.
     *
     * @param key             key to iterate
     * @param segmentIterator segments to visit, already in the desired time order
     * @param forward         true to read each segment in ascending key order
     * @return the registered iterator; closing it removes it from the open iterators
     */
    private WrappedInMemoryWindowStoreIterator registerNewWindowStoreIterator(Bytes key, Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator, boolean forward) {
        final Bytes keyFrom = retainDuplicates ? wrapForDups(key, 0) : key;
        final Bytes keyTo = retainDuplicates ? wrapForDups(key, Integer.MAX_VALUE) : key;

        final WrappedInMemoryWindowStoreIterator iterator = new WrappedInMemoryWindowStoreIterator(
            keyFrom,
            keyTo,
            segmentIterator,
            openIterators::remove,
            retainDuplicates,
            forward);
        openIterators.add(iterator);
        return iterator;
    }

    /**
     * Creates a key-range iterator over the given segments and adds it to the open iterators.
     * When duplicates are retained, each non-null bound gets a sequence number appended: 0 on the
     * lower bound and Integer.MAX_VALUE on the upper bound. Null bounds stay null (unbounded).
     *
     * @param keyFrom         lower key bound, inclusive, or null
     * @param keyTo           upper key bound, inclusive, or null
     * @param segmentIterator segments to visit, already in the desired time order
     * @param forward         true to read each segment in ascending key order
     * @return the registered iterator; closing it removes it from the open iterators
     */
    private WrappedWindowedKeyValueIterator registerNewWindowedKeyValueIterator(Bytes keyFrom, Bytes keyTo, Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator, boolean forward) {
        final Bytes from = (retainDuplicates && keyFrom != null) ? wrapForDups(keyFrom, 0) : keyFrom;
        final Bytes to = (retainDuplicates && keyTo != null) ? wrapForDups(keyTo, Integer.MAX_VALUE) : keyTo;

        final WrappedWindowedKeyValueIterator iterator = new WrappedWindowedKeyValueIterator(
            from,
            to,
            segmentIterator,
            openIterators::remove,
            retainDuplicates,
            windowSize,
            forward);
        openIterators.add(iterator);
        return iterator;
    }
}
