package org.apache.kafka.streams.state.internals;

import java.util.Iterator;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Serdes;
import org.apache.kafka.streams.state.internals.StoreChangeLogger;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.class */
/**
 * A {@link KeyValueStore} wrapper that forwards every operation to an inner store and reports
 * the keys touched by writes and deletes to a {@link StoreChangeLogger}.
 *
 * <p>Reads ({@link #get}, {@link #range}, {@link #all}) are pure delegation. Writes and deletes
 * are applied to the inner store first and only then handed to the change logger, so the value
 * getter passed to the logger reads the already-updated inner store.
 *
 * <p>The change logger and its value getter are created in {@link #init}; calling any mutating
 * method or {@link #flush()} before {@code init} dereferences a {@code null} logger.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {

    private final String storeName;
    private final KeyValueStore<K, V> inner;
    private final Serdes<K, V> serdes;

    // Both are assigned in init(), not in the constructor.
    private StoreChangeLogger<K, V> changeLogger;
    private StoreChangeLogger.ValueGetter<K, V> getter;

    /**
     * Creates a logged wrapper around an existing store.
     *
     * @param storeName the name returned by {@link #name()} and passed to the change logger
     * @param inner     the store every operation is delegated to
     * @param serdes    passed to the change logger when it is created in {@link #init}
     */
    public InMemoryKeyValueLoggedStore(String storeName, KeyValueStore<K, V> inner, Serdes<K, V> serdes) {
        this.storeName = storeName;
        this.inner = inner;
        this.serdes = serdes;
    }

    /** @return the store name supplied at construction time */
    @Override
    public String name() {
        return storeName;
    }

    /**
     * Creates the change logger, initialises the inner store, and then builds the value getter
     * that reads back from the inner store.
     *
     * @param context the processor context handed to both the change logger and the inner store
     * @param root    forwarded unchanged to the inner store's {@code init}
     */
    @Override
    public void init(ProcessorContext context, StateStore root) {
        // The logger is created before the inner store is initialised; keep this order.
        changeLogger = new StoreChangeLogger<>(storeName, context, serdes);
        inner.init(context, root);
        getter = newInnerValueGetter();
    }

    /** @return whatever the inner store reports for {@code persistent()} */
    @Override
    public boolean persistent() {
        return inner.persistent();
    }

    /**
     * Looks up a key in the inner store.
     *
     * @param key the key to look up
     * @return the inner store's result for {@code key}
     */
    @Override
    public V get(K key) {
        return inner.get(key);
    }

    /**
     * Writes the entry to the inner store, then registers the key with the change logger.
     *
     * @param key   the key to write
     * @param value the value to associate with {@code key}
     */
    @Override
    public void put(K key, V value) {
        inner.put(key, value);
        changeLogger.add(key);
        maybeLogChange();
    }

    /**
     * Writes all entries to the inner store in one call, then registers each entry's key with
     * the change logger, in list order.
     *
     * @param entries the entries to write
     */
    @Override
    public void putAll(List<KeyValue<K, V>> entries) {
        inner.putAll(entries);
        for (KeyValue<K, V> entry : entries) {
            changeLogger.add(entry.key);
        }
        maybeLogChange();
    }

    /**
     * Deletes the key from the inner store and then reports the removal via {@link #removed}.
     *
     * @param key the key to delete
     * @return the value returned by the inner store's {@code delete}
     */
    @Override
    public V delete(K key) {
        V previous = inner.delete(key);
        removed(key);
        return previous;
    }

    /**
     * Reports to the change logger that {@code key} was removed.
     *
     * <p>NOTE(review): the decompiler output marks this method as originally {@code protected};
     * it is kept {@code public} here so that existing callers continue to compile.
     *
     * @param key the key that was removed
     */
    public void removed(K key) {
        changeLogger.delete(key);
        maybeLogChange();
    }

    /**
     * Delegates a range query to the inner store.
     *
     * @param from the first bound, passed through as the inner store's first argument
     * @param to   the second bound, passed through as the inner store's second argument
     * @return the iterator produced by the inner store
     */
    @Override
    public KeyValueIterator<K, V> range(K from, K to) {
        return inner.range(from, to);
    }

    /** @return the inner store's iterator over all of its entries */
    @Override
    public KeyValueIterator<K, V> all() {
        return inner.all();
    }

    /** Closes the inner store. The change logger is not touched here. */
    @Override
    public void close() {
        inner.close();
    }

    /** Flushes the inner store, then unconditionally asks the change logger to log its changes. */
    @Override
    public void flush() {
        inner.flush();
        changeLogger.logChange(getter);
    }

    /** Builds the getter the change logger uses to read values back from the inner store. */
    private StoreChangeLogger.ValueGetter<K, V> newInnerValueGetter() {
        return new StoreChangeLogger.ValueGetter<K, V>() {
            @Override
            public V get(K key) {
                return InMemoryKeyValueLoggedStore.this.inner.get(key);
            }
        };
    }

    /** Gives the change logger the chance to log; whether it does is decided by the logger. */
    private void maybeLogChange() {
        changeLogger.maybeLogChange(getter);
    }
}
