package org.apache.kafka.streams.state.internals;

import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/streams/state/internals/CachingSessionStore.class */
public class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore implements SessionStore<Bytes, byte[]>, CachedStateStore<Windowed<K>, AGG> {
    private final SessionStore<Bytes, byte[]> bytesStore;
    private final SessionKeySchema keySchema;
    private final Serde<K> keySerde;
    private final Serde<AGG> aggSerde;
    private final SegmentedCacheFunction cacheFunction;
    private String cacheName;
    private ThreadCache cache;
    private StateSerdes<K, AGG> serdes;
    private InternalProcessorContext context;
    private CacheFlushListener<Windowed<K>, AGG> flushListener;
    private boolean sendOldValues;
    private String topic;

    /* JADX INFO: Access modifiers changed from: package-private */
    public CachingSessionStore(SessionStore<Bytes, byte[]> sessionStore, Serde<K> serde, Serde<AGG> serde2, long j) {
        super(sessionStore);
        this.bytesStore = sessionStore;
        this.keySerde = serde;
        this.aggSerde = serde2;
        this.keySchema = new SessionKeySchema();
        this.cacheFunction = new SegmentedCacheFunction(this.keySchema, j);
    }

    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractStateStore, org.apache.kafka.streams.processor.StateStore
    public void init(ProcessorContext processorContext, StateStore stateStore) {
        this.topic = ProcessorStateManager.storeChangelogTopic(processorContext.applicationId(), stateStore.name());
        initInternal((InternalProcessorContext) processorContext);
        this.bytesStore.init(processorContext, stateStore);
    }

    private void initInternal(final InternalProcessorContext internalProcessorContext) {
        this.context = internalProcessorContext;
        this.serdes = new StateSerdes<>(this.topic, this.keySerde == null ? internalProcessorContext.keySerde() : this.keySerde, this.aggSerde == null ? internalProcessorContext.valueSerde() : this.aggSerde);
        this.cacheName = internalProcessorContext.taskId() + "-" + this.bytesStore.name();
        this.cache = internalProcessorContext.getCache();
        this.cache.addDirtyEntryFlushListener(this.cacheName, new ThreadCache.DirtyEntryFlushListener() { // from class: org.apache.kafka.streams.state.internals.CachingSessionStore.1
            @Override // org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener
            public void apply(List<ThreadCache.DirtyEntry> list) {
                Iterator<ThreadCache.DirtyEntry> it = list.iterator();
                while (it.hasNext()) {
                    CachingSessionStore.this.putAndMaybeForward(it.next(), internalProcessorContext);
                }
            }
        });
    }

    @Override // org.apache.kafka.streams.state.SessionStore
    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(Bytes bytes, long j, long j2) {
        validateStoreOpen();
        ThreadCache.MemoryLRUCacheBytesIterator range = this.cache.range(this.cacheName, this.cacheFunction.cacheKey(this.keySchema.lowerRangeFixedSize(bytes, j)), this.cacheFunction.cacheKey(this.keySchema.upperRangeFixedSize(bytes, j2)));
        return new MergedSortedCacheSessionStoreIterator(new FilteredCacheIterator(range, this.keySchema.hasNextCondition(bytes, bytes, j, j2), this.cacheFunction), this.bytesStore.findSessions(bytes, j, j2), this.cacheFunction);
    }

    @Override // org.apache.kafka.streams.state.SessionStore
    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(Bytes bytes, Bytes bytes2, long j, long j2) {
        validateStoreOpen();
        ThreadCache.MemoryLRUCacheBytesIterator range = this.cache.range(this.cacheName, this.cacheFunction.cacheKey(this.keySchema.lowerRange(bytes, j)), this.cacheFunction.cacheKey(this.keySchema.upperRange(bytes2, j2)));
        return new MergedSortedCacheSessionStoreIterator(new FilteredCacheIterator(range, this.keySchema.hasNextCondition(bytes, bytes2, j, j2), this.cacheFunction), this.bytesStore.findSessions(bytes, bytes2, j, j2), this.cacheFunction);
    }

    @Override // org.apache.kafka.streams.state.SessionStore
    public void remove(Windowed<Bytes> windowed) {
        validateStoreOpen();
        put(windowed, (byte[]) null);
    }

    @Override // org.apache.kafka.streams.state.SessionStore
    public void put(Windowed<Bytes> windowed, byte[] bArr) {
        validateStoreOpen();
        Bytes wrap = Bytes.wrap(SessionKeySchema.toBinary(windowed));
        this.cache.put(this.cacheName, this.cacheFunction.cacheKey(wrap), new LRUCacheEntry(bArr, this.context.headers(), true, this.context.offset(), this.context.timestamp(), this.context.partition(), this.context.topic()));
    }

    @Override // org.apache.kafka.streams.state.ReadOnlySessionStore
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes bytes) {
        Objects.requireNonNull(bytes, "key cannot be null");
        return findSessions(bytes, 0L, Long.MAX_VALUE);
    }

    @Override // org.apache.kafka.streams.state.ReadOnlySessionStore
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes bytes, Bytes bytes2) {
        Objects.requireNonNull(bytes, "from cannot be null");
        Objects.requireNonNull(bytes2, "to cannot be null");
        return findSessions(bytes, bytes2, 0L, Long.MAX_VALUE);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void putAndMaybeForward(ThreadCache.DirtyEntry dirtyEntry, InternalProcessorContext internalProcessorContext) {
        Bytes key = this.cacheFunction.key(dirtyEntry.key());
        ProcessorRecordContext recordContext = internalProcessorContext.recordContext();
        internalProcessorContext.setRecordContext(dirtyEntry.entry().context());
        try {
            Windowed<K> from = SessionKeySchema.from(key.get(), this.serdes.keyDeserializer(), this.topic);
            Bytes wrap = Bytes.wrap(this.serdes.rawKey(from.key()));
            if (this.flushListener != null) {
                AGG valueFrom = this.serdes.valueFrom(dirtyEntry.newValue());
                AGG fetchPrevious = (valueFrom == null || this.sendOldValues) ? fetchPrevious(wrap, from.window()) : null;
                if (valueFrom != null || fetchPrevious != null) {
                    this.flushListener.apply(from, valueFrom, fetchPrevious);
                }
            }
            this.bytesStore.put(new Windowed<>(wrap, from.window()), dirtyEntry.newValue());
            internalProcessorContext.setRecordContext(recordContext);
        } catch (Throwable th) {
            internalProcessorContext.setRecordContext(recordContext);
            throw th;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private AGG fetchPrevious(Bytes bytes, Window window) {
        KeyValueIterator<Windowed<Bytes>, byte[]> findSessions = this.bytesStore.findSessions(bytes, window.start(), window.end());
        Throwable th = null;
        try {
            if (!findSessions.hasNext()) {
                return null;
            }
            AGG valueFrom = this.serdes.valueFrom((byte[]) findSessions.next().value);
            if (findSessions != null) {
                if (0 != 0) {
                    try {
                        findSessions.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    findSessions.close();
                }
            }
            return valueFrom;
        } finally {
            if (findSessions != null) {
                if (0 != 0) {
                    try {
                        findSessions.close();
                    } catch (Throwable th3) {
                        th.addSuppressed(th3);
                    }
                } else {
                    findSessions.close();
                }
            }
        }
    }

    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractStateStore, org.apache.kafka.streams.processor.StateStore
    public void flush() {
        this.cache.flush(this.cacheName);
        this.bytesStore.flush();
    }

    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractStateStore, org.apache.kafka.streams.processor.StateStore
    public void close() {
        flush();
        this.cache.close(this.cacheName);
        this.bytesStore.close();
    }

    @Override // org.apache.kafka.streams.state.internals.CachedStateStore
    public void setFlushListener(CacheFlushListener<Windowed<K>, AGG> cacheFlushListener, boolean z) {
        this.flushListener = cacheFlushListener;
        this.sendOldValues = z;
    }
}
