/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.internals.RocksDBStore;
import org.apache.kafka.streams.state.internals.RocksDBVersionedStore;
import org.apache.kafka.streams.state.internals.Segment;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A "logical" segment that keeps its entries inside a shared physical {@link RocksDBStore}
 * instead of owning a store of its own.
 *
 * <p>Every key written through this segment is namespaced with an 8-byte prefix derived from
 * the segment id (see {@link #serializeLongToBytes(long)}); every key handed back to callers
 * has that prefix stripped again.
 *
 * <p>Methods that touch the physical store are {@code synchronized} on this segment, with the
 * exception of {@link #addToBatch(KeyValue, WriteBatch)} and {@link #write(WriteBatch)}.
 * Iterators opened via {@link #range(Bytes, Bytes)} and {@link #all()} are handed the
 * {@link #openIterators} set so that {@link #close()} can close any that are still open.
 */
class LogicalKeyValueSegment
        implements Comparable<LogicalKeyValueSegment>,
        Segment,
        RocksDBVersionedStore.VersionedStoreSegment {
    private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);

    private final long id;
    private final String name;
    private final RocksDBStore physicalStore;
    private final PrefixKeyFormatter prefixKeyFormatter;

    /** Iterators handed out by this segment that have not been closed yet. */
    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());

    /**
     * Creates a logical segment backed by {@code physicalStore}.
     *
     * @param id            segment id; also the source of the key prefix. {@link #destroy()}
     *                      treats negative ids as reserved segments.
     * @param name          name reported by {@link #name()} and used in log messages
     * @param physicalStore the store that actually holds the data; must not be {@code null}
     * @throws NullPointerException if {@code physicalStore} is {@code null}
     */
    LogicalKeyValueSegment(long id, String name, RocksDBStore physicalStore) {
        this.id = id;
        this.name = name;
        this.physicalStore = Objects.requireNonNull(physicalStore);
        this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id));
    }

    /** Returns the id this segment was created with. */
    @Override
    public long id() {
        return id;
    }

    /** Orders segments by their numeric id. */
    @Override
    public int compareTo(LogicalKeyValueSegment segment) {
        return Long.compare(id, segment.id);
    }

    /**
     * Removes this segment's entries from the physical store.
     *
     * @throws IllegalStateException if this is a reserved segment (negative id)
     */
    @Override
    public synchronized void destroy() {
        if (id < 0L) {
            throw new IllegalStateException("Negative segment ID indicates a reserved segment, which should not be destroyed. Reserved segments are cleaned up only when an entire store is closed, via the close() method rather than destroy().");
        }
        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
        // NOTE(review): passing the bare prefix as both bounds presumably relies on
        // RocksDBStore.deleteRange() treating the upper bound as inclusive of every key that
        // starts with it -- confirm against RocksDBStore.
        physicalStore.deleteRange(keyPrefix, keyPrefix);
    }

    /**
     * Deletes the given key range within this segment by prefixing both bounds and delegating
     * to the physical store.
     *
     * @param keyFrom lower bound (un-prefixed); must not be {@code null}
     * @param keyTo   upper bound (un-prefixed); must not be {@code null}
     */
    @Override
    public synchronized void deleteRange(Bytes keyFrom, Bytes keyTo) {
        final Bytes prefixedFrom = prefixKeyFormatter.addPrefix(keyFrom);
        final Bytes prefixedTo = prefixKeyFormatter.addPrefix(keyTo);
        physicalStore.deleteRange(prefixedFrom, prefixedTo);
    }

    /** Stores {@code value} under the prefixed form of {@code key}. */
    @Override
    public synchronized void put(Bytes key, byte[] value) {
        physicalStore.put(prefixKeyFormatter.addPrefix(key), value);
    }

    /**
     * Delegates to the physical store's {@code putIfAbsent} using the prefixed key.
     *
     * @return whatever the physical store returns for the prefixed key
     */
    @Override
    public synchronized byte[] putIfAbsent(Bytes key, byte[] value) {
        return physicalStore.putIfAbsent(prefixKeyFormatter.addPrefix(key), value);
    }

    /** Prefixes the key of every entry and writes the resulting list to the physical store. */
    @Override
    public synchronized void putAll(List<KeyValue<Bytes, byte[]>> entries) {
        final List<KeyValue<Bytes, byte[]>> prefixedEntries = entries.stream()
                .map(kv -> new KeyValue<>(prefixKeyFormatter.addPrefix(kv.key), kv.value))
                .collect(Collectors.toList());
        physicalStore.putAll(prefixedEntries);
    }

    /**
     * Delegates to the physical store's {@code delete} using the prefixed key.
     *
     * @return whatever the physical store returns for the prefixed key
     */
    @Override
    public synchronized byte[] delete(Bytes key) {
        return physicalStore.delete(prefixKeyFormatter.addPrefix(key));
    }

    /** Returns the name this segment was created with. */
    @Override
    public String name() {
        return name;
    }

    /**
     * Not supported for logical segments.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    @Deprecated
    public void init(ProcessorContext context, StateStore root) {
        throw new UnsupportedOperationException("cannot initialize a logical segment");
    }

    /**
     * Not supported for logical segments.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void flush() {
        throw new UnsupportedOperationException("nothing to flush for logical segment");
    }

    /**
     * Closes every iterator that is still registered in {@link #openIterators}. The physical
     * store itself is not closed here.
     */
    @Override
    public synchronized void close() {
        // Snapshot and clear under the set's own lock, then close outside of it, so that the
        // set is not being iterated while iterators are closed.
        final Set<KeyValueIterator<Bytes, byte[]>> iterators;
        synchronized (openIterators) {
            iterators = new HashSet<>(openIterators);
            openIterators.clear();
        }
        if (!iterators.isEmpty()) {
            log.warn("Closing {} open iterators for store {}", iterators.size(), name);
            for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
                iterator.close();
            }
        }
    }

    /** Always {@code true}. */
    @Override
    public boolean persistent() {
        return true;
    }

    /** Always {@code true}; this class keeps no open/closed state of its own. */
    @Override
    public boolean isOpen() {
        return true;
    }

    /** Looks up the prefixed form of {@code key} in the physical store. */
    @Override
    public synchronized byte[] get(Bytes key) {
        return physicalStore.get(prefixKeyFormatter.addPrefix(key));
    }

    /**
     * Returns an iterator over this segment's entries between {@code from} and {@code to},
     * with the segment prefix stripped from the returned keys.
     *
     * @param from lower bound, or {@code null} to start at this segment's prefix
     * @param to   upper bound, or {@code null} for no upper bound within this segment
     */
    @Override
    public synchronized KeyValueIterator<Bytes, byte[]> range(Bytes from, Bytes to) {
        final Bytes fromBound = from == null
                ? prefixKeyFormatter.getPrefix()
                : prefixKeyFormatter.addPrefix(from);
        // With an open upper bound the physical range is capped at the incremented prefix.
        // Any key inside that range which does not carry this segment's prefix is dropped by
        // the adapter via the startsWithPrefix check passed below.
        final Bytes toBound = to == null
                ? RocksDBStore.incrementWithoutOverflow(prefixKeyFormatter.getPrefix())
                : prefixKeyFormatter.addPrefix(to);
        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes =
                physicalStore.range(fromBound, toBound, openIterators);
        return new StrippedPrefixKeyValueIteratorAdapter(
                iteratorWithKeyPrefixes,
                prefixKeyFormatter::removePrefix,
                prefixKeyFormatter::startsWithPrefix);
    }

    /**
     * Returns an iterator over all entries of this segment (a prefix scan of the physical
     * store on this segment's prefix), with the prefix stripped from the returned keys.
     */
    @Override
    public synchronized KeyValueIterator<Bytes, byte[]> all() {
        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes =
                physicalStore.prefixScan(prefixKeyFormatter.getPrefix(), new BytesSerializer(), openIterators);
        return new StrippedPrefixKeyValueIteratorAdapter(iteratorWithKeyPrefixes, prefixKeyFormatter::removePrefix);
    }

    /**
     * Not supported for logical segments.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public long approximateNumEntries() {
        throw new UnsupportedOperationException("Cannot estimate num entries for logical segment");
    }

    /** Adds {@code record}, with its key prefixed, to {@code batch} via the physical store. */
    @Override
    public void addToBatch(KeyValue<byte[], byte[]> record, WriteBatch batch) throws RocksDBException {
        final byte[] prefixedKey = prefixKeyFormatter.addPrefix(record.key);
        physicalStore.addToBatch(new KeyValue<>(prefixedKey, record.value), batch);
    }

    /** Writes {@code batch} to the physical store. */
    @Override
    public void write(WriteBatch batch) throws RocksDBException {
        physicalStore.write(batch);
    }

    /** Encodes {@code l} as 8 bytes in {@link ByteBuffer}'s default (big-endian) byte order. */
    private static byte[] serializeLongToBytes(long l) {
        return ByteBuffer.allocate(Long.BYTES).putLong(l).array();
    }

    /**
     * Wraps an iterator whose keys carry the segment prefix and exposes the same entries with
     * the prefix removed. Entries whose key fails the optional prefix check are skipped.
     */
    private static class StrippedPrefixKeyValueIteratorAdapter
            implements KeyValueIterator<Bytes, byte[]> {
        private final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes;
        private final Function<Bytes, Bytes> prefixRemover;
        private final Function<Bytes, Boolean> prefixChecker;

        /** Creates an adapter that accepts every element of the underlying iterator. */
        StrippedPrefixKeyValueIteratorAdapter(KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes,
                                              Function<Bytes, Bytes> prefixRemover) {
            this(iteratorWithKeyPrefixes, prefixRemover, bytes -> true);
        }

        /**
         * @param iteratorWithKeyPrefixes underlying iterator returning prefixed keys
         * @param prefixRemover           maps a prefixed key to the key returned to the caller
         * @param prefixChecker           returns {@code true} for keys that should be exposed
         */
        StrippedPrefixKeyValueIteratorAdapter(KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes,
                                              Function<Bytes, Bytes> prefixRemover,
                                              Function<Bytes, Boolean> prefixChecker) {
            this.iteratorWithKeyPrefixes = iteratorWithKeyPrefixes;
            this.prefixRemover = prefixRemover;
            this.prefixChecker = prefixChecker;
            // Position the underlying iterator on the first acceptable element so that
            // hasNext() and peekNextKey() can simply delegate.
            pruneNonPrefixedElements();
        }

        @Override
        public boolean hasNext() {
            return iteratorWithKeyPrefixes.hasNext();
        }

        @Override
        public KeyValue<Bytes, byte[]> next() {
            final KeyValue<Bytes, byte[]> nextWithKeyPrefix = iteratorWithKeyPrefixes.next();
            final KeyValue<Bytes, byte[]> next =
                    new KeyValue<>(prefixRemover.apply(nextWithKeyPrefix.key), nextWithKeyPrefix.value);
            // Skip ahead past rejected elements before returning, keeping hasNext() accurate.
            pruneNonPrefixedElements();
            return next;
        }

        @Override
        public Bytes peekNextKey() {
            return prefixRemover.apply(iteratorWithKeyPrefixes.peekNextKey());
        }

        @Override
        public void remove() {
            iteratorWithKeyPrefixes.remove();
        }

        @Override
        public void close() {
            iteratorWithKeyPrefixes.close();
        }

        /** Advances the underlying iterator past leading elements rejected by the prefix check. */
        private void pruneNonPrefixedElements() {
            while (iteratorWithKeyPrefixes.hasNext()
                    && !prefixChecker.apply(iteratorWithKeyPrefixes.peekNextKey())) {
                iteratorWithKeyPrefixes.next();
            }
        }
    }

    /** Adds, removes and checks a fixed byte prefix on keys. */
    private static class PrefixKeyFormatter {
        private final byte[] prefix;

        PrefixKeyFormatter(byte[] prefix) {
            this.prefix = prefix;
        }

        /** Returns a new {@link Bytes} holding the prefix followed by {@code key}. */
        Bytes addPrefix(Bytes key) {
            return Bytes.wrap(addPrefix(key.get()));
        }

        /** Returns a new array holding the prefix followed by {@code key}. */
        byte[] addPrefix(byte[] key) {
            final byte[] keyWithPrefix = new byte[prefix.length + key.length];
            System.arraycopy(prefix, 0, keyWithPrefix, 0, prefix.length);
            System.arraycopy(key, 0, keyWithPrefix, prefix.length, key.length);
            return keyWithPrefix;
        }

        /** Returns a new {@link Bytes} holding {@code keyWithPrefix} without its leading prefix bytes. */
        Bytes removePrefix(Bytes keyWithPrefix) {
            return Bytes.wrap(removePrefix(keyWithPrefix.get()));
        }

        /**
         * Drops the first {@code prefix.length} bytes. The content of those bytes is not
         * checked; callers are expected to pass keys that actually carry the prefix.
         */
        private byte[] removePrefix(byte[] keyWithPrefix) {
            return Arrays.copyOfRange(keyWithPrefix, prefix.length, keyWithPrefix.length);
        }

        /** Returns the prefix wrapped as {@link Bytes} (the backing array is shared, not copied). */
        Bytes getPrefix() {
            return Bytes.wrap(prefix);
        }

        /** Returns {@code true} if {@code maybePrefixed} is at least as long as the prefix and starts with it. */
        boolean startsWithPrefix(Bytes maybePrefixed) {
            final byte[] candidate = maybePrefixed.get();
            if (candidate.length < prefix.length) {
                return false;
            }
            // Compare in place; this runs for every element during range iteration, so avoid
            // allocating a temporary copy of the leading bytes.
            for (int i = 0; i < prefix.length; i++) {
                if (candidate[i] != prefix[i]) {
                    return false;
                }
            }
            return true;
        }
    }
}

