/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.CachedStateStore;
import org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator;
import org.apache.kafka.streams.state.internals.LRUCacheEntry;
import org.apache.kafka.streams.state.internals.MergedSortedCacheKeyValueStoreIterator;
import org.apache.kafka.streams.state.internals.ThreadCache;

class CachingKeyValueStore<K, V>
implements KeyValueStore<K, V>,
CachedStateStore<K, V> {
    private final KeyValueStore<Bytes, byte[]> underlying;
    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;
    private CacheFlushListener<K, V> flushListener;
    private String name;
    private ThreadCache cache;
    private InternalProcessorContext context;
    private StateSerdes<K, V> serdes;
    private Thread streamThread;

    CachingKeyValueStore(KeyValueStore<Bytes, byte[]> underlying, Serde<K> keySerde, Serde<V> valueSerde) {
        this.underlying = underlying;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    @Override
    public String name() {
        return this.underlying.name();
    }

    @Override
    public void init(ProcessorContext context, StateStore root) {
        this.underlying.init(context, root);
        this.initInternal(context);
        this.streamThread = Thread.currentThread();
    }

    void initInternal(final ProcessorContext context) {
        this.context = (InternalProcessorContext)context;
        this.serdes = new StateSerdes(this.underlying.name(), (Serde<?>)(this.keySerde == null ? context.keySerde() : this.keySerde), (Serde<?>)(this.valueSerde == null ? context.valueSerde() : this.valueSerde));
        this.name = context.taskId() + "-" + this.underlying.name();
        this.cache = this.context.getCache();
        this.cache.addDirtyEntryFlushListener(this.name, new ThreadCache.DirtyEntryFlushListener(){

            @Override
            public void apply(List<ThreadCache.DirtyEntry> entries) {
                ArrayList keyValues = new ArrayList();
                for (ThreadCache.DirtyEntry entry : entries) {
                    keyValues.add(KeyValue.pair(entry.key(), entry.newValue()));
                    CachingKeyValueStore.this.maybeForward(entry, (InternalProcessorContext)context);
                }
                CachingKeyValueStore.this.underlying.putAll(keyValues);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void maybeForward(ThreadCache.DirtyEntry entry, InternalProcessorContext context) {
        if (this.flushListener != null) {
            RecordContext current = context.recordContext();
            context.setRecordContext(entry.recordContext());
            try {
                this.flushListener.apply(this.serdes.keyFrom(entry.key().get()), this.serdes.valueFrom(entry.newValue()), this.serdes.valueFrom((byte[])this.underlying.get(entry.key())));
            }
            finally {
                context.setRecordContext(current);
            }
        }
    }

    @Override
    public void setFlushListener(CacheFlushListener<K, V> flushListener) {
        this.flushListener = flushListener;
    }

    @Override
    public synchronized void flush() {
        this.cache.flush(this.name);
        this.underlying.flush();
    }

    @Override
    public void close() {
        this.flush();
        this.underlying.close();
    }

    @Override
    public boolean persistent() {
        return this.underlying.persistent();
    }

    @Override
    public boolean isOpen() {
        return this.underlying.isOpen();
    }

    @Override
    public synchronized V get(K key) {
        byte[] rawKey = this.serdes.rawKey(key);
        return this.get(rawKey);
    }

    @Override
    private V get(byte[] rawKey) {
        LRUCacheEntry entry = this.cache.get(this.name, rawKey);
        if (entry == null) {
            byte[] rawValue = (byte[])this.underlying.get(Bytes.wrap((byte[])rawKey));
            if (rawValue == null) {
                return null;
            }
            if (Thread.currentThread().equals(this.streamThread)) {
                this.cache.put(this.name, rawKey, new LRUCacheEntry(rawValue));
            }
            return this.serdes.valueFrom(rawValue);
        }
        if (entry.value == null) {
            return null;
        }
        return this.serdes.valueFrom(entry.value);
    }

    @Override
    public KeyValueIterator<K, V> range(K from, K to) {
        byte[] origFrom = this.serdes.rawKey(from);
        byte[] origTo = this.serdes.rawKey(to);
        DelegatingPeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<Bytes, byte[]>(this.underlying.range(Bytes.wrap((byte[])origFrom), Bytes.wrap((byte[])origTo)));
        ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = this.cache.range(this.name, origFrom, origTo);
        return new MergedSortedCacheKeyValueStoreIterator<K, V>(cacheIterator, storeIterator, this.serdes);
    }

    @Override
    public KeyValueIterator<K, V> all() {
        DelegatingPeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<Bytes, byte[]>(this.underlying.all());
        ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = this.cache.all(this.name);
        return new MergedSortedCacheKeyValueStoreIterator<K, V>(cacheIterator, storeIterator, this.serdes);
    }

    @Override
    public synchronized long approximateNumEntries() {
        return this.underlying.approximateNumEntries();
    }

    @Override
    public synchronized void put(K key, V value) {
        this.put(this.serdes.rawKey(key), value);
    }

    @Override
    private synchronized void put(byte[] rawKey, V value) {
        byte[] rawValue = this.serdes.rawValue(value);
        this.cache.put(this.name, rawKey, new LRUCacheEntry(rawValue, true, this.context.offset(), this.context.timestamp(), this.context.partition(), this.context.topic()));
    }

    @Override
    public synchronized V putIfAbsent(K key, V value) {
        byte[] rawKey = this.serdes.rawKey(key);
        V v = this.get(rawKey);
        if (v == null) {
            this.put(rawKey, value);
        }
        return v;
    }

    @Override
    public synchronized void putAll(List<KeyValue<K, V>> entries) {
        for (KeyValue<K, V> entry : entries) {
            this.put(entry.key, entry.value);
        }
    }

    @Override
    public synchronized V delete(K key) {
        byte[] rawKey = this.serdes.rawKey(key);
        V v = this.get(rawKey);
        this.put(rawKey, (V)null);
        return v;
    }

    KeyValueStore<Bytes, byte[]> underlying() {
        return this.underlying;
    }
}

