/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.internals.AbstractSegments;
import org.apache.kafka.streams.state.internals.KeyValueIterators;
import org.apache.kafka.streams.state.internals.Segment;
import org.apache.kafka.streams.state.internals.SegmentIterator;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SegmentedBytesStore} that distributes records over a set of {@link Segment}s.
 *
 * <p>The segment a record belongs to is derived from the timestamp that the configured
 * {@link SegmentedBytesStore.KeySchema} extracts from the record key. The store tracks the
 * highest timestamp it has seen ({@code observedStreamTime}) and passes it to the segment
 * container whenever segments are opened or created.
 *
 * @param <S> the concrete segment type managed by this store
 */
public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSegmentedBytesStore.class);

    private final String name;
    private final AbstractSegments<S> segments;
    private final String metricScope;
    private final SegmentedBytesStore.KeySchema keySchema;
    private InternalProcessorContext context;
    private volatile boolean open;
    /** Snapshot of the segments known when bulk loading was last toggled during restoration. */
    private Set<S> bulkLoadSegments;
    private Sensor expiredRecordSensor;
    /** Highest key timestamp seen so far; stays at -1 until the first timestamp is observed. */
    private long observedStreamTime = -1L;

    /**
     * @param name        the store name, returned by {@link #name()} and used in error messages
     * @param metricScope the scope passed to the dropped/expired-record sensor factory in {@code init}
     * @param keySchema   schema used to extract timestamps from keys and to build range bounds
     * @param segments    the segment container backing this store
     */
    AbstractRocksDBSegmentedBytesStore(String name,
                                       String metricScope,
                                       SegmentedBytesStore.KeySchema keySchema,
                                       AbstractSegments<S> segments) {
        this.name = name;
        this.metricScope = metricScope;
        this.keySchema = keySchema;
        this.segments = segments;
    }

    /**
     * Fetches all entries for a single key within the time range {@code [from, to]}.
     *
     * @param key  the key to look up
     * @param from lower time bound handed to the key schema
     * @param to   upper time bound handed to the key schema
     * @return an iterator over the segments selected by the key schema for the time range
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> fetch(Bytes key, long from, long to) {
        List<S> searchSpace = keySchema.segmentsToSearch(segments, from, to);
        Bytes binaryFrom = keySchema.lowerRangeFixedSize(key, from);
        Bytes binaryTo = keySchema.upperRangeFixedSize(key, to);
        return new SegmentIterator<S>(
            searchSpace.iterator(),
            keySchema.hasNextCondition(key, key, from, to),
            binaryFrom,
            binaryTo);
    }

    /**
     * Fetches all entries whose key lies in {@code [keyFrom, keyTo]} within the time range
     * {@code [from, to]}.
     *
     * <p>If {@code keyFrom} compares greater than {@code keyTo}, a warning is logged and an
     * empty iterator is returned.
     *
     * @param keyFrom lower key bound
     * @param keyTo   upper key bound
     * @param from    lower time bound handed to the key schema
     * @param to      upper time bound handed to the key schema
     * @return an iterator over the matching entries, or an empty iterator for an inverted key range
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> fetch(Bytes keyFrom, Bytes keyTo, long from, long to) {
        if (keyFrom.compareTo(keyTo) > 0) {
            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers");
            return KeyValueIterators.emptyIterator();
        }
        List<S> searchSpace = keySchema.segmentsToSearch(segments, from, to);
        Bytes binaryFrom = keySchema.lowerRange(keyFrom, from);
        Bytes binaryTo = keySchema.upperRange(keyTo, to);
        return new SegmentIterator<S>(
            searchSpace.iterator(),
            keySchema.hasNextCondition(keyFrom, keyTo, from, to),
            binaryFrom,
            binaryTo);
    }

    /**
     * @return an iterator over every segment, with no key bounds and the time range
     *         {@code [0, Long.MAX_VALUE]}
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> all() {
        List<S> searchSpace = segments.allSegments();
        return new SegmentIterator<S>(
            searchSpace.iterator(),
            keySchema.hasNextCondition(null, null, 0L, Long.MAX_VALUE),
            null,
            null);
    }

    /**
     * @param timeFrom lower time bound
     * @param timeTo   upper time bound
     * @return an iterator, without key bounds, over the segments covering the given time range
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> fetchAll(long timeFrom, long timeTo) {
        List<S> searchSpace = segments.segments(timeFrom, timeTo);
        return new SegmentIterator<S>(
            searchSpace.iterator(),
            keySchema.hasNextCondition(null, null, timeFrom, timeTo),
            null,
            null);
    }

    /**
     * Deletes {@code key} from the segment responsible for its timestamp, if that segment exists.
     * The observed stream time is advanced to the key's timestamp either way.
     *
     * @param key the key to delete
     */
    @Override
    public void remove(Bytes key) {
        long timestamp = keySchema.segmentTimestamp(key);
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        S segment = segments.getSegmentForTimestamp(timestamp);
        if (segment == null) {
            return;
        }
        segment.delete(key);
    }

    /**
     * Writes the entry into the segment responsible for the key's timestamp.
     *
     * <p>If the segment container returns no live segment for that timestamp, the record is
     * not written; the expired-record sensor is recorded and a warning is logged instead.
     *
     * @param key   the key to write
     * @param value the value to write
     */
    @Override
    public void put(Bytes key, byte[] value) {
        long timestamp = keySchema.segmentTimestamp(key);
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        long segmentId = segments.segmentId(timestamp);
        S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
        if (segment == null) {
            expiredRecordSensor.record();
            LOG.warn("Skipping record for expired segment.");
        } else {
            segment.put(key, value);
        }
    }

    /**
     * @param key the key to look up
     * @return the stored value, or {@code null} if no segment exists for the key's timestamp
     */
    @Override
    public byte[] get(Bytes key) {
        S segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
        if (segment == null) {
            return null;
        }
        return segment.get(key);
    }

    @Override
    public String name() {
        return name;
    }

    /**
     * Initializes the store: creates the expired-record sensor, opens existing segments,
     * registers the batching restore callback with the context and marks the store open.
     *
     * @param context the processor context; cast to {@link InternalProcessorContext}
     * @param root    the root store passed on to {@code context.register}
     */
    @Override
    public void init(ProcessorContext context, StateStore root) {
        this.context = (InternalProcessorContext) context;

        StreamsMetricsImpl metrics = this.context.metrics();
        String threadId = Thread.currentThread().getName();
        String taskName = context.taskId().toString();
        expiredRecordSensor = TaskMetrics.droppedRecordsSensorOrExpiredWindowRecordDropSensor(
            threadId,
            taskName,
            metricScope,
            name(),
            metrics);

        segments.openExisting(this.context, observedStreamTime);
        bulkLoadSegments = new HashSet<S>(segments.allSegments());

        context.register(root, new RocksDBSegmentsBatchingRestoreCallback());
        open = true;
    }

    @Override
    public void flush() {
        segments.flush();
    }

    /** Marks the store as closed and then closes the segment container. */
    @Override
    public void close() {
        open = false;
        segments.close();
    }

    @Override
    public boolean persistent() {
        return true;
    }

    @Override
    public boolean isOpen() {
        return open;
    }

    /** @return all segments currently held by the segment container */
    List<S> getSegments() {
        return segments.allSegments();
    }

    /**
     * Restores a batch of records by grouping them into one {@link WriteBatch} per segment and
     * writing each batch to its segment.
     *
     * <p>Every batch is closed before this method returns, whether or not a write failed, so
     * that a failing write does not leave the remaining batches unreleased.
     *
     * @param records the records to restore
     * @throws ProcessorStateException if building or writing a batch fails with a
     *                                 {@link RocksDBException}
     */
    void restoreAllInternal(Collection<KeyValue<byte[], byte[]>> records) {
        Map<S, WriteBatch> writeBatchMap = getWriteBatches(records);
        try {
            for (Map.Entry<S, WriteBatch> entry : writeBatchMap.entrySet()) {
                entry.getKey().write(entry.getValue());
            }
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error restoring batch to store " + name, e);
        } finally {
            closeBatches(writeBatchMap.values());
        }
    }

    /**
     * Groups the given records into one {@link WriteBatch} per target segment.
     *
     * <p>Records for which the segment container returns no live segment are skipped. On
     * success the caller owns the returned batches and must close them. If this method fails,
     * it closes the batches it has already created before propagating the exception.
     *
     * @param records the records to group
     * @return a map from segment to the batch holding that segment's records
     * @throws ProcessorStateException if adding a record to a batch fails with a
     *                                 {@link RocksDBException}
     */
    Map<S, WriteBatch> getWriteBatches(Collection<KeyValue<byte[], byte[]>> records) {
        // First pass: advance the observed stream time over the whole batch, so that every
        // segment lookup in the second pass is made against the batch-wide maximum.
        for (KeyValue<byte[], byte[]> record : records) {
            long timestamp = keySchema.segmentTimestamp(Bytes.wrap(record.key));
            observedStreamTime = Math.max(observedStreamTime, timestamp);
        }

        Map<S, WriteBatch> writeBatchMap = new HashMap<>();
        boolean succeeded = false;
        try {
            for (KeyValue<byte[], byte[]> record : records) {
                long timestamp = keySchema.segmentTimestamp(Bytes.wrap(record.key));
                long segmentId = segments.segmentId(timestamp);
                S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
                if (segment == null) {
                    continue;
                }
                // A segment not yet in the snapshot is switched to bulk-loading mode
                // (active tasks only), and the snapshot is refreshed afterwards.
                if (!bulkLoadSegments.contains(segment) && isStoreForActiveTask()) {
                    segment.toggleDbForBulkLoading(true);
                    bulkLoadSegments = new HashSet<S>(segments.allSegments());
                }
                WriteBatch batch = writeBatchMap.computeIfAbsent(segment, ignored -> new WriteBatch());
                segment.addToBatch(record, batch);
            }
            succeeded = true;
            return writeBatchMap;
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error restoring batch to store " + name, e);
        } finally {
            if (!succeeded) {
                // The batches never reach the caller on failure, so release them here.
                closeBatches(writeBatchMap.values());
            }
        }
    }

    /** Closes every batch in the given collection. */
    private static void closeBatches(Collection<WriteBatch> batches) {
        for (WriteBatch batch : batches) {
            batch.close();
        }
    }

    /** @return {@code true} if the context set in {@code init} is a {@link ProcessorContextImpl} */
    private boolean isStoreForActiveTask() {
        return context instanceof ProcessorContextImpl;
    }

    /** Switches bulk-loading mode on or off for every segment currently in the container. */
    private void toggleForBulkLoading(boolean prepareForBulkload) {
        for (S segment : segments.allSegments()) {
            segment.toggleDbForBulkLoading(prepareForBulkload);
        }
    }

    /**
     * Restore callback registered in {@code init}: delegates batches to
     * {@link #restoreAllInternal(Collection)} and toggles bulk loading on all segments at
     * restore start and end.
     */
    private class RocksDBSegmentsBatchingRestoreCallback extends AbstractNotifyingBatchingRestoreCallback {

        @Override
        public void restoreAll(Collection<KeyValue<byte[], byte[]>> records) {
            restoreAllInternal(records);
        }

        @Override
        public void onRestoreStart(TopicPartition topicPartition,
                                   String storeName,
                                   long startingOffset,
                                   long endingOffset) {
            toggleForBulkLoading(true);
        }

        @Override
        public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {
            toggleForBulkLoading(false);
        }
    }
}

