package org.apache.kafka.streams.state.internals;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.internals.Segment;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.class */
public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSegmentedBytesStore.class);
    private final String name;
    private final AbstractSegments<S> segments;
    private final String metricScope;
    private final SegmentedBytesStore.KeySchema keySchema;
    private InternalProcessorContext context;
    private volatile boolean open;
    private Set<S> bulkLoadSegments;
    private Sensor expiredRecordSensor;
    private long observedStreamTime = -1;

    /* loaded from: input_file:org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore$RocksDBSegmentsBatchingRestoreCallback.class */
    private class RocksDBSegmentsBatchingRestoreCallback extends AbstractNotifyingBatchingRestoreCallback {
        private RocksDBSegmentsBatchingRestoreCallback() {
        }

        @Override // org.apache.kafka.streams.processor.BatchingStateRestoreCallback
        public void restoreAll(Collection<KeyValue<byte[], byte[]>> collection) {
            AbstractRocksDBSegmentedBytesStore.this.restoreAllInternal(collection);
        }

        @Override // org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback, org.apache.kafka.streams.processor.StateRestoreListener
        public void onRestoreStart(TopicPartition topicPartition, String str, long j, long j2) {
            AbstractRocksDBSegmentedBytesStore.this.toggleForBulkLoading(true);
        }

        @Override // org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback, org.apache.kafka.streams.processor.StateRestoreListener
        public void onRestoreEnd(TopicPartition topicPartition, String str, long j) {
            AbstractRocksDBSegmentedBytesStore.this.toggleForBulkLoading(false);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractRocksDBSegmentedBytesStore(String str, String str2, SegmentedBytesStore.KeySchema keySchema, AbstractSegments<S> abstractSegments) {
        this.name = str;
        this.metricScope = str2;
        this.keySchema = keySchema;
        this.segments = abstractSegments;
    }

    @Override // org.apache.kafka.streams.state.internals.SegmentedBytesStore
    public KeyValueIterator<Bytes, byte[]> fetch(Bytes bytes, long j, long j2) {
        List<S> segmentsToSearch = this.keySchema.segmentsToSearch(this.segments, j, j2);
        return new SegmentIterator(segmentsToSearch.iterator(), this.keySchema.hasNextCondition(bytes, bytes, j, j2), this.keySchema.lowerRangeFixedSize(bytes, j), this.keySchema.upperRangeFixedSize(bytes, j2));
    }

    @Override // org.apache.kafka.streams.state.internals.SegmentedBytesStore
    public KeyValueIterator<Bytes, byte[]> fetch(Bytes bytes, Bytes bytes2, long j, long j2) {
        if (bytes.compareTo(bytes2) > 0) {
            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers");
            return KeyValueIterators.emptyIterator();
        }
        List<S> segmentsToSearch = this.keySchema.segmentsToSearch(this.segments, j, j2);
        return new SegmentIterator(segmentsToSearch.iterator(), this.keySchema.hasNextCondition(bytes, bytes2, j, j2), this.keySchema.lowerRange(bytes, j), this.keySchema.upperRange(bytes2, j2));
    }

    @Override // org.apache.kafka.streams.state.internals.SegmentedBytesStore
    public KeyValueIterator<Bytes, byte[]> all() {
        return new SegmentIterator(this.segments.allSegments().iterator(), this.keySchema.hasNextCondition(null, null, 0L, Long.MAX_VALUE), null, null);
    }

    @Override // org.apache.kafka.streams.state.internals.SegmentedBytesStore
    public KeyValueIterator<Bytes, byte[]> fetchAll(long j, long j2) {
        return new SegmentIterator(this.segments.segments(j, j2).iterator(), this.keySchema.hasNextCondition(null, null, j, j2), null, null);
    }

    @Override // org.apache.kafka.streams.state.internals.SegmentedBytesStore
    public void remove(Bytes bytes) {
        long segmentTimestamp = this.keySchema.segmentTimestamp(bytes);
        this.observedStreamTime = Math.max(this.observedStreamTime, segmentTimestamp);
        S segmentForTimestamp = this.segments.getSegmentForTimestamp(segmentTimestamp);
        if (segmentForTimestamp == null) {
            return;
        }
        segmentForTimestamp.delete(bytes);
    }

    @Override // org.apache.kafka.streams.state.internals.SegmentedBytesStore
    public void put(Bytes bytes, byte[] bArr) {
        long segmentTimestamp = this.keySchema.segmentTimestamp(bytes);
        this.observedStreamTime = Math.max(this.observedStreamTime, segmentTimestamp);
        S orCreateSegmentIfLive = this.segments.getOrCreateSegmentIfLive(this.segments.segmentId(segmentTimestamp), this.context, this.observedStreamTime);
        if (orCreateSegmentIfLive != null) {
            orCreateSegmentIfLive.put(bytes, bArr);
        } else {
            this.expiredRecordSensor.record();
            LOG.warn("Skipping record for expired segment.");
        }
    }

    @Override // org.apache.kafka.streams.state.internals.SegmentedBytesStore
    public byte[] get(Bytes bytes) {
        S segmentForTimestamp = this.segments.getSegmentForTimestamp(this.keySchema.segmentTimestamp(bytes));
        if (segmentForTimestamp == null) {
            return null;
        }
        return (byte[]) segmentForTimestamp.get(bytes);
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public String name() {
        return this.name;
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public void init(ProcessorContext processorContext, StateStore stateStore) {
        this.context = (InternalProcessorContext) processorContext;
        this.expiredRecordSensor = TaskMetrics.droppedRecordsSensorOrExpiredWindowRecordDropSensor(Thread.currentThread().getName(), processorContext.taskId().toString(), this.metricScope, name(), this.context.metrics());
        this.segments.openExisting(this.context, this.observedStreamTime);
        this.bulkLoadSegments = new HashSet(this.segments.allSegments());
        processorContext.register(stateStore, new RocksDBSegmentsBatchingRestoreCallback());
        this.open = true;
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public void flush() {
        this.segments.flush();
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public void close() {
        this.open = false;
        this.segments.close();
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public boolean persistent() {
        return true;
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public boolean isOpen() {
        return this.open;
    }

    List<S> getSegments() {
        return this.segments.allSegments();
    }

    void restoreAllInternal(Collection<KeyValue<byte[], byte[]>> collection) {
        try {
            for (Map.Entry<S, WriteBatch> entry : getWriteBatches(collection).entrySet()) {
                S key = entry.getKey();
                WriteBatch value = entry.getValue();
                key.write(value);
                value.close();
            }
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
        }
    }

    Map<S, WriteBatch> getWriteBatches(Collection<KeyValue<byte[], byte[]>> collection) {
        Iterator<KeyValue<byte[], byte[]>> it = collection.iterator();
        while (it.hasNext()) {
            this.observedStreamTime = Math.max(this.observedStreamTime, this.keySchema.segmentTimestamp(Bytes.wrap(it.next().key)));
        }
        HashMap hashMap = new HashMap();
        for (KeyValue<byte[], byte[]> keyValue : collection) {
            S orCreateSegmentIfLive = this.segments.getOrCreateSegmentIfLive(this.segments.segmentId(this.keySchema.segmentTimestamp(Bytes.wrap(keyValue.key))), this.context, this.observedStreamTime);
            if (orCreateSegmentIfLive != null) {
                if (!this.bulkLoadSegments.contains(orCreateSegmentIfLive) && isStoreForActiveTask()) {
                    orCreateSegmentIfLive.toggleDbForBulkLoading(true);
                    this.bulkLoadSegments = new HashSet(this.segments.allSegments());
                }
                try {
                    orCreateSegmentIfLive.addToBatch(keyValue, (WriteBatch) hashMap.computeIfAbsent(orCreateSegmentIfLive, segment -> {
                        return new WriteBatch();
                    }));
                } catch (RocksDBException e) {
                    throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
                }
            }
        }
        return hashMap;
    }

    private boolean isStoreForActiveTask() {
        return this.context instanceof ProcessorContextImpl;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void toggleForBulkLoading(boolean z) {
        Iterator<S> it = this.segments.allSegments().iterator();
        while (it.hasNext()) {
            it.next().toggleDbForBulkLoading(z);
        }
    }
}
