/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.SimpleTimeZone;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.apache.kafka.streams.state.internals.Segments;
import org.apache.kafka.streams.state.internals.SessionKeySchema;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.WindowKeySchema;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Parameterized tests for {@link RocksDBSegmentedBytesStore}.
 *
 * <p>Every test runs once per key schema returned by {@link #getKeySchemas()}
 * ({@link SessionKeySchema} and {@link WindowKeySchema}). {@link #before()} fills
 * {@link #windows} with schema-appropriate windows and opens a fresh store in a
 * temporary state directory; {@link #close()} closes the store again.
 */
@RunWith(Parameterized.class)
public class RocksDBSegmentedBytesStoreTest {
    /** Record key used by every test case. */
    private static final String KEY = "a";
    /** Topic name handed to the serdes when (de)serializing keys and values. */
    private static final String SERDES_TOPIC = "dummy";

    private final long retention = 1000L;
    private final int numSegments = 3;
    private final String storeName = "bytes-store";
    private final long windowSizeForTimeWindow = 500L;
    private final Window[] windows = new Window[4];

    private InternalMockProcessorContext context;
    private RocksDBSegmentedBytesStore bytesStore;
    private File stateDir;

    /** Key schema under test; injected by the {@link Parameterized} runner. */
    @Parameterized.Parameter
    public SegmentedBytesStore.KeySchema schema;

    /**
     * Supplies the key schemas the test class is run against.
     *
     * @return one {@link SessionKeySchema} and one {@link WindowKeySchema} instance
     */
    @Parameterized.Parameters(name = "{0}")
    public static Object[] getKeySchemas() {
        return new Object[] {new SessionKeySchema(), new WindowKeySchema()};
    }

    /**
     * Initializes the schema, builds the four test windows matching the schema type,
     * and creates and initializes the store under test in a temporary state directory.
     */
    @Before
    public void before() {
        schema.init("topic");

        if (schema instanceof SessionKeySchema) {
            windows[0] = new SessionWindow(10L, 10L);
            windows[1] = new SessionWindow(500L, 1000L);
            windows[2] = new SessionWindow(1000L, 1500L);
            windows[3] = new SessionWindow(30000L, 60000L);
        }
        if (schema instanceof WindowKeySchema) {
            windows[0] = WindowKeySchema.timeWindowForSize(10L, windowSizeForTimeWindow);
            windows[1] = WindowKeySchema.timeWindowForSize(500L, windowSizeForTimeWindow);
            windows[2] = WindowKeySchema.timeWindowForSize(1000L, windowSizeForTimeWindow);
            windows[3] = WindowKeySchema.timeWindowForSize(60000L, windowSizeForTimeWindow);
        }

        bytesStore = createStore();
        stateDir = TestUtils.tempDirectory();
        context = new InternalMockProcessorContext(
            stateDir,
            Serdes.String(),
            Serdes.Long(),
            new NoOpRecordCollector(),
            new ThreadCache(new LogContext("testCache "), 0L, new MockStreamsMetrics(new Metrics())));
        bytesStore.init(context, bytesStore);
    }

    /** Closes the store that is current at the end of the test. */
    @After
    public void close() {
        bytesStore.close();
    }

    /** Fetching [0, 500] must return the records of windows[0] and windows[1] only. */
    @Test
    public void shouldPutAndFetch() {
        putValue(windows[0], 10L);
        putValue(windows[1], 50L);
        putValue(windows[2], 100L);

        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
            entry(windows[0], 10L),
            entry(windows[1], 50L));
        Assert.assertEquals(expected, toList(bytesStore.fetch(keyBytes(), 0L, 500L)));
    }

    /** Fetching [1, 999] must return the records of windows[0] and windows[1] only. */
    @Test
    public void shouldFindValuesWithinRange() {
        putValue(windows[0], 10L);
        putValue(windows[1], 50L);
        putValue(windows[2], 100L);

        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
            entry(windows[0], 10L),
            entry(windows[1], 50L));
        Assert.assertEquals(expected, toList(bytesStore.fetch(keyBytes(), 1L, 999L)));
    }

    /** After removing the windows[0] record, fetching [0, 100] must yield nothing. */
    @Test
    public void shouldRemove() {
        putValue(windows[0], 30L);
        putValue(windows[1], 50L);

        bytesStore.remove(serializeKey(new Windowed<>(KEY, windows[0])));

        // The iterator is Closeable; release it even when the assertion fails.
        try (KeyValueIterator<Bytes, byte[]> remaining = bytesStore.fetch(keyBytes(), 0L, 100L)) {
            Assert.assertFalse(remaining.hasNext());
        }
    }

    /**
     * The first three windows must share segment 0 on disk; writing windows[3] must
     * add segment 1, and a fetch over [0, 1500] must still return the first three records.
     */
    @Test
    public void shouldRollSegments() {
        final Segments segments = new Segments(storeName, retention, numSegments);

        putValue(windows[0], 50L);
        putValue(windows[1], 100L);
        putValue(windows[2], 500L);
        Assert.assertEquals(Collections.singleton(segments.segmentName(0L)), segmentDirs());

        putValue(windows[3], 1000L);
        Assert.assertEquals(Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)), segmentDirs());

        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
            entry(windows[0], 50L),
            entry(windows[1], 100L),
            entry(windows[2], 500L));
        Assert.assertEquals(expected, toList(bytesStore.fetch(keyBytes(), 0L, 1500L)));
    }

    /** {@code all()} must return the records stored in both segments. */
    @Test
    public void shouldGetAllSegments() {
        final Segments segments = new Segments(storeName, retention, numSegments);

        putValue(windows[0], 50L);
        Assert.assertEquals(Collections.singleton(segments.segmentName(0L)), segmentDirs());

        putValue(windows[3], 100L);
        Assert.assertEquals(Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)), segmentDirs());

        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
            entry(windows[0], 50L),
            entry(windows[3], 100L));
        Assert.assertEquals(expected, toList(bytesStore.all()));
    }

    /** {@code fetchAll(0, 60000)} must return the records stored in both segments. */
    @Test
    public void shouldFetchAllSegments() {
        final Segments segments = new Segments(storeName, retention, numSegments);

        putValue(windows[0], 50L);
        Assert.assertEquals(Collections.singleton(segments.segmentName(0L)), segmentDirs());

        putValue(windows[3], 100L);
        Assert.assertEquals(Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)), segmentDirs());

        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
            entry(windows[0], 50L),
            entry(windows[3], 100L));
        Assert.assertEquals(expected, toList(bytesStore.fetchAll(0L, 60000L)));
    }

    /**
     * Renames the first segment directory to {@code <prefix>-<yyyyMMddHHmm in UTC>} and
     * verifies that a newly created store still returns both records.
     */
    @Test
    public void shouldLoadSegementsWithOldStyleDateFormattedName() {
        final Segments segments = new Segments(storeName, retention, numSegments);

        putValue(windows[0], 50L);
        putValue(windows[3], 100L);
        bytesStore.close();

        final String firstSegmentName = segments.segmentName(0L);
        final String[] nameParts = firstSegmentName.split("\\.");
        final long segmentId = Long.parseLong(nameParts[1]);
        final SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
        formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
        final String formatted =
            formatter.format(new Date(segmentId * Segments.segmentInterval(retention, numSegments)));

        final File parent = new File(stateDir, storeName);
        final File oldStyleName = new File(parent, nameParts[0] + "-" + formatted);
        Assert.assertTrue(new File(parent, firstSegmentName).renameTo(oldStyleName));

        bytesStore = createStore();
        bytesStore.init(context, bytesStore);

        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
            entry(windows[0], 50L),
            entry(windows[3], 100L));
        final List<KeyValue<Windowed<String>, Long>> results =
            toList(bytesStore.fetch(keyBytes(), 0L, 60000L));
        MatcherAssert.assertThat(results, CoreMatchers.equalTo(expected));
    }

    /**
     * Renames the first segment directory to {@code <prefix>:<segmentId>} and verifies
     * that a newly created store still returns both records.
     */
    @Test
    public void shouldLoadSegementsWithOldStyleColonFormattedName() {
        final Segments segments = new Segments(storeName, retention, numSegments);

        putValue(windows[0], 50L);
        putValue(windows[3], 100L);
        bytesStore.close();

        final String firstSegmentName = segments.segmentName(0L);
        final String[] nameParts = firstSegmentName.split("\\.");
        final File parent = new File(stateDir, storeName);
        final File oldStyleName = new File(parent, nameParts[0] + ":" + Long.parseLong(nameParts[1]));
        Assert.assertTrue(new File(parent, firstSegmentName).renameTo(oldStyleName));

        bytesStore = createStore();
        bytesStore.init(context, bytesStore);

        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
            entry(windows[0], 50L),
            entry(windows[3], 100L));
        final List<KeyValue<Windowed<String>, Long>> results =
            toList(bytesStore.fetch(keyBytes(), 0L, 60000L));
        MatcherAssert.assertThat(results, CoreMatchers.equalTo(expected));
    }

    /** A store that was closed and initialized again must accept further writes. */
    @Test
    public void shouldBeAbleToWriteToReInitializedStore() {
        putValue(windows[0], 50L);
        bytesStore.close();
        bytesStore.init(context, bytesStore);
        // Passing means this put completes without throwing.
        putValue(windows[1], 100L);
    }

    /** Builds a new, not yet initialized store with the test's name, retention and schema. */
    private RocksDBSegmentedBytesStore createStore() {
        return new RocksDBSegmentedBytesStore(storeName, retention, numSegments, schema);
    }

    /** Writes {@code value} into the store under {@link #KEY} for the given window. */
    private void putValue(final Window window, final long value) {
        bytesStore.put(serializeKey(new Windowed<>(KEY, window)), serializeValue(value));
    }

    /** Builds the deserialized key/value pair a test expects to read back for {@link #KEY}. */
    private static KeyValue<Windowed<String>, Long> entry(final Window window, final long value) {
        return KeyValue.pair(new Windowed<>(KEY, window), value);
    }

    /** Returns {@link #KEY} as UTF-8 bytes, independent of the platform default charset. */
    private static Bytes keyBytes() {
        return Bytes.wrap(KEY.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Lists the entries of the store's directory below the state directory.
     *
     * @return the names of all files and directories found there
     */
    private Set<String> segmentDirs() {
        final File windowDir = new File(stateDir, storeName);
        // File.list() returns null when the path is not a directory or an I/O error occurs.
        final String[] names = windowDir.list();
        Assert.assertNotNull("Unable to list store directory " + windowDir, names);
        return new HashSet<>(Arrays.asList(names));
    }

    /** Serializes {@code value} with the built-in Long serde. */
    private byte[] serializeValue(final long value) {
        return Serdes.Long().serializer().serialize("", value);
    }

    /**
     * Serializes a windowed key into the binary layout of the schema under test.
     *
     * @param key the windowed key to encode
     * @return the session-schema encoding for {@link SessionKeySchema}, otherwise the
     *         window store-key encoding with sequence number 0
     */
    private Bytes serializeKey(final Windowed<String> key) {
        final StateSerdes<String, Long> stateSerdes =
            StateSerdes.withBuiltinTypes(SERDES_TOPIC, String.class, Long.class);
        if (schema instanceof SessionKeySchema) {
            return Bytes.wrap(SessionKeySchema.toBinary(key, stateSerdes.keySerializer(), SERDES_TOPIC));
        }
        return WindowKeySchema.toStoreKeyBinary(key, 0, stateSerdes);
    }

    /**
     * Drains {@code iterator}, deserializing every entry according to the schema under
     * test, and closes the iterator afterwards, including when deserialization fails.
     *
     * @param iterator the raw store iterator; it is closed by this method
     * @return the deserialized entries in iteration order
     */
    private List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Bytes, byte[]> iterator) {
        final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
        final StateSerdes<String, Long> stateSerdes =
            StateSerdes.withBuiltinTypes(SERDES_TOPIC, String.class, Long.class);
        try {
            while (iterator.hasNext()) {
                final KeyValue<Bytes, byte[]> next = iterator.next();
                final Windowed<String> windowedKey;
                if (schema instanceof WindowKeySchema) {
                    windowedKey = WindowKeySchema.fromStoreKey(next.key.get(), windowSizeForTimeWindow, stateSerdes);
                } else {
                    windowedKey = SessionKeySchema.from(next.key.get(), stateSerdes.keyDeserializer(), SERDES_TOPIC);
                }
                final Long value = stateSerdes.valueDeserializer().deserialize(SERDES_TOPIC, next.value);
                results.add(KeyValue.pair(windowedKey, value));
            }
        } finally {
            iterator.close();
        }
        return results;
    }
}

