/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.StateSerdes;

/**
 * Buffers the keys of a state store that were written or deleted and, on demand,
 * publishes the corresponding records to the store's changelog topic.
 *
 * <p>Keys passed to {@link #add(Object)} are tracked in {@link #dirty}; keys passed to
 * {@link #delete(Object)} are tracked in {@link #removed}. A key is only ever in one of
 * the two sets at a time. {@link #logChange(ValueGetter)} sends one record per tracked
 * key through the {@link RecordCollector} obtained from the processor context and then
 * empties both sets.
 *
 * @param <K> key type of the backing store
 * @param <V> value type of the backing store
 */
public class StoreChangeLogger<K, V> {
    /** Default threshold used for both {@code maxDirty} and {@code maxRemoved}. */
    protected static final int DEFAULT_WRITE_BATCH_SIZE = 100;

    /** Supplies the key and value serializers used when sending changelog records. */
    protected final StateSerdes<K, V> serialization;

    private final String topic;
    private final int partition;
    private final ProcessorContext context;
    private final int maxDirty;
    private final int maxRemoved;

    /** Keys added or updated since the last flush; assigned by {@link #init()}. */
    protected Set<K> dirty;

    /** Keys deleted since the last flush; assigned by {@link #init()}. */
    protected Set<K> removed;

    /**
     * Creates a logger that uses {@link #DEFAULT_WRITE_BATCH_SIZE} for both thresholds.
     *
     * @param storeName     name of the store; used to derive the changelog topic name
     * @param context       processor context of the owning task
     * @param serialization serdes for the store's keys and values
     */
    public StoreChangeLogger(String storeName, ProcessorContext context, StateSerdes<K, V> serialization) {
        this(storeName, context, serialization, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
    }

    /**
     * Creates a logger bound to the partition of the context's task id and initializes
     * the tracking sets via {@link #init()}.
     *
     * @param storeName     name of the store; used to derive the changelog topic name
     * @param context       processor context of the owning task
     * @param serialization serdes for the store's keys and values
     * @param maxDirty      {@link #maybeLogChange(ValueGetter)} flushes once more than this many keys are dirty
     * @param maxRemoved    {@link #maybeLogChange(ValueGetter)} flushes once more than this many keys are removed
     */
    public StoreChangeLogger(String storeName, ProcessorContext context, StateSerdes<K, V> serialization, int maxDirty, int maxRemoved) {
        this(storeName, context, context.taskId().partition, serialization, maxDirty, maxRemoved);
        this.init();
    }

    /**
     * Assigns the configuration fields only. This constructor does NOT call
     * {@link #init()}, so {@link #dirty} and {@link #removed} remain {@code null}
     * until {@code init()} is invoked or the fields are assigned directly.
     *
     * @param storeName     name of the store; used to derive the changelog topic name
     * @param context       processor context of the owning task
     * @param partition     changelog partition that records are sent to
     * @param serialization serdes for the store's keys and values
     * @param maxDirty      dirty-key threshold for {@link #maybeLogChange(ValueGetter)}
     * @param maxRemoved    removed-key threshold for {@link #maybeLogChange(ValueGetter)}
     */
    protected StoreChangeLogger(String storeName, ProcessorContext context, int partition, StateSerdes<K, V> serialization, int maxDirty, int maxRemoved) {
        this.topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
        this.context = context;
        this.partition = partition;
        this.serialization = serialization;
        this.maxDirty = maxDirty;
        this.maxRemoved = maxRemoved;
    }

    /** Replaces both tracking sets with fresh, empty {@link HashSet}s. */
    public void init() {
        this.dirty = new HashSet<K>();
        this.removed = new HashSet<K>();
    }

    /**
     * Marks {@code key} as dirty and drops any pending removal for it.
     *
     * @param key key that was written
     */
    public void add(K key) {
        this.dirty.add(key);
        this.removed.remove(key);
    }

    /**
     * Marks {@code key} as removed and drops any pending dirty mark for it.
     *
     * @param key key that was deleted
     */
    public void delete(K key) {
        this.dirty.remove(key);
        this.removed.add(key);
    }

    /**
     * Calls {@link #logChange(ValueGetter)} when the number of dirty keys is strictly
     * greater than {@code maxDirty} or the number of removed keys is strictly greater
     * than {@code maxRemoved}; otherwise does nothing.
     *
     * @param getter looks up the current value for each dirty key
     */
    public void maybeLogChange(ValueGetter<K, V> getter) {
        if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved) {
            this.logChange(getter);
        }
    }

    /**
     * Sends every pending change to the changelog topic and clears the tracking sets.
     *
     * <p>Removed keys are sent first, each with a {@code null} value and no explicit
     * timestamp. Dirty keys follow, each with the value returned by {@code getter} and
     * the timestamp reported by the processor context. If nothing is pending the method
     * returns immediately. If the context yields a {@code null} collector, nothing is
     * sent and the pending keys are kept.
     *
     * @param getter looks up the current value for each dirty key
     * @throws ClassCastException if the processor context is not a {@link RecordCollector.Supplier}
     */
    public void logChange(ValueGetter<K, V> getter) {
        if (this.removed.isEmpty() && this.dirty.isEmpty()) {
            return;
        }
        RecordCollector collector = ((RecordCollector.Supplier) this.context).recordCollector();
        if (collector == null) {
            // Nothing to send through; pending keys stay tracked for a later call.
            return;
        }
        Serializer<K> keySerializer = this.serialization.keySerializer();
        Serializer<V> valueSerializer = this.serialization.valueSerializer();
        for (K k : this.removed) {
            collector.send(new ProducerRecord<K, V>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
        }
        for (K k : this.dirty) {
            V v = getter.get(k);
            // The timestamp is read per record, and only when there is a dirty key to send.
            collector.send(new ProducerRecord<K, V>(this.topic, this.partition, this.context.timestamp(), k, v), keySerializer, valueSerializer);
        }
        this.removed.clear();
        this.dirty.clear();
    }

    /** Discards all pending dirty and removed keys without sending anything. */
    public void clear() {
        this.removed.clear();
        this.dirty.clear();
    }

    /**
     * @return number of keys currently tracked as dirty
     */
    public int numDirty() {
        return this.dirty.size();
    }

    /**
     * @return number of keys currently tracked as removed
     */
    public int numRemoved() {
        return this.removed.size();
    }

    /**
     * Callback used by {@link StoreChangeLogger#logChange(ValueGetter)} to obtain the
     * value to log for a dirty key.
     *
     * @param <K> key type
     * @param <V> value type
     */
    public static interface ValueGetter<K, V> {
        /**
         * @param key a key currently tracked as dirty
         * @return the value to write to the changelog for {@code key}
         */
        public V get(K key);
    }
}

