package org.apache.kafka.streams.state.internals;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/CachingKeyValueStore.class */
public class CachingKeyValueStore extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, byte[], byte[]> implements KeyValueStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) CachingKeyValueStore.class);
    private CacheFlushListener<byte[], byte[]> flushListener;
    private boolean sendOldValues;
    private String cacheName;
    private InternalProcessorContext<?, ?> context;
    private Thread streamThread;
    private final ReadWriteLock lock;
    private final Position position;
    private final boolean timestampedSchema;
    private final Map<Class, CacheQueryHandler> queryHandlers;

    @FunctionalInterface
    /* loaded from: input_file:org/apache/kafka/streams/state/internals/CachingKeyValueStore$CacheQueryHandler.class */
    public interface CacheQueryHandler {
        QueryResult<?> apply(Query<?> query, Position position, PositionBound positionBound, QueryConfig queryConfig, StateStore stateStore);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CachingKeyValueStore(KeyValueStore<Bytes, byte[]> keyValueStore, boolean z) {
        super(keyValueStore);
        this.lock = new ReentrantReadWriteLock();
        this.queryHandlers = Utils.mkMap(Utils.mkEntry(KeyQuery.class, (query, position, positionBound, queryConfig, stateStore) -> {
            return runKeyQuery(query, position, positionBound, queryConfig);
        }));
        this.position = Position.emptyPosition();
        this.timestampedSchema = z;
    }

    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.processor.StateStore
    @Deprecated
    public void init(ProcessorContext processorContext, StateStore stateStore) {
        initInternal(ProcessorContextUtils.asInternalProcessorContext(processorContext));
        super.init(processorContext, stateStore);
        this.streamThread = Thread.currentThread();
    }

    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.processor.StateStore
    public void init(StateStoreContext stateStoreContext, StateStore stateStore) {
        initInternal(ProcessorContextUtils.asInternalProcessorContext(stateStoreContext));
        super.init(stateStoreContext, stateStore);
        this.streamThread = Thread.currentThread();
    }

    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.processor.StateStore
    public Position getPosition() {
        Position emptyPosition = Position.emptyPosition();
        emptyPosition.merge(this.position);
        emptyPosition.merge(wrapped().getPosition());
        return emptyPosition;
    }

    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.processor.StateStore
    public <R> QueryResult<R> query(Query<R> query, PositionBound positionBound, QueryConfig queryConfig) {
        QueryResult<?> notUpToBound;
        long nanoTime = queryConfig.isCollectExecutionInfo() ? System.nanoTime() : -1L;
        CacheQueryHandler cacheQueryHandler = this.queryHandlers.get(query.getClass());
        if (cacheQueryHandler == null) {
            notUpToBound = wrapped().query(query, positionBound, queryConfig);
        } else {
            int partition = this.context.taskId().partition();
            Lock readLock = this.lock.readLock();
            readLock.lock();
            try {
                validateStoreOpen();
                Position position = getPosition();
                notUpToBound = !StoreQueryUtils.isPermitted(position, positionBound, partition) ? QueryResult.notUpToBound(position, positionBound, Integer.valueOf(partition)) : cacheQueryHandler.apply(query, position, positionBound, queryConfig, this);
            } finally {
                readLock.unlock();
            }
        }
        if (queryConfig.isCollectExecutionInfo()) {
            notUpToBound.addExecutionInfo("Handled in " + getClass() + " in " + (System.nanoTime() - nanoTime) + "ns");
        }
        return (QueryResult<R>) notUpToBound;
    }

    private <R> QueryResult<R> runKeyQuery(Query<R> query, Position position, PositionBound positionBound, QueryConfig queryConfig) {
        LRUCacheEntry lRUCacheEntry;
        QueryResult<R> queryResult = null;
        KeyQuery keyQuery = (KeyQuery) query;
        if (keyQuery.isSkipCache()) {
            return wrapped().query(query, positionBound, queryConfig);
        }
        Bytes bytes = (Bytes) keyQuery.getKey();
        if (this.context.cache() != null && (lRUCacheEntry = this.context.cache().get(this.cacheName, bytes)) != null) {
            queryResult = QueryResult.forResult((!this.timestampedSchema || WrappedStateStore.isTimestamped(wrapped())) ? lRUCacheEntry.value() : ValueAndTimestampDeserializer.rawValue(lRUCacheEntry.value()));
        }
        if (queryResult == null) {
            queryResult = wrapped().query(query, PositionBound.unbounded(), queryConfig);
        }
        queryResult.setPosition(position);
        return queryResult;
    }

    private void initInternal(InternalProcessorContext<?, ?> internalProcessorContext) {
        this.context = internalProcessorContext;
        this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(internalProcessorContext.taskId().toString(), name());
        this.context.registerCacheFlushListener(this.cacheName, list -> {
            Iterator it = list.iterator();
            while (it.hasNext()) {
                putAndMaybeForward((ThreadCache.DirtyEntry) it.next(), internalProcessorContext);
            }
        });
    }

    private void putAndMaybeForward(ThreadCache.DirtyEntry dirtyEntry, InternalProcessorContext<?, ?> internalProcessorContext) {
        ProcessorRecordContext recordContext;
        if (this.flushListener == null) {
            recordContext = internalProcessorContext.recordContext();
            try {
                internalProcessorContext.setRecordContext(dirtyEntry.entry().context());
                wrapped().put(dirtyEntry.key(), dirtyEntry.newValue());
                internalProcessorContext.setRecordContext(recordContext);
                return;
            } finally {
            }
        }
        byte[] newValue = dirtyEntry.newValue();
        byte[] bArr = (newValue == null || this.sendOldValues) ? wrapped().get(dirtyEntry.key()) : null;
        if (newValue == null && bArr == null) {
            return;
        }
        recordContext = internalProcessorContext.recordContext();
        try {
            internalProcessorContext.setRecordContext(dirtyEntry.entry().context());
            wrapped().put(dirtyEntry.key(), dirtyEntry.newValue());
            this.flushListener.apply(new Record<>(dirtyEntry.key().get(), new Change(newValue, this.sendOldValues ? bArr : null), dirtyEntry.entry().context().timestamp(), dirtyEntry.entry().context().headers()));
            internalProcessorContext.setRecordContext(recordContext);
        } finally {
        }
    }

    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.state.internals.CachedStateStore
    public boolean setFlushListener(CacheFlushListener<byte[], byte[]> cacheFlushListener, boolean z) {
        this.flushListener = cacheFlushListener;
        this.sendOldValues = z;
        return true;
    }

    @Override // org.apache.kafka.streams.state.KeyValueStore
    public void put(Bytes bytes, byte[] bArr) {
        Objects.requireNonNull(bytes, "key cannot be null");
        validateStoreOpen();
        this.lock.writeLock().lock();
        try {
            validateStoreOpen();
            putInternal(bytes, bArr);
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    private void putInternal(Bytes bytes, byte[] bArr) {
        this.context.cache().put(this.cacheName, bytes, new LRUCacheEntry(bArr, this.context.headers(), true, this.context.offset(), this.context.timestamp(), this.context.partition(), this.context.topic()));
        StoreQueryUtils.updatePosition(this.position, this.context);
    }

    @Override // org.apache.kafka.streams.state.KeyValueStore
    public byte[] putIfAbsent(Bytes bytes, byte[] bArr) {
        Objects.requireNonNull(bytes, "key cannot be null");
        validateStoreOpen();
        this.lock.writeLock().lock();
        try {
            validateStoreOpen();
            byte[] internal = getInternal(bytes);
            if (internal == null) {
                putInternal(bytes, bArr);
            }
            return internal;
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    @Override // org.apache.kafka.streams.state.KeyValueStore
    public void putAll(List<KeyValue<Bytes, byte[]>> list) {
        validateStoreOpen();
        this.lock.writeLock().lock();
        try {
            validateStoreOpen();
            for (KeyValue<Bytes, byte[]> keyValue : list) {
                Objects.requireNonNull(keyValue.key, "key cannot be null");
                put(keyValue.key, keyValue.value);
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    @Override // org.apache.kafka.streams.state.KeyValueStore
    public byte[] delete(Bytes bytes) {
        Objects.requireNonNull(bytes, "key cannot be null");
        validateStoreOpen();
        this.lock.writeLock().lock();
        try {
            validateStoreOpen();
            return deleteInternal(bytes);
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    private byte[] deleteInternal(Bytes bytes) {
        byte[] internal = getInternal(bytes);
        putInternal(bytes, null);
        return internal;
    }

    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public byte[] get(Bytes bytes) {
        Objects.requireNonNull(bytes, "key cannot be null");
        validateStoreOpen();
        Lock writeLock = Thread.currentThread().equals(this.streamThread) ? this.lock.writeLock() : this.lock.readLock();
        writeLock.lock();
        try {
            validateStoreOpen();
            byte[] internal = getInternal(bytes);
            writeLock.unlock();
            return internal;
        } catch (Throwable th) {
            writeLock.unlock();
            throw th;
        }
    }

    private byte[] getInternal(Bytes bytes) {
        LRUCacheEntry lRUCacheEntry = null;
        if (this.context.cache() != null) {
            lRUCacheEntry = this.context.cache().get(this.cacheName, bytes);
        }
        if (lRUCacheEntry != null) {
            return lRUCacheEntry.value();
        }
        byte[] bArr = wrapped().get(bytes);
        if (bArr == null) {
            return null;
        }
        if (Thread.currentThread().equals(this.streamThread)) {
            this.context.cache().put(this.cacheName, bytes, new LRUCacheEntry(bArr));
        }
        return bArr;
    }

    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public KeyValueIterator<Bytes, byte[]> range(Bytes bytes, Bytes bytes2) {
        if (Objects.nonNull(bytes) && Objects.nonNull(bytes2) && bytes.compareTo(bytes2) > 0) {
            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. This may be due to range arguments set in the wrong order, or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers");
            return KeyValueIterators.emptyIterator();
        }
        validateStoreOpen();
        return new MergedSortedCacheKeyValueBytesStoreIterator(this.context.cache().range(this.cacheName, bytes, bytes2), wrapped().range(bytes, bytes2), true);
    }

    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public KeyValueIterator<Bytes, byte[]> reverseRange(Bytes bytes, Bytes bytes2) {
        if (Objects.nonNull(bytes) && Objects.nonNull(bytes2) && bytes.compareTo(bytes2) > 0) {
            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. This may be due to range arguments set in the wrong order, or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers");
            return KeyValueIterators.emptyIterator();
        }
        validateStoreOpen();
        return new MergedSortedCacheKeyValueBytesStoreIterator(this.context.cache().reverseRange(this.cacheName, bytes, bytes2), wrapped().reverseRange(bytes, bytes2), false);
    }

    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public KeyValueIterator<Bytes, byte[]> all() {
        validateStoreOpen();
        return new MergedSortedCacheKeyValueBytesStoreIterator(this.context.cache().all(this.cacheName), new DelegatingPeekingKeyValueIterator(name(), wrapped().all()), true);
    }

    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(P p, PS ps) {
        validateStoreOpen();
        KeyValueIterator<Bytes, byte[]> prefixScan = wrapped().prefixScan(p, ps);
        Bytes wrap = Bytes.wrap(ps.serialize(null, p));
        return new MergedSortedCacheKeyValueBytesStoreIterator(this.context.cache().range(this.cacheName, wrap, Bytes.increment(wrap), false), prefixScan, true);
    }

    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public KeyValueIterator<Bytes, byte[]> reverseAll() {
        validateStoreOpen();
        return new MergedSortedCacheKeyValueBytesStoreIterator(this.context.cache().reverseAll(this.cacheName), new DelegatingPeekingKeyValueIterator(name(), wrapped().reverseAll()), false);
    }

    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public long approximateNumEntries() {
        validateStoreOpen();
        this.lock.readLock().lock();
        try {
            validateStoreOpen();
            return wrapped().approximateNumEntries();
        } finally {
            this.lock.readLock().unlock();
        }
    }

    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.processor.StateStore
    public void flush() {
        validateStoreOpen();
        this.lock.writeLock().lock();
        try {
            validateStoreOpen();
            this.context.cache().flush(this.cacheName);
            wrapped().flush();
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.state.internals.CachedStateStore
    public void flushCache() {
        validateStoreOpen();
        this.lock.writeLock().lock();
        try {
            validateStoreOpen();
            this.context.cache().flush(this.cacheName);
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.state.internals.CachedStateStore
    public void clearCache() {
        validateStoreOpen();
        this.lock.writeLock().lock();
        try {
            validateStoreOpen();
            this.context.cache().clear(this.cacheName);
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.processor.StateStore
    public void close() {
        this.lock.writeLock().lock();
        try {
            KeyValueStore<Bytes, byte[]> wrapped = wrapped();
            wrapped.getClass();
            LinkedList<RuntimeException> executeAll = ExceptionUtils.executeAll(() -> {
                this.context.cache().flush(this.cacheName);
            }, () -> {
                this.context.cache().close(this.cacheName);
            }, wrapped::close);
            if (!executeAll.isEmpty()) {
                ExceptionUtils.throwSuppressed("Caught an exception while closing caching key value store for store " + name(), executeAll);
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    }
}
