package org.apache.kafka.streams.state.internals;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.Serdes;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/StoreChangeLogger.class */
/**
 * Buffers the keys of a state store that were written or deleted since the last flush and
 * forwards the corresponding changes to the store's changelog topic.
 *
 * <p>Keys are tracked in two disjoint sets: {@link #dirty} (added/updated) and
 * {@link #removed} (deleted). A key is always moved to the set matching the most recent
 * operation, so it is never present in both. On flush, every removed key is sent with a
 * {@code null} value and every dirty key is sent with the value currently returned by the
 * supplied {@link ValueGetter}.
 *
 * <p>NOTE: this class performs no synchronization of its own.
 *
 * @param <K> key type of the store
 * @param <V> value type of the store
 */
public class StoreChangeLogger<K, V> {
    /** Default threshold applied to both the dirty and the removed key set. */
    protected static final int DEFAULT_WRITE_BATCH_SIZE = 100;

    /** Provides the key and value serializers used when sending changelog records. */
    protected final Serdes<K, V> serialization;

    private final String topic;
    private final int partition;
    private final ProcessorContext context;
    private final int maxDirty;
    private final int maxRemoved;

    /** Keys added or updated since the last flush. */
    protected Set<K> dirty;

    /** Keys deleted since the last flush. */
    protected Set<K> removed;

    /**
     * Looks up the current value for a key at flush time.
     *
     * @param <K> key type
     * @param <V> value type
     */
    public interface ValueGetter<K, V> {
        /**
         * @param key the key to look up
         * @return the value to write to the changelog for {@code key}
         */
        V get(K key);
    }

    /**
     * Creates a logger using {@link #DEFAULT_WRITE_BATCH_SIZE} for both thresholds.
     *
     * @param storeName     name passed to {@code ProcessorStateManager.storeChangelogTopic}
     *                      to derive the topic (presumably the store name; confirm with callers)
     * @param context       processor context supplying the job id, task partition and record collector
     * @param serialization serdes for keys and values
     */
    public StoreChangeLogger(String storeName, ProcessorContext context, Serdes<K, V> serialization) {
        this(storeName, context, serialization, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
    }

    /**
     * Creates a logger for the partition of the context's task.
     *
     * @param storeName     name passed to {@code ProcessorStateManager.storeChangelogTopic}
     * @param context       processor context supplying the job id, task partition and record collector
     * @param serialization serdes for keys and values
     * @param maxDirty      {@link #maybeLogChange} flushes once more than this many keys are dirty
     * @param maxRemoved    {@link #maybeLogChange} flushes once more than this many keys are removed
     */
    public StoreChangeLogger(String storeName, ProcessorContext context, Serdes<K, V> serialization,
                             int maxDirty, int maxRemoved) {
        this(storeName, context, context.taskId().partition, serialization, maxDirty, maxRemoved);
        // Kept for compatibility: subclasses overriding init() still get it invoked here.
        init();
    }

    /**
     * Creates a logger for an explicitly given partition.
     *
     * <p>NOTE(review): a decompiler note on the original indicates this constructor was
     * declared {@code protected}; it is left {@code public} so existing callers keep compiling.
     *
     * @param storeName     name passed to {@code ProcessorStateManager.storeChangelogTopic}
     * @param context       processor context supplying the job id and record collector
     * @param partition     partition every changelog record is sent to
     * @param serialization serdes for keys and values
     * @param maxDirty      {@link #maybeLogChange} flushes once more than this many keys are dirty
     * @param maxRemoved    {@link #maybeLogChange} flushes once more than this many keys are removed
     */
    public StoreChangeLogger(String storeName, ProcessorContext context, int partition,
                             Serdes<K, V> serialization, int maxDirty, int maxRemoved) {
        this.topic = ProcessorStateManager.storeChangelogTopic(context.jobId(), storeName);
        this.context = context;
        this.partition = partition;
        this.serialization = serialization;
        this.maxDirty = maxDirty;
        this.maxRemoved = maxRemoved;
        // Start with empty sets so the instance is usable even if init() is never called;
        // previously every method threw NullPointerException in that case.
        this.dirty = new HashSet<>();
        this.removed = new HashSet<>();
    }

    /** Replaces both tracking sets with fresh, empty sets. */
    public void init() {
        this.dirty = new HashSet<>();
        this.removed = new HashSet<>();
    }

    /**
     * Records that {@code key} was added or updated, cancelling any pending removal of it.
     *
     * @param key the written key
     */
    public void add(K key) {
        this.dirty.add(key);
        this.removed.remove(key);
    }

    /**
     * Records that {@code key} was deleted, cancelling any pending write of it.
     *
     * @param key the deleted key
     */
    public void delete(K key) {
        this.dirty.remove(key);
        this.removed.add(key);
    }

    /**
     * Flushes via {@link #logChange} only if the number of dirty keys exceeds {@code maxDirty}
     * or the number of removed keys exceeds {@code maxRemoved}.
     *
     * @param getter supplies the current value of each dirty key
     */
    public void maybeLogChange(ValueGetter<K, V> getter) {
        if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved) {
            logChange(getter);
        }
    }

    /**
     * Sends all pending changes to the changelog topic and then empties both sets.
     *
     * <p>Does nothing when there are no pending changes. When the context provides no record
     * collector, nothing is sent and the pending changes are kept.
     *
     * @param getter supplies the current value of each dirty key
     */
    public void logChange(ValueGetter<K, V> getter) {
        if (this.removed.isEmpty() && this.dirty.isEmpty()) {
            return;
        }

        RecordCollector collector = ((RecordCollector.Supplier) this.context).recordCollector();
        if (collector == null) {
            // Pending keys are intentionally retained so a later flush can still send them.
            return;
        }

        Serializer<K> keySerializer = this.serialization.keySerializer();
        Serializer<V> valueSerializer = this.serialization.valueSerializer();

        // Removals first: each deleted key is written with a null value.
        for (K key : this.removed) {
            collector.send(new ProducerRecord<K, V>(this.topic, this.partition, key, (V) null),
                    keySerializer, valueSerializer);
        }
        // Then writes: the value is read at flush time, not at the time add() was called.
        for (K key : this.dirty) {
            V value = getter.get(key);
            collector.send(new ProducerRecord<K, V>(this.topic, this.partition, key, value),
                    keySerializer, valueSerializer);
        }

        this.removed.clear();
        this.dirty.clear();
    }

    /** Discards all pending changes without sending anything. */
    public void clear() {
        this.removed.clear();
        this.dirty.clear();
    }

    /** @return the number of keys currently marked dirty */
    public int numDirty() {
        return this.dirty.size();
    }

    /** @return the number of keys currently marked removed */
    public int numRemoved() {
        return this.removed.size();
    }
}
