package org.apache.kafka.streams.state.internals;

import java.util.List;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/streams/state/internals/CachingWindowStore.class */
public class CachingWindowStore<K, V> extends WrappedStateStore.AbstractWrappedStateStore implements WindowStore<K, V>, CachedStateStore<Windowed<K>, V> {
    private final WindowStore<Bytes, byte[]> underlying;
    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;
    private CacheFlushListener<Windowed<K>, V> flushListener;
    private final long windowSize;
    private String name;
    private ThreadCache cache;
    private InternalProcessorContext context;
    private StateSerdes<K, V> serdes;

    /* JADX INFO: Access modifiers changed from: package-private */
    public CachingWindowStore(WindowStore<Bytes, byte[]> windowStore, Serde<K> serde, Serde<V> serde2, long j) {
        super(windowStore);
        this.underlying = windowStore;
        this.keySerde = serde;
        this.valueSerde = serde2;
        this.windowSize = j;
    }

    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractWrappedStateStore, org.apache.kafka.streams.processor.StateStore
    public void init(ProcessorContext processorContext, StateStore stateStore) {
        this.underlying.init(processorContext, stateStore);
        initInternal(processorContext);
    }

    void initInternal(final ProcessorContext processorContext) {
        this.context = (InternalProcessorContext) processorContext;
        this.serdes = new StateSerdes<>(this.underlying.name(), this.keySerde == null ? processorContext.keySerde() : this.keySerde, this.valueSerde == null ? processorContext.valueSerde() : this.valueSerde);
        this.name = processorContext.taskId() + "-" + this.underlying.name();
        this.cache = this.context.getCache();
        this.cache.addDirtyEntryFlushListener(this.name, new ThreadCache.DirtyEntryFlushListener() { // from class: org.apache.kafka.streams.state.internals.CachingWindowStore.1
            @Override // org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener
            public void apply(List<ThreadCache.DirtyEntry> list) {
                for (ThreadCache.DirtyEntry dirtyEntry : list) {
                    byte[] bArr = dirtyEntry.key().get();
                    long timestampFromBinaryKey = WindowStoreUtils.timestampFromBinaryKey(bArr);
                    Windowed windowed = new Windowed(WindowStoreUtils.keyFromBinaryKey(bArr, CachingWindowStore.this.serdes), new TimeWindow(timestampFromBinaryKey, timestampFromBinaryKey + CachingWindowStore.this.windowSize));
                    Bytes bytesKeyFromBinaryKey = WindowStoreUtils.bytesKeyFromBinaryKey(bArr);
                    CachingWindowStore.this.maybeForward(dirtyEntry, bytesKeyFromBinaryKey, windowed, (InternalProcessorContext) processorContext);
                    CachingWindowStore.this.underlying.put(bytesKeyFromBinaryKey, dirtyEntry.newValue(), timestampFromBinaryKey);
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void maybeForward(ThreadCache.DirtyEntry dirtyEntry, Bytes bytes, Windowed<K> windowed, InternalProcessorContext internalProcessorContext) {
        if (this.flushListener != null) {
            RecordContext recordContext = internalProcessorContext.recordContext();
            internalProcessorContext.setRecordContext(dirtyEntry.recordContext());
            try {
                this.flushListener.apply(windowed, this.serdes.valueFrom(dirtyEntry.newValue()), fetchPrevious(bytes, windowed.window().start()));
                internalProcessorContext.setRecordContext(recordContext);
            } catch (Throwable th) {
                internalProcessorContext.setRecordContext(recordContext);
                throw th;
            }
        }
    }

    @Override // org.apache.kafka.streams.state.internals.CachedStateStore
    public void setFlushListener(CacheFlushListener<Windowed<K>, V> cacheFlushListener) {
        this.flushListener = cacheFlushListener;
    }

    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractWrappedStateStore, org.apache.kafka.streams.processor.StateStore
    public synchronized void flush() {
        this.cache.flush(this.name);
        this.underlying.flush();
    }

    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractWrappedStateStore, org.apache.kafka.streams.processor.StateStore
    public void close() {
        flush();
        this.underlying.close();
        this.cache.close(this.name);
    }

    @Override // org.apache.kafka.streams.state.WindowStore
    public synchronized void put(K k, V v) {
        put(k, v, this.context.timestamp());
    }

    @Override // org.apache.kafka.streams.state.WindowStore
    public synchronized void put(K k, V v, long j) {
        validateStoreOpen();
        this.cache.put(this.name, WindowStoreUtils.toBinaryKey(k, j, 0, this.serdes), new LRUCacheEntry(this.serdes.rawValue(v), true, this.context.offset(), j, this.context.partition(), this.context.topic()));
    }

    @Override // org.apache.kafka.streams.state.ReadOnlyWindowStore
    public synchronized WindowStoreIterator<V> fetch(K k, long j, long j2) {
        validateStoreOpen();
        byte[] binaryKey = WindowStoreUtils.toBinaryKey(k, j, 0, this.serdes);
        byte[] binaryKey2 = WindowStoreUtils.toBinaryKey(k, j2, 0, this.serdes);
        return new MergedSortedCacheWindowStoreIterator(this.cache.range(this.name, binaryKey, binaryKey2), this.underlying.fetch(Bytes.wrap(this.serdes.rawKey(k)), j, j2), new StateSerdes(this.serdes.stateName(), Serdes.Long(), this.serdes.valueSerde()));
    }

    private V fetchPrevious(Bytes bytes, long j) {
        WindowStoreIterator<byte[]> fetch = this.underlying.fetch(bytes, j, j);
        Throwable th = null;
        try {
            try {
                if (!fetch.hasNext()) {
                    if (fetch != null) {
                        if (0 != 0) {
                            try {
                                fetch.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            fetch.close();
                        }
                    }
                    return null;
                }
                V valueFrom = this.serdes.valueFrom((byte[]) ((KeyValue) fetch.next()).value);
                if (fetch != null) {
                    if (0 != 0) {
                        try {
                            fetch.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        fetch.close();
                    }
                }
                return valueFrom;
            } finally {
            }
        } catch (Throwable th4) {
            if (fetch != null) {
                if (th != null) {
                    try {
                        fetch.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    fetch.close();
                }
            }
            throw th4;
        }
    }
}
