package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback;
import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionType;
import org.rocksdb.FlushOptions;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/RocksDBStore.class */
public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
    private static final int TTL_NOT_USED = -1;
    private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
    private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
    private static final long WRITE_BUFFER_SIZE = 16777216;
    private static final long BLOCK_CACHE_SIZE = 52428800;
    private static final long BLOCK_SIZE = 4096;
    private static final int TTL_SECONDS = -1;
    private static final int MAX_WRITE_BUFFERS = 3;
    private static final String DB_FILE_DIR = "rocksdb";
    private final String name;
    private final String parentDir;
    private final Set<KeyValueIterator> openIterators;
    File dbDir;
    private RocksDB db;
    private Options options;
    private WriteOptions wOptions;
    private FlushOptions fOptions;
    private volatile boolean prepareForBulkload;
    private ProcessorContext internalProcessorContext;
    volatile BatchingStateRestoreCallback batchingStateRestoreCallback;
    protected volatile boolean open;

    /* loaded from: input_file:org/apache/kafka/streams/state/internals/RocksDBStore$RocksDBBatchingRestoreCallback.class */
    static class RocksDBBatchingRestoreCallback extends AbstractNotifyingBatchingRestoreCallback {
        private final RocksDBStore rocksDBStore;

        RocksDBBatchingRestoreCallback(RocksDBStore rocksDBStore) {
            this.rocksDBStore = rocksDBStore;
        }

        @Override // org.apache.kafka.streams.processor.BatchingStateRestoreCallback
        public void restoreAll(Collection<KeyValue<byte[], byte[]>> collection) {
            this.rocksDBStore.restoreAllInternal(collection);
        }

        @Override // org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback, org.apache.kafka.streams.processor.StateRestoreListener
        public void onRestoreStart(TopicPartition topicPartition, String str, long j, long j2) {
            this.rocksDBStore.toggleDbForBulkLoading(true);
        }

        @Override // org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback, org.apache.kafka.streams.processor.StateRestoreListener
        public void onRestoreEnd(TopicPartition topicPartition, String str, long j) {
            this.rocksDBStore.toggleDbForBulkLoading(false);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/state/internals/RocksDBStore$RocksDBRangeIterator.class */
    public class RocksDBRangeIterator extends RocksDbIterator {
        private final Comparator<byte[]> comparator;
        private final byte[] rawToKey;

        RocksDBRangeIterator(String str, RocksIterator rocksIterator, Bytes bytes, Bytes bytes2) {
            super(str, rocksIterator);
            this.comparator = Bytes.BYTES_LEXICO_COMPARATOR;
            rocksIterator.seek(bytes.get());
            this.rawToKey = bytes2.get();
            if (this.rawToKey == null) {
                throw new NullPointerException("RocksDBRangeIterator: RawToKey is null for key " + bytes2);
            }
        }

        @Override // org.apache.kafka.streams.state.internals.RocksDBStore.RocksDbIterator, java.util.Iterator
        public synchronized boolean hasNext() {
            return super.hasNext() && this.comparator.compare(super.peekRawKey(), this.rawToKey) <= 0;
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/state/internals/RocksDBStore$RocksDbIterator.class */
    private class RocksDbIterator implements KeyValueIterator<Bytes, byte[]> {
        private final String storeName;
        private final RocksIterator iter;
        private volatile boolean open = true;

        RocksDbIterator(String str, RocksIterator rocksIterator) {
            this.iter = rocksIterator;
            this.storeName = str;
        }

        byte[] peekRawKey() {
            return this.iter.key();
        }

        private KeyValue<Bytes, byte[]> getKeyValue() {
            return new KeyValue<>(new Bytes(this.iter.key()), this.iter.value());
        }

        @Override // java.util.Iterator
        public synchronized boolean hasNext() {
            if (this.open) {
                return this.iter.isValid();
            }
            throw new InvalidStateStoreException(String.format("RocksDB store %s has closed", this.storeName));
        }

        @Override // java.util.Iterator
        public synchronized KeyValue<Bytes, byte[]> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            KeyValue<Bytes, byte[]> keyValue = getKeyValue();
            this.iter.next();
            return keyValue;
        }

        @Override // java.util.Iterator
        public void remove() {
            throw new UnsupportedOperationException("RocksDB iterator does not support remove()");
        }

        @Override // org.apache.kafka.streams.state.KeyValueIterator, java.io.Closeable, java.lang.AutoCloseable
        public synchronized void close() {
            RocksDBStore.this.openIterators.remove(this);
            this.iter.close();
            this.open = false;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.kafka.streams.state.KeyValueIterator
        public Bytes peekNextKey() {
            if (hasNext()) {
                return new Bytes(this.iter.key());
            }
            throw new NoSuchElementException();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RocksDBStore(String str) {
        this(str, DB_FILE_DIR);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RocksDBStore(String str, String str2) {
        this.openIterators = Collections.synchronizedSet(new HashSet());
        this.prepareForBulkload = false;
        this.batchingStateRestoreCallback = null;
        this.open = false;
        this.name = str;
        this.parentDir = str2;
    }

    public void openDB(ProcessorContext processorContext) {
        BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig();
        blockBasedTableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
        blockBasedTableConfig.setBlockSize(BLOCK_SIZE);
        this.options = new Options();
        this.options.setTableFormatConfig(blockBasedTableConfig);
        this.options.setWriteBufferSize(WRITE_BUFFER_SIZE);
        this.options.setCompressionType(COMPRESSION_TYPE);
        this.options.setCompactionStyle(COMPACTION_STYLE);
        this.options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
        this.options.setCreateIfMissing(true);
        this.options.setErrorIfExists(false);
        this.options.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);
        this.options.setIncreaseParallelism(Math.max(Runtime.getRuntime().availableProcessors(), 2));
        if (this.prepareForBulkload) {
            this.options.prepareForBulkLoad();
        }
        this.wOptions = new WriteOptions();
        this.wOptions.setDisableWAL(true);
        this.fOptions = new FlushOptions();
        this.fOptions.setWaitForFlush(true);
        Map<String, Object> appConfigs = processorContext.appConfigs();
        Class cls = (Class) appConfigs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);
        if (cls != null) {
            ((RocksDBConfigSetter) Utils.newInstance(cls)).setConfig(this.name, this.options, appConfigs);
        }
        this.dbDir = new File(new File(processorContext.stateDir(), this.parentDir), this.name);
        try {
            this.db = openDB(this.dbDir, this.options, -1);
            this.open = true;
        } catch (IOException e) {
            throw new ProcessorStateException(e);
        }
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public void init(ProcessorContext processorContext, StateStore stateStore) {
        this.internalProcessorContext = processorContext;
        openDB(processorContext);
        this.batchingStateRestoreCallback = new RocksDBBatchingRestoreCallback(this);
        processorContext.register(stateStore, false, this.batchingStateRestoreCallback);
    }

    private RocksDB openDB(File file, Options options, int i) throws IOException {
        try {
            if (i != -1) {
                throw new UnsupportedOperationException("Change log is not supported for store " + this.name + " since it is TTL based.");
            }
            Files.createDirectories(file.getParentFile().toPath(), new FileAttribute[0]);
            return RocksDB.open(options, file.getAbsolutePath());
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error opening store " + this.name + " at location " + file.toString(), e);
        }
    }

    boolean isPrepareForBulkload() {
        return this.prepareForBulkload;
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public String name() {
        return this.name;
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public boolean persistent() {
        return true;
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public boolean isOpen() {
        return this.open;
    }

    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public synchronized byte[] get(Bytes bytes) {
        validateStoreOpen();
        return getInternal(bytes.get());
    }

    private void validateStoreOpen() {
        if (!this.open) {
            throw new InvalidStateStoreException("Store " + this.name + " is currently closed");
        }
    }

    private byte[] getInternal(byte[] bArr) {
        try {
            return this.db.get(bArr);
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error while getting value for key from store " + this.name, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void toggleDbForBulkLoading(boolean z) {
        String[] list;
        if (z && (list = this.dbDir.list(new FilenameFilter() { // from class: org.apache.kafka.streams.state.internals.RocksDBStore.1
            @Override // java.io.FilenameFilter
            public boolean accept(File file, String str) {
                return str.matches(".*\\.sst");
            }
        })) != null && list.length > 0) {
            try {
                this.db.compactRange(true, 1, 0);
                close();
                openDB(this.internalProcessorContext);
            } catch (RocksDBException e) {
                throw new ProcessorStateException("Error while range compacting during restoring  store " + this.name, e);
            }
        }
        close();
        this.prepareForBulkload = z;
        openDB(this.internalProcessorContext);
    }

    @Override // org.apache.kafka.streams.state.KeyValueStore
    public synchronized void put(Bytes bytes, byte[] bArr) {
        Objects.requireNonNull(bytes, "key cannot be null");
        validateStoreOpen();
        putInternal(bytes.get(), bArr);
    }

    @Override // org.apache.kafka.streams.state.KeyValueStore
    public synchronized byte[] putIfAbsent(Bytes bytes, byte[] bArr) {
        Objects.requireNonNull(bytes, "key cannot be null");
        byte[] bArr2 = get(bytes);
        if (bArr2 == null) {
            put(bytes, bArr);
        }
        return bArr2;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void restoreAllInternal(Collection<KeyValue<byte[], byte[]>> collection) {
        try {
            WriteBatch writeBatch = new WriteBatch();
            Throwable th = null;
            try {
                try {
                    for (KeyValue<byte[], byte[]> keyValue : collection) {
                        if (keyValue.value == null) {
                            writeBatch.remove(keyValue.key);
                        } else {
                            writeBatch.put(keyValue.key, keyValue.value);
                        }
                    }
                    this.db.write(this.wOptions, writeBatch);
                    if (writeBatch != null) {
                        if (0 != 0) {
                            try {
                                writeBatch.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            writeBatch.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
        }
    }

    private void putInternal(byte[] bArr, byte[] bArr2) {
        if (bArr2 == null) {
            try {
                this.db.delete(this.wOptions, bArr);
            } catch (RocksDBException e) {
                throw new ProcessorStateException("Error while removing key from store " + this.name, e);
            }
        } else {
            try {
                this.db.put(this.wOptions, bArr, bArr2);
            } catch (RocksDBException e2) {
                throw new ProcessorStateException("Error while executing putting key/value into store " + this.name, e2);
            }
        }
    }

    @Override // org.apache.kafka.streams.state.KeyValueStore
    public void putAll(List<KeyValue<Bytes, byte[]>> list) {
        try {
            WriteBatch writeBatch = new WriteBatch();
            Throwable th = null;
            try {
                try {
                    for (KeyValue<Bytes, byte[]> keyValue : list) {
                        Objects.requireNonNull(keyValue.key, "key cannot be null");
                        if (keyValue.value == null) {
                            writeBatch.remove(keyValue.key.get());
                        } else {
                            writeBatch.put(keyValue.key.get(), keyValue.value);
                        }
                    }
                    this.db.write(this.wOptions, writeBatch);
                    if (writeBatch != null) {
                        if (0 != 0) {
                            try {
                                writeBatch.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            writeBatch.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error while batch writing to store " + this.name, e);
        }
    }

    @Override // org.apache.kafka.streams.state.KeyValueStore
    public synchronized byte[] delete(Bytes bytes) {
        Objects.requireNonNull(bytes, "key cannot be null");
        byte[] bArr = get(bytes);
        put(bytes, (byte[]) null);
        return bArr;
    }

    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public synchronized KeyValueIterator<Bytes, byte[]> range(Bytes bytes, Bytes bytes2) {
        Objects.requireNonNull(bytes, "from cannot be null");
        Objects.requireNonNull(bytes2, "to cannot be null");
        validateStoreOpen();
        RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(this.name, this.db.newIterator(), bytes, bytes2);
        this.openIterators.add(rocksDBRangeIterator);
        return rocksDBRangeIterator;
    }

    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public synchronized KeyValueIterator<Bytes, byte[]> all() {
        validateStoreOpen();
        RocksIterator newIterator = this.db.newIterator();
        newIterator.seekToFirst();
        RocksDbIterator rocksDbIterator = new RocksDbIterator(this.name, newIterator);
        this.openIterators.add(rocksDbIterator);
        return rocksDbIterator;
    }

    public synchronized KeyValue<Bytes, byte[]> first() {
        validateStoreOpen();
        RocksIterator newIterator = this.db.newIterator();
        newIterator.seekToFirst();
        KeyValue<Bytes, byte[]> keyValue = new KeyValue<>(new Bytes(newIterator.key()), newIterator.value());
        newIterator.close();
        return keyValue;
    }

    public synchronized KeyValue<Bytes, byte[]> last() {
        validateStoreOpen();
        RocksIterator newIterator = this.db.newIterator();
        newIterator.seekToLast();
        KeyValue<Bytes, byte[]> keyValue = new KeyValue<>(new Bytes(newIterator.key()), newIterator.value());
        newIterator.close();
        return keyValue;
    }

    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public long approximateNumEntries() {
        validateStoreOpen();
        try {
            long longProperty = this.db.getLongProperty("rocksdb.estimate-num-keys");
            if (isOverflowing(longProperty)) {
                return Long.MAX_VALUE;
            }
            return longProperty;
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error fetching property from store " + this.name, e);
        }
    }

    private boolean isOverflowing(long j) {
        return j < 0;
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public synchronized void flush() {
        if (this.db == null) {
            return;
        }
        flushInternal();
    }

    private void flushInternal() {
        try {
            this.db.flush(this.fOptions);
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error while executing flush from store " + this.name, e);
        }
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public synchronized void close() {
        if (this.open) {
            this.open = false;
            closeOpenIterators();
            this.options.close();
            this.wOptions.close();
            this.fOptions.close();
            this.db.close();
            this.options = null;
            this.wOptions = null;
            this.fOptions = null;
            this.db = null;
        }
    }

    private void closeOpenIterators() {
        HashSet hashSet;
        synchronized (this.openIterators) {
            hashSet = new HashSet(this.openIterators);
        }
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            ((KeyValueIterator) it.next()).close();
        }
    }
}
