/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link StreamThreadStateStoreProvider}.
 *
 * <p>Each test runs against a mocked {@link StreamThread} that exposes two stream tasks
 * ({@code 0_0} and {@code 0_1}). Both tasks are built from the same topology, which
 * registers a key-value store named {@code kv-store} and a window store named
 * {@code window-store}.
 */
public class StreamThreadStateStoreProviderTest {
    private StreamTask taskOne;
    private StreamThreadStateStoreProvider provider;
    private StateDirectory stateDirectory;
    private File stateDir;
    private final String topicName = "topic";
    private StreamThread threadMock;
    private Map<TaskId, StreamTask> tasks;

    /**
     * Builds the topology, creates and initializes the two tasks, and wires the provider
     * to a nice mock of {@link StreamThread}. The mock is left in record state; each test
     * programs and replays it through {@link #mockThread(boolean)}.
     */
    @Before
    public void before() {
        final TopologyWrapper topology = new TopologyWrapper();
        topology.addSource("the-source", topicName);
        topology.addProcessor("the-processor", new MockProcessorSupplier<>(), "the-source");
        topology.addStateStore(
            Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("kv-store"),
                Serdes.String(),
                Serdes.String()),
            "the-processor");
        topology.addStateStore(
            Stores.windowStoreBuilder(
                Stores.persistentWindowStore("window-store", 10L, 2, 2L, false),
                Serdes.String(),
                Serdes.String()),
            "the-processor");

        final String applicationId = "applicationId";
        final Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        stateDir = TestUtils.tempDirectory();
        properties.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath());
        final StreamsConfig streamsConfig = new StreamsConfig(properties);

        // The restore consumer is primed for the changelog topic of each store.
        final MockClientSupplier clientSupplier = new MockClientSupplier();
        configureRestoreConsumer(clientSupplier, applicationId + "-kv-store-changelog");
        configureRestoreConsumer(clientSupplier, applicationId + "-window-store-changelog");

        topology.setApplicationId(applicationId);
        final ProcessorTopology processorTopology = topology.getInternalBuilder().build();

        tasks = new HashMap<>();
        stateDirectory = new StateDirectory(streamsConfig, new MockTime());

        final TaskId taskOneId = new TaskId(0, 0);
        taskOne = createStreamsTask(streamsConfig, clientSupplier, processorTopology, taskOneId);
        taskOne.initializeStateStores();
        tasks.put(taskOneId, taskOne);

        final TaskId taskTwoId = new TaskId(0, 1);
        final StreamTask taskTwo = createStreamsTask(streamsConfig, clientSupplier, processorTopology, taskTwoId);
        taskTwo.initializeStateStores();
        tasks.put(taskTwoId, taskTwo);

        threadMock = EasyMock.createNiceMock(StreamThread.class);
        provider = new StreamThreadStateStoreProvider(threadMock);
    }

    /**
     * Deletes the temporary state directory created in {@link #before()}.
     *
     * @throws IOException if the directory cannot be deleted
     */
    @After
    public void cleanUp() throws IOException {
        Utils.delete(stateDir);
    }

    /** One key-value store per task is returned when the thread reports it is running. */
    @Test
    public void shouldFindKeyValueStores() {
        mockThread(true);
        final List<?> kvStores = provider.stores("kv-store", QueryableStoreTypes.keyValueStore());
        Assert.assertEquals(2, kvStores.size());
    }

    /** One window store per task is returned when the thread reports it is running. */
    @Test
    public void shouldFindWindowStores() {
        mockThread(true);
        final List<?> windowStores = provider.stores("window-store", QueryableStoreTypes.windowStore());
        Assert.assertEquals(2, windowStores.size());
    }

    /** Querying after one task's window store has been closed must fail. */
    @Test(expected = InvalidStateStoreException.class)
    public void shouldThrowInvalidStoreExceptionIfWindowStoreClosed() {
        mockThread(true);
        taskOne.getStore("window-store").close();
        provider.stores("window-store", QueryableStoreTypes.windowStore());
    }

    /** Querying after one task's key-value store has been closed must fail. */
    @Test(expected = InvalidStateStoreException.class)
    public void shouldThrowInvalidStoreExceptionIfKVStoreClosed() {
        mockThread(true);
        taskOne.getStore("kv-store").close();
        provider.stores("kv-store", QueryableStoreTypes.keyValueStore());
    }

    /** An unknown store name yields an empty list rather than an exception. */
    @Test
    public void shouldReturnEmptyListIfNoStoresFoundWithName() {
        mockThread(true);
        Assert.assertEquals(
            Collections.emptyList(),
            provider.stores("not-a-store", QueryableStoreTypes.keyValueStore()));
    }

    /** A store that exists but does not match the requested store type yields an empty list. */
    @Test
    public void shouldReturnEmptyListIfStoreExistsButIsNotOfTypeValueStore() {
        mockThread(true);
        Assert.assertEquals(
            Collections.emptyList(),
            provider.stores("window-store", QueryableStoreTypes.keyValueStore()));
    }

    /** Querying while the thread reports it is not running (or is rebalancing) must fail. */
    @Test(expected = InvalidStateStoreException.class)
    public void shouldThrowInvalidStoreExceptionIfNotAllStoresAvailable() {
        mockThread(false);
        provider.stores("kv-store", QueryableStoreTypes.keyValueStore());
    }

    /**
     * Creates a {@link StreamTask} for {@code taskId} that reads the partition of
     * {@link #topicName} matching the task's partition number.
     *
     * @param streamsConfig  configuration handed to the task
     * @param clientSupplier source of the mock consumer, restore consumer and producer
     * @param topology       processor topology the task runs
     * @param taskId         id of the task to create
     * @return the new task; its state stores are not yet initialized
     */
    private StreamTask createStreamsTask(final StreamsConfig streamsConfig,
                                         final MockClientSupplier clientSupplier,
                                         final ProcessorTopology topology,
                                         final TaskId taskId) {
        final Metrics metrics = new Metrics();
        return new StreamTask(
            taskId,
            Collections.singletonList(new TopicPartition(topicName, taskId.partition)),
            topology,
            clientSupplier.consumer,
            new StoreChangelogReader(
                clientSupplier.restoreConsumer,
                Duration.ZERO,
                new MockStateRestoreListener(),
                new LogContext("test-stream-task ")),
            streamsConfig,
            new MockStreamsMetrics(metrics),
            stateDirectory,
            null,
            new MockTime(),
            () -> clientSupplier.getProducer(new HashMap<>())) {

            // Deliberately a no-op for this test.
            @Override
            protected void updateOffsetLimits() {
            }
        };
    }

    /**
     * Programs the thread mock and switches it to replay state.
     *
     * @param initialized value the mock returns from {@code isRunningAndNotRebalancing()}
     */
    private void mockThread(final boolean initialized) {
        EasyMock.expect(threadMock.isRunningAndNotRebalancing()).andReturn(initialized);
        EasyMock.expect(threadMock.tasks()).andStubReturn(tasks);
        EasyMock.replay(threadMock);
    }

    /**
     * Registers partitions 0 and 1 of {@code topic} with the mock restore consumer,
     * assigns them, and sets both their beginning and end offsets to 0.
     *
     * @param clientSupplier supplier whose restore consumer is configured
     * @param topic          changelog topic name
     */
    private void configureRestoreConsumer(final MockClientSupplier clientSupplier, final String topic) {
        final List<PartitionInfo> partitions = Arrays.asList(
            new PartitionInfo(topic, 0, null, null, null),
            new PartitionInfo(topic, 1, null, null, null));
        clientSupplier.restoreConsumer.updatePartitions(topic, partitions);

        final TopicPartition tp1 = new TopicPartition(topic, 0);
        final TopicPartition tp2 = new TopicPartition(topic, 1);
        clientSupplier.restoreConsumer.assign(Arrays.asList(tp1, tp2));

        final Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(tp1, 0L);
        offsets.put(tp2, 0L);
        clientSupplier.restoreConsumer.updateBeginningOffsets(offsets);
        clientSupplier.restoreConsumer.updateEndOffsets(offsets);
    }
}

