package org.apache.kafka.streams.state.internals;

import java.util.Iterator;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.apache.kafka.streams.state.internals.ThreadCache;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/streams/state/internals/CachingWindowStore.class */
public class CachingWindowStore<K, V> extends WrappedStateStore<WindowStore<Bytes, byte[]>> implements WindowStore<Bytes, byte[]>, CachedStateStore<Windowed<K>, V> {
    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;
    private final long windowSize;
    private final SegmentedBytesStore.KeySchema keySchema;
    private String name;
    private ThreadCache cache;
    private boolean sendOldValues;
    private StateSerdes<K, V> serdes;
    private InternalProcessorContext context;
    private StateSerdes<Bytes, byte[]> bytesSerdes;
    private CacheFlushListener<Windowed<K>, V> flushListener;
    private final SegmentedCacheFunction cacheFunction;

    /* JADX INFO: Access modifiers changed from: package-private */
    public CachingWindowStore(WindowStore<Bytes, byte[]> windowStore, Serde<K> serde, Serde<V> serde2, long j, long j2) {
        super(windowStore);
        this.keySchema = new WindowKeySchema();
        this.keySerde = serde;
        this.valueSerde = serde2;
        this.windowSize = j;
        this.cacheFunction = new SegmentedCacheFunction(this.keySchema, j2);
    }

    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.processor.StateStore
    public void init(ProcessorContext processorContext, StateStore stateStore) {
        initInternal((InternalProcessorContext) processorContext);
        super.init(processorContext, stateStore);
    }

    private void initInternal(InternalProcessorContext internalProcessorContext) {
        this.context = internalProcessorContext;
        String storeChangelogTopic = ProcessorStateManager.storeChangelogTopic(internalProcessorContext.applicationId(), name());
        this.serdes = new StateSerdes<>(storeChangelogTopic, this.keySerde == null ? internalProcessorContext.keySerde() : this.keySerde, this.valueSerde == null ? internalProcessorContext.valueSerde() : this.valueSerde);
        this.bytesSerdes = new StateSerdes<>(storeChangelogTopic, Serdes.Bytes(), Serdes.ByteArray());
        this.name = internalProcessorContext.taskId() + "-" + name();
        this.cache = this.context.getCache();
        this.cache.addDirtyEntryFlushListener(this.name, list -> {
            Iterator it = list.iterator();
            while (it.hasNext()) {
                putAndMaybeForward((ThreadCache.DirtyEntry) it.next(), internalProcessorContext);
            }
        });
    }

    private void putAndMaybeForward(ThreadCache.DirtyEntry dirtyEntry, InternalProcessorContext internalProcessorContext) {
        Windowed<Bytes> fromStoreBytesKey = WindowKeySchema.fromStoreBytesKey(this.cacheFunction.key(dirtyEntry.key()).get(), this.windowSize);
        long start = fromStoreBytesKey.window().start();
        Bytes key = fromStoreBytesKey.key();
        if (this.flushListener == null) {
            wrapped().put(key, dirtyEntry.newValue(), start);
            return;
        }
        byte[] newValue = dirtyEntry.newValue();
        byte[] fetch = (newValue == null || this.sendOldValues) ? wrapped().fetch(key, start) : null;
        if (newValue == null && fetch == null) {
            return;
        }
        Windowed fromStoreKey = WindowKeySchema.fromStoreKey(fromStoreBytesKey, this.serdes.keyDeserializer(), this.serdes.topic());
        V valueFrom = newValue != null ? this.serdes.valueFrom(newValue) : null;
        V valueFrom2 = (!this.sendOldValues || fetch == null) ? null : this.serdes.valueFrom(fetch);
        wrapped().put(key, dirtyEntry.newValue(), start);
        ProcessorRecordContext recordContext = internalProcessorContext.recordContext();
        internalProcessorContext.setRecordContext(dirtyEntry.entry().context());
        try {
            this.flushListener.apply(fromStoreKey, valueFrom, valueFrom2, dirtyEntry.entry().context().timestamp());
            internalProcessorContext.setRecordContext(recordContext);
        } catch (Throwable th) {
            internalProcessorContext.setRecordContext(recordContext);
            throw th;
        }
    }

    @Override // org.apache.kafka.streams.state.internals.CachedStateStore
    public void setFlushListener(CacheFlushListener<Windowed<K>, V> cacheFlushListener, boolean z) {
        this.flushListener = cacheFlushListener;
        this.sendOldValues = z;
    }

    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.processor.StateStore
    public synchronized void flush() {
        this.cache.flush(this.name);
        wrapped().flush();
    }

    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.processor.StateStore
    public void close() {
        flush();
        this.cache.close(this.name);
        wrapped().close();
    }

    @Override // org.apache.kafka.streams.state.WindowStore
    public synchronized void put(Bytes bytes, byte[] bArr) {
        put(bytes, bArr, this.context.timestamp());
    }

    @Override // org.apache.kafka.streams.state.WindowStore
    public synchronized void put(Bytes bytes, byte[] bArr, long j) {
        validateStoreOpen();
        Bytes storeKeyBinary = WindowKeySchema.toStoreKeyBinary(bytes, j, 0);
        this.cache.put(this.name, this.cacheFunction.cacheKey(storeKeyBinary), new LRUCacheEntry(bArr, this.context.headers(), true, this.context.offset(), this.context.timestamp(), this.context.partition(), this.context.topic()));
    }

    @Override // org.apache.kafka.streams.state.ReadOnlyWindowStore
    public byte[] fetch(Bytes bytes, long j) {
        LRUCacheEntry lRUCacheEntry;
        validateStoreOpen();
        Bytes cacheKey = this.cacheFunction.cacheKey(WindowKeySchema.toStoreKeyBinary(bytes, j, 0));
        if (this.cache != null && (lRUCacheEntry = this.cache.get(this.name, cacheKey)) != null) {
            return lRUCacheEntry.value();
        }
        return wrapped().fetch(bytes, j);
    }

    @Override // org.apache.kafka.streams.state.WindowStore, org.apache.kafka.streams.state.ReadOnlyWindowStore
    public synchronized WindowStoreIterator<byte[]> fetch(Bytes bytes, long j, long j2) {
        validateStoreOpen();
        WindowStoreIterator<byte[]> fetch = wrapped().fetch((WindowStore<Bytes, byte[]>) bytes, j, j2);
        if (this.cache == null) {
            return fetch;
        }
        return new MergedSortedCacheWindowStoreIterator(new FilteredCacheIterator(this.cache.range(this.name, this.cacheFunction.cacheKey(this.keySchema.lowerRangeFixedSize(bytes, j)), this.cacheFunction.cacheKey(this.keySchema.upperRangeFixedSize(bytes, j2))), this.keySchema.hasNextCondition(bytes, bytes, j, j2), this.cacheFunction), fetch);
    }

    @Override // org.apache.kafka.streams.state.WindowStore, org.apache.kafka.streams.state.ReadOnlyWindowStore
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes bytes, Bytes bytes2, long j, long j2) {
        validateStoreOpen();
        KeyValueIterator<Windowed<Bytes>, byte[]> fetch = wrapped().fetch(bytes, bytes2, j, j2);
        if (this.cache == null) {
            return fetch;
        }
        return new MergedSortedCacheWindowStoreKeyValueIterator(new FilteredCacheIterator(this.cache.range(this.name, this.cacheFunction.cacheKey(this.keySchema.lowerRange(bytes, j)), this.cacheFunction.cacheKey(this.keySchema.upperRange(bytes2, j2))), this.keySchema.hasNextCondition(bytes, bytes2, j, j2), this.cacheFunction), fetch, this.bytesSerdes, this.windowSize, this.cacheFunction);
    }

    @Override // org.apache.kafka.streams.state.ReadOnlyWindowStore
    public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
        validateStoreOpen();
        return new MergedSortedCacheWindowStoreKeyValueIterator(this.cache.all(this.name), wrapped().all(), this.bytesSerdes, this.windowSize, this.cacheFunction);
    }

    @Override // org.apache.kafka.streams.state.WindowStore, org.apache.kafka.streams.state.ReadOnlyWindowStore
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(long j, long j2) {
        validateStoreOpen();
        return new MergedSortedCacheWindowStoreKeyValueIterator(new FilteredCacheIterator(this.cache.all(this.name), this.keySchema.hasNextCondition(null, null, j, j2), this.cacheFunction), wrapped().fetchAll(j, j2), this.bytesSerdes, this.windowSize, this.cacheFunction);
    }
}
