package org.apache.kafka.streams.integration;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintStream;
import java.io.StringReader;
import java.net.URL;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.utils.MockTime;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreamsTest;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.LagInfo;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsEqual;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/QueryableStateIntegrationTest.class */
public class QueryableStateIntegrationTest {
    /** Timeout, in milliseconds, handed to the retrying verification helpers of this test. */
    private static final long DEFAULT_TIMEOUT_MS = 120000;
    /** Number of brokers in the shared embedded cluster. */
    private static final int NUM_BROKERS = 1;
    /** Partition count used when creating the {@code stream-three} topic. */
    private static final int STREAM_THREE_PARTITIONS = 4;
    /** Partition count used when creating the {@code stream-two} topic. */
    private static final int STREAM_TWO_PARTITIONS = 2;
    private static final int NUM_REPLICAS = 1;
    /** Base Streams configuration, rebuilt in {@link #before()} for every test. */
    private Properties streamsConfiguration;
    /** Sentences fed to the input topics. */
    private List<String> inputValues;
    /** Distinct tokens obtained by splitting {@link #inputValues} on non-word characters. */
    private Set<String> inputValuesKeys;
    /** Streams instance closed by {@link #shutdown()} when a test assigned one. */
    private KafkaStreams kafkaStreams;
    /** Orders pairs by key, then by value. */
    private Comparator<KeyValue<String, String>> stringComparator;
    /** Orders pairs by key, then by numeric value. */
    private Comparator<KeyValue<String, Long>> stringLongComparator;
    private static final Logger log = LoggerFactory.getLogger(QueryableStateIntegrationTest.class);

    // Use the named constant instead of repeating the broker count as a bare literal.
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    /** Two days expressed in milliseconds; used as the size of the windowed count store's windows. */
    private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS);
    private final MockTime mockTime = CLUSTER.time;

    // Topic base names. They are not final because createTopics() appends a per-test suffix to each.
    private String streamOne = "stream-one";
    private String streamTwo = "stream-two";
    private String streamThree = "stream-three";
    private String streamConcurrent = "stream-concurrent";
    private String outputTopic = "output";
    private String outputTopicConcurrent = "output-concurrent";
    private String outputTopicConcurrentWindowed = "output-concurrent-windowed";
    private String outputTopicThree = "output-three";

    /** Supplies the running test's name, from which unique topic and application names are derived. */
    @Rule
    public TestName testName = new TestName();

    /* loaded from: input_file:org/apache/kafka/streams/integration/QueryableStateIntegrationTest$ProducerRunnable.class */
    /**
     * Sends every entry of a fixed value list to a single topic, repeating the whole list until the
     * configured number of iterations has been reached or {@link #shutdown()} has been requested.
     */
    private class ProducerRunnable implements Runnable {
        private final String topic;
        private final List<String> inputValues;
        private final int numIterations;
        private int currIteration = 0;
        // volatile: run() polls this flag without holding the monitor that shutdown() writes it under.
        volatile boolean shutdown = false;

        /**
         * @param topic         topic every record is sent to
         * @param inputValues   record values sent, in order, on each iteration
         * @param numIterations maximum number of passes over {@code inputValues}
         */
        ProducerRunnable(final String topic, final List<String> inputValues, final int numIterations) {
            this.topic = topic;
            this.inputValues = inputValues;
            this.numIterations = numIterations;
        }

        private synchronized void incrementIteration() {
            this.currIteration++;
        }

        /** @return the number of completed passes over the input values */
        synchronized int getCurrIteration() {
            return this.currIteration;
        }

        /** Asks {@link #run()} to stop before starting its next pass. */
        synchronized void shutdown() {
            this.shutdown = true;
        }

        /**
         * Creates a producer against the embedded cluster, performs the send loop and always closes
         * the producer afterwards, whether the loop completes or throws.
         */
        @Override
        public void run() {
            final Properties producerConfig = new Properties();
            producerConfig.put("bootstrap.servers", QueryableStateIntegrationTest.CLUSTER.bootstrapServers());
            producerConfig.put("acks", "all");
            producerConfig.put("key.serializer", StringSerializer.class);
            producerConfig.put("value.serializer", StringSerializer.class);

            try (KafkaProducer<String, String> producer =
                     new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer())) {
                while (getCurrIteration() < this.numIterations && !this.shutdown) {
                    for (final String value : this.inputValues) {
                        producer.send(new ProducerRecord<>(this.topic, value));
                    }
                    incrementIteration();
                }
            }
        }
    }

    /**
     * Appends a suffix derived from the current test's unique name to every topic-name field, then
     * creates all of those topics on the shared cluster.
     */
    private void createTopics() throws Exception {
        final String suffix = "-" + IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);

        // Same assignment order as before; each name simply gains the per-test suffix.
        this.streamOne = this.streamOne + suffix;
        this.streamConcurrent = this.streamConcurrent + suffix;
        this.streamThree = this.streamThree + suffix;
        this.outputTopic = this.outputTopic + suffix;
        this.outputTopicConcurrent = this.outputTopicConcurrent + suffix;
        this.outputTopicConcurrentWindowed = this.outputTopicConcurrentWindowed + suffix;
        this.outputTopicThree = this.outputTopicThree + suffix;
        this.streamTwo = this.streamTwo + suffix;

        CLUSTER.createTopics(this.streamOne, this.streamConcurrent);
        CLUSTER.createTopic(this.streamTwo, STREAM_TWO_PARTITIONS, 1);
        CLUSTER.createTopic(this.streamThree, STREAM_THREE_PARTITIONS, 1);
        CLUSTER.createTopics(
            this.outputTopic,
            this.outputTopicConcurrent,
            this.outputTopicConcurrentWindowed,
            this.outputTopicThree);
    }

    /**
     * Loads the test sentences, one per line, from the class-path resource
     * {@code QueryableStateIntegrationTest/inputValues.txt}.
     *
     * <p>If the resource is missing or cannot be read, a warning is logged and a built-in list of
     * sentences is returned instead; lines read before the failure are discarded.
     *
     * @return the lines of the resource file, or the built-in default list
     */
    private List<String> getInputValues() {
        final String fileName = "QueryableStateIntegrationTest" + File.separator + "inputValues.txt";
        final ClassLoader classLoader = getClass().getClassLoader();

        // try-with-resources closes the reader even when readLine() fails part-way through.
        try (BufferedReader reader = new BufferedReader(
                new FileReader(Objects.requireNonNull(classLoader.getResource(fileName)).getFile()))) {
            final List<String> lines = new ArrayList<>();
            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
                lines.add(line);
            }
            return lines;
        } catch (final Exception e) {
            log.warn("Unable to read '{}{}{}'. Using default inputValues list", "resources", File.separator, fileName);
            return Arrays.asList(
                "hello world",
                "all streams lead to kafka",
                "streams",
                "kafka streams",
                "the cat in the hat",
                "green eggs and ham",
                "that Sam i am",
                "up the creek without a paddle",
                "run forest run",
                "a tank full of gas",
                "eat sleep rave repeat",
                "one jolly sailor",
                "king of the world");
        }
    }

    /**
     * Per-test setup: creates the uniquely named topics, builds the base Streams configuration,
     * initialises the key/value comparators and loads the input sentences together with the set of
     * tokens they contain.
     */
    @Before
    public void before() throws Exception {
        createTopics();

        this.streamsConfiguration = new Properties();
        this.streamsConfiguration.put("application.id", "app-" + IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName));
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        this.streamsConfiguration.put("default.key.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("default.value.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("commit.interval.ms", 100);
        this.streamsConfiguration.put("auto.offset.reset", "earliest");

        // The first lambda of each chain needs an explicit parameter type: a chained call gives the
        // compiler no target type from which to infer the comparator's element type.
        this.stringComparator = Comparator
            .comparing((KeyValue<String, String> pair) -> pair.key)
            .thenComparing(pair -> pair.value);
        this.stringLongComparator = Comparator
            .comparing((KeyValue<String, Long> pair) -> pair.key)
            .thenComparingLong(pair -> pair.value);

        this.inputValues = getInputValues();
        this.inputValuesKeys = new HashSet<>();
        for (final String sentence : this.inputValues) {
            // Tokens are the pieces between runs of non-word characters.
            Collections.addAll(this.inputValuesKeys, sentence.split("\\W+"));
        }
    }

    /**
     * Per-test teardown: closes the Streams instance held in {@code kafkaStreams}, if any, with a
     * 30 second limit, then purges local Streams state and deletes all topics on the cluster.
     */
    @After
    public void shutdown() throws Exception {
        final KafkaStreams streams = this.kafkaStreams;
        final boolean hasStreams = streams != null;
        if (hasStreams) {
            final Duration closeTimeout = Duration.ofSeconds(30L);
            streams.close(closeTimeout);
        }

        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
        CLUSTER.deleteAllTopicsAndWait(0L);
    }

    /**
     * Builds a word-count topology over {@code inputTopic}: each value is split on non-word
     * characters, the tokens are grouped, and two counts are materialized.
     *
     * @param inputTopic        topic whose values are tokenized; also the suffix of both store names
     * @param outputTopic       topic receiving the all-time counts
     * @param windowOutputTopic topic receiving the windowed counts, keyed by the plain token
     * @param storeName         prefix of the all-time store, named {@code storeName-inputTopic}
     * @param windowStoreName   prefix of the windowed store, named {@code windowStoreName-inputTopic}
     * @param streamsConfiguration configuration passed to the new {@link KafkaStreams} instance
     * @return a new, not yet started {@link KafkaStreams} instance for the topology
     */
    private KafkaStreams createCountStream(final String inputTopic,
                                           final String outputTopic,
                                           final String windowOutputTopic,
                                           final String storeName,
                                           final String windowStoreName,
                                           final Properties streamsConfiguration) {
        final StreamsBuilder builder = new StreamsBuilder();
        final Serde<String> stringSerde = Serdes.String();
        final KStream<String, String> textLines = builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde));

        final KGroupedStream<String, String> groupedByWord = textLines
            .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
            .groupBy(MockMapper.selectValueMapper());

        // All-time count per token.
        groupedByWord
            .count(Materialized.as(storeName + "-" + inputTopic))
            .toStream()
            .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));

        // Count per token within windows of WINDOW_SIZE milliseconds; the window is dropped from the key.
        groupedByWord
            .windowedBy(TimeWindows.of(Duration.ofMillis(WINDOW_SIZE)))
            .count(Materialized.as(windowStoreName + "-" + inputTopic))
            .toStream((windowedKey, count) -> windowedKey.key())
            .to(windowOutputTopic, Produced.with(Serdes.String(), Serdes.Long()));

        return new KafkaStreams(builder.build(), streamsConfiguration);
    }

    /**
     * Asserts, for each Streams instance, how many store partitions it reports lag information for.
     *
     * @param streamsList                  instances to inspect
     * @param stores                       store names every instance with a non-zero expectation must report
     * @param partitionsPerStreamsInstance expected total number of store partitions, indexed like
     *                                     {@code streamsList}
     */
    private void verifyOffsetLagFetch(final List<KafkaStreams> streamsList,
                                      final Set<String> stores,
                                      final List<Integer> partitionsPerStreamsInstance) {
        for (int i = 0; i < streamsList.size(); i++) {
            final Map<String, Map<Integer, LagInfo>> localLags = streamsList.get(i).allLocalStorePartitionLags();
            final int expectedPartitions = partitionsPerStreamsInstance.get(i);

            // Total partition entries summed over all stores of this instance.
            final int reportedPartitions = localLags.values().stream().mapToInt(Map::size).sum();
            MatcherAssert.assertThat(reportedPartitions, IsEqual.equalTo(expectedPartitions));

            if (expectedPartitions > 0) {
                MatcherAssert.assertThat(localLags.keySet(), IsEqual.equalTo(stores));
            }
        }
    }

    /**
     * Retries, until {@code timeout} milliseconds have elapsed, a check that every key can be
     * located through the key metadata and read with a non-null value from the named key-value store.
     *
     * <p>Keys are sorted into "no metadata", "no store", "no value" and "raised an exception"
     * buckets, and the attempt fails if any bucket is non-empty. An
     * {@link InvalidStateStoreException} is tolerated only if the listener has recorded at least one
     * {@code REBALANCING} state; otherwise the check aborts without further retries.
     *
     * @param streamsList         all instances, indexed by the port of their advertised host
     * @param streams             instance used for metadata lookups, and for the store lookup when
     *                            {@code pickInstanceByPort} is false
     * @param stateListener       listener whose recorded state counts are consulted
     * @param keys                keys to look up
     * @param storeName           name of the key-value store
     * @param timeout             retry budget in milliseconds
     * @param pickInstanceByPort  if true, query the instance at index {@code activeHost().port()} of
     *                            {@code streamsList}; if false, query {@code streams} and require standbys
     */
    @Deprecated
    private void verifyAllKVKeys(final List<KafkaStreams> streamsList,
                                 final KafkaStreams streams,
                                 final KafkaStreamsTest.StateListenerStub stateListener,
                                 final Set<String> keys,
                                 final String storeName,
                                 final long timeout,
                                 final boolean pickInstanceByPort) throws Exception {
        TestUtils.retryOnExceptionWithTimeout(timeout, () -> {
            final List<String> noMetadataKeys = new ArrayList<>();
            final List<String> nullStoreKeys = new ArrayList<>();
            final List<String> nullValueKeys = new ArrayList<>();
            final Map<String, Exception> exceptionalKeys = new TreeMap<>();
            final StringSerializer stringSerializer = new StringSerializer();

            for (final String key : keys) {
                try {
                    final KeyQueryMetadata queryMetadata = streams.queryMetadataForKey(storeName, key, stringSerializer);
                    final StreamsMetadata metadata = streams.metadataForKey(storeName, key, stringSerializer);
                    if (queryMetadata == null || queryMetadata.equals(KeyQueryMetadata.NOT_AVAILABLE)) {
                        noMetadataKeys.add(key);
                        continue;
                    }

                    // The deprecated and the current metadata API must agree on the hosting instance.
                    MatcherAssert.assertThat(metadata.hostInfo(), IsEqual.equalTo(queryMetadata.activeHost()));
                    if (!pickInstanceByPort) {
                        MatcherAssert.assertThat("Should have standbys to query from", !queryMetadata.standbyHosts().isEmpty());
                    }

                    final KafkaStreams streamsWithKey =
                        pickInstanceByPort ? streamsList.get(queryMetadata.activeHost().port()) : streams;
                    final ReadOnlyKeyValueStore<String, Long> store =
                        IntegrationTestUtils.getStore(storeName, streamsWithKey, true, QueryableStoreTypes.keyValueStore());
                    if (store == null) {
                        nullStoreKeys.add(key);
                    } else if (store.get(key) == null) {
                        nullValueKeys.add(key);
                    }
                } catch (final InvalidStateStoreException e) {
                    // Must be caught before Exception: it is a subtype and needs its own handling.
                    if (stateListener.mapStates.get(KafkaStreams.State.REBALANCING).longValue() < 1) {
                        throw new NoRetryException(new AssertionError(String.format(
                            "Received %s for key %s and expected at least one rebalancing state, but had none",
                            e.getClass().getName(), key)));
                    }
                } catch (final Exception e) {
                    exceptionalKeys.put(key, e);
                }
            }

            assertNoKVKeyFailures(storeName, timeout, noMetadataKeys, nullStoreKeys, nullValueKeys, exceptionalKeys);
        });
    }

    /**
     * Retries, until {@code timeout} milliseconds have elapsed, a check that every key can be
     * located through the key metadata and fetched from the named window store over the time range
     * {@code [from, to]}.
     *
     * <p>Keys are sorted into "no metadata", "no store", "no value" and "raised an exception"
     * buckets, and the attempt fails if any bucket is non-empty. An
     * {@link InvalidStateStoreException} is tolerated only if the listener has recorded at least one
     * {@code REBALANCING} state; otherwise the check aborts without further retries.
     *
     * @param streamsList         all instances, indexed by the port of their advertised host
     * @param streams             instance used for metadata lookups, and for the store lookup when
     *                            {@code pickInstanceByPort} is false
     * @param stateListener       listener whose recorded state counts are consulted
     * @param keys                keys to look up
     * @param storeName           name of the window store
     * @param from                start of the fetch range, in epoch milliseconds
     * @param to                  end of the fetch range, in epoch milliseconds
     * @param timeout             retry budget in milliseconds
     * @param pickInstanceByPort  if true, query the instance at index {@code activeHost().port()} of
     *                            {@code streamsList} and require that no standbys exist; if false,
     *                            query {@code streams} and require standbys
     */
    @Deprecated
    private void verifyAllWindowedKeys(final List<KafkaStreams> streamsList,
                                       final KafkaStreams streams,
                                       final KafkaStreamsTest.StateListenerStub stateListener,
                                       final Set<String> keys,
                                       final String storeName,
                                       final Long from,
                                       final Long to,
                                       final long timeout,
                                       final boolean pickInstanceByPort) throws Exception {
        TestUtils.retryOnExceptionWithTimeout(timeout, () -> {
            final List<String> noMetadataKeys = new ArrayList<>();
            final List<String> nullStoreKeys = new ArrayList<>();
            final List<String> nullValueKeys = new ArrayList<>();
            final Map<String, Exception> exceptionalKeys = new TreeMap<>();
            final StringSerializer stringSerializer = new StringSerializer();

            for (final String key : keys) {
                try {
                    final KeyQueryMetadata queryMetadata = streams.queryMetadataForKey(storeName, key, stringSerializer);
                    final StreamsMetadata metadata = streams.metadataForKey(storeName, key, stringSerializer);
                    if (queryMetadata == null || queryMetadata.equals(KeyQueryMetadata.NOT_AVAILABLE)) {
                        noMetadataKeys.add(key);
                        continue;
                    }

                    // The deprecated and the current metadata API must agree on the hosting instance.
                    MatcherAssert.assertThat(metadata.hostInfo(), IsEqual.equalTo(queryMetadata.activeHost()));
                    if (pickInstanceByPort) {
                        MatcherAssert.assertThat(queryMetadata.standbyHosts().size(), IsEqual.equalTo(0));
                    } else {
                        MatcherAssert.assertThat("Should have standbys to query from", !queryMetadata.standbyHosts().isEmpty());
                    }

                    final KafkaStreams streamsWithKey =
                        pickInstanceByPort ? streamsList.get(queryMetadata.activeHost().port()) : streams;
                    final ReadOnlyWindowStore<String, Long> store =
                        IntegrationTestUtils.getStore(storeName, streamsWithKey, true, QueryableStoreTypes.windowStore());
                    if (store == null) {
                        nullStoreKeys.add(key);
                        continue;
                    }

                    // The iterator holds store resources, so close it once the null check is done.
                    try (WindowStoreIterator<Long> values =
                             store.fetch(key, Instant.ofEpochMilli(from.longValue()), Instant.ofEpochMilli(to.longValue()))) {
                        if (values == null) {
                            nullValueKeys.add(key);
                        }
                    }
                } catch (final InvalidStateStoreException e) {
                    // Must be caught before Exception: it is a subtype and needs its own handling.
                    if (stateListener.mapStates.get(KafkaStreams.State.REBALANCING).longValue() < 1) {
                        throw new NoRetryException(new AssertionError(String.format(
                            "Received %s for key %s and expected at least one rebalancing state, but had none",
                            e.getClass().getName(), key)));
                    }
                } catch (final Exception e) {
                    exceptionalKeys.put(key, e);
                }
            }

            assertNoKVKeyFailures(storeName, timeout, noMetadataKeys, nullStoreKeys, nullValueKeys, exceptionalKeys);
        });
    }

    /**
     * Asserts that none of the four failure buckets contains a key, using a message that lists every
     * non-empty bucket and, for keys that raised an exception, the indented stack trace.
     *
     * @param storeName       store the keys were looked up in (used in the message only)
     * @param timeout         timeout in milliseconds (used in the message only)
     * @param noMetadataKeys  keys for which no metadata was available
     * @param nullStoreKeys   keys for which no store was available
     * @param nullValueKeys   keys for which no value was available
     * @param exceptionalKeys keys whose lookup raised an exception, with that exception
     * @throws IOException if reading back a rendered stack trace fails
     */
    private void assertNoKVKeyFailures(final String storeName,
                                       final long timeout,
                                       final List<String> noMetadataKeys,
                                       final List<String> nullStoreKeys,
                                       final List<String> nullValueKeys,
                                       final Map<String, Exception> exceptionalKeys) throws IOException {
        final StringBuilder reason = new StringBuilder();
        reason.append(String.format("Not all keys are available for store %s in %d ms", storeName, timeout));
        if (!noMetadataKeys.isEmpty()) {
            reason.append("\n    * No metadata is available for these keys: ").append(noMetadataKeys);
        }
        if (!nullStoreKeys.isEmpty()) {
            reason.append("\n    * No store is available for these keys: ").append(nullStoreKeys);
        }
        if (!nullValueKeys.isEmpty()) {
            reason.append("\n    * No value is available for these keys: ").append(nullValueKeys);
        }
        if (!exceptionalKeys.isEmpty()) {
            reason.append("\n    * Exceptions were raised for the following keys: ");
            for (final Map.Entry<String, Exception> entry : exceptionalKeys.entrySet()) {
                reason.append(String.format("\n        %s:", entry.getKey()));

                // Render the stack trace to text, then re-read it line by line to indent each line.
                final ByteArrayOutputStream traceBytes = new ByteArrayOutputStream();
                entry.getValue().printStackTrace(new PrintStream(traceBytes));
                try (BufferedReader reader = new BufferedReader(new StringReader(traceBytes.toString()))) {
                    for (String line = reader.readLine(); line != null; line = reader.readLine()) {
                        reason.append("\n            ").append(line);
                    }
                }
            }
        }

        MatcherAssert.assertThat(
            reason.toString(),
            noMetadataKeys.isEmpty() && nullStoreKeys.isEmpty() && nullValueKeys.isEmpty() && exceptionalKeys.isEmpty());
    }

    /**
     * Verifies that a store registered in the topology can be obtained, while asking for a store
     * name that is not part of the topology fails with an {@link InvalidStateStoreException}
     * carrying the expected message.
     */
    @Test
    public void shouldRejectNonExistentStoreName() throws InterruptedException {
        final String uniqueTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);
        final String input = uniqueTestName + "-input";
        final String storeName = uniqueTestName + "-input-table";

        final StreamsBuilder builder = new StreamsBuilder();
        builder.table(
            input,
            Materialized
                .<String, String, KeyValueStore<Bytes, byte[]>>as(storeName)
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));

        final Properties properties = Utils.mkProperties(Utils.mkMap(
            Utils.mkEntry("application.id", IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName)),
            Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers())));

        CLUSTER.createTopic(input);

        try (KafkaStreams streams = IntegrationTestUtils.getRunningStreams(properties, builder, true)) {
            final ReadOnlyKeyValueStore<String, String> store =
                streams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
            MatcherAssert.assertThat(store, Matchers.notNullValue());

            // The lookup of the unknown store name is the call expected to throw.
            final InvalidStateStoreException exception = Assert.assertThrows(
                InvalidStateStoreException.class,
                () -> streams.store(StoreQueryParameters.fromNameAndType("no-table", QueryableStoreTypes.keyValueStore())));
            MatcherAssert.assertThat(
                exception.getMessage(),
                Matchers.is("Cannot get state store no-table because no such store is registered in the topology."));
        }
    }

    /**
     * Verifies that using a key-value store through a session-store view fails with an
     * {@link InvalidStateStoreException} whose message names both the requested and the actual
     * store type.
     */
    @Test
    public void shouldRejectWronglyTypedStore() throws InterruptedException {
        final String uniqueTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);
        final String input = uniqueTestName + "-input";
        final String storeName = uniqueTestName + "-input-table";

        final StreamsBuilder builder = new StreamsBuilder();
        builder.table(
            input,
            Materialized
                .<String, String, KeyValueStore<Bytes, byte[]>>as(storeName)
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));

        CLUSTER.createTopic(input);

        final Properties properties = Utils.mkProperties(Utils.mkMap(
            Utils.mkEntry("application.id", uniqueTestName + "-app"),
            Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers())));

        try (KafkaStreams streams = IntegrationTestUtils.getRunningStreams(properties, builder, true)) {
            final ReadOnlyKeyValueStore<String, String> store =
                streams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
            MatcherAssert.assertThat(store, Matchers.notNullValue());

            // Obtaining the wrongly typed view is expected to succeed; this test expects the
            // failure only once the view is actually used.
            final ReadOnlySessionStore<String, String> sessionStore =
                streams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.sessionStore()));
            final InvalidStateStoreException exception = Assert.assertThrows(
                InvalidStateStoreException.class,
                () -> sessionStore.fetch("a"));
            MatcherAssert.assertThat(
                exception.getMessage(),
                Matchers.is("Cannot get state store " + storeName + " because the queryable store type [class org.apache.kafka.streams.state.QueryableStoreTypes$SessionStoreType] does not accept the actual store type [class org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore]."));
        }
    }

    /**
     * Starts two Streams instances of the word-count topology, verifies that every input token is
     * queryable from both the key-value and the windowed store, then closes the second instance and
     * verifies that the first instance still serves all keys and eventually reports lag information
     * for every store partition.
     */
    @Test
    public void shouldBeAbleToQueryDuringRebalance() throws Exception {
        final List<KafkaStreams> streamsList = new ArrayList<>(STREAM_TWO_PARTITIONS);
        final List<KafkaStreamsTest.StateListenerStub> listeners = new ArrayList<>(STREAM_TWO_PARTITIONS);

        // Publish the input once, synchronously, before any instance is started.
        new ProducerRunnable(this.streamThree, this.inputValues, 1).run();

        for (int i = 0; i < STREAM_TWO_PARTITIONS; i++) {
            final Properties props = (Properties) this.streamsConfiguration.clone();
            props.put("state.dir", TestUtils.tempDirectory("shouldBeAbleToQueryDuringRebalance-" + i).getPath());
            // The port doubles as the instance's index in streamsList (see verifyAllKVKeys).
            props.put("application.server", "localhost:" + i);
            props.put("client.id", "instance-" + i);

            final KafkaStreams streams = createCountStream(
                this.streamThree, this.outputTopicThree, this.outputTopicConcurrentWindowed,
                "word-count-store", "windowed-word-count-store", props);
            final KafkaStreamsTest.StateListenerStub listener = new KafkaStreamsTest.StateListenerStub();
            streams.setStateListener(listener);
            listeners.add(listener);
            streamsList.add(streams);
        }

        final String storeName = "word-count-store-" + this.streamThree;
        final String windowStoreName = "windowed-word-count-store-" + this.streamThree;
        final Set<String> stores = Utils.mkSet(storeName, windowStoreName);

        // Startup happens inside the try so the instances are closed even if it fails.
        try {
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamsList, Duration.ofSeconds(60L));
            verifyOffsetLagFetch(streamsList, stores, Arrays.asList(STREAM_THREE_PARTITIONS, STREAM_THREE_PARTITIONS));

            waitUntilAtLeastNumRecordProcessed(this.outputTopicThree, 1);

            for (int i = 0; i < streamsList.size(); i++) {
                verifyAllKVKeys(streamsList, streamsList.get(i), listeners.get(i), this.inputValuesKeys,
                    storeName, DEFAULT_TIMEOUT_MS, true);
                verifyAllWindowedKeys(streamsList, streamsList.get(i), listeners.get(i), this.inputValuesKeys,
                    windowStoreName, 0L, WINDOW_SIZE, DEFAULT_TIMEOUT_MS, true);
            }
            verifyOffsetLagFetch(streamsList, stores, Arrays.asList(STREAM_THREE_PARTITIONS, STREAM_THREE_PARTITIONS));

            // Close every instance except the first.
            for (int i = 1; i < streamsList.size(); i++) {
                final Duration closeTimeout = Duration.ofSeconds(60L);
                MatcherAssert.assertThat(
                    String.format("Streams instance %s did not close in %d ms", i, closeTimeout.toMillis()),
                    streamsList.get(i).close(closeTimeout));
            }

            IntegrationTestUtils.waitForApplicationState(
                streamsList.subList(1, STREAM_TWO_PARTITIONS), KafkaStreams.State.NOT_RUNNING, Duration.ofSeconds(60L));
            verifyOffsetLagFetch(streamsList, stores, Arrays.asList(STREAM_THREE_PARTITIONS, 0));

            IntegrationTestUtils.waitForApplicationState(
                streamsList.subList(0, 1), KafkaStreams.State.RUNNING, Duration.ofSeconds(60L));
            verifyOffsetLagFetch(streamsList, stores, Arrays.asList(STREAM_THREE_PARTITIONS, 0));

            // The surviving instance must still answer queries for every key.
            verifyAllKVKeys(streamsList, streamsList.get(0), listeners.get(0), this.inputValuesKeys,
                storeName, DEFAULT_TIMEOUT_MS, true);
            verifyAllWindowedKeys(streamsList, streamsList.get(0), listeners.get(0), this.inputValuesKeys,
                windowStoreName, 0L, WINDOW_SIZE, DEFAULT_TIMEOUT_MS, true);

            // Eventually the first instance reports 8 store partitions and the closed one reports none.
            TestUtils.retryOnExceptionWithTimeout(
                DEFAULT_TIMEOUT_MS,
                () -> verifyOffsetLagFetch(streamsList, stores, Arrays.asList(8, 0)));
        } finally {
            for (final KafkaStreams streams : streamsList) {
                streams.close();
            }
        }
    }

    /**
     * Starts {@code STREAM_TWO_PARTITIONS} word-count instances, each configured with one standby replica,
     * verifies that every instance can serve all keys of both stores, then closes every instance except the
     * first and verifies that the first instance can still serve all keys.
     *
     * <p>All instances are closed on exit, including when start-up or one of the early verifications fails.
     */
    @Test
    public void shouldBeAbleQueryStandbyStateDuringRebalance() throws Exception {
        final int numInstances = STREAM_TWO_PARTITIONS;
        final List<KafkaStreams> streamsList = new ArrayList<>(numInstances);
        final List<KafkaStreamsTest.StateListenerStub> listeners = new ArrayList<>(numInstances);

        new ProducerRunnable(this.streamThree, this.inputValues, 1).run();

        final String kvStoreName = "word-count-store-" + this.streamThree;
        final String windowStoreName = "windowed-word-count-store-" + this.streamThree;
        final Set<String> stores = Utils.mkSet(kvStoreName, windowStoreName);

        for (int i = 0; i < numInstances; i++) {
            // Each instance gets its own endpoint, client id and state directory.
            final Properties props = (Properties) this.streamsConfiguration.clone();
            props.put("application.server", "localhost:" + i);
            props.put("client.id", "instance-" + i);
            props.put("num.standby.replicas", 1);
            props.put("state.dir", TestUtils.tempDirectory("shouldBeAbleQueryStandbyStateDuringRebalance-" + i).getPath());
            final KafkaStreams streams = createCountStream(this.streamThree, this.outputTopicThree, this.outputTopicConcurrentWindowed, "word-count-store", "windowed-word-count-store", props);
            final KafkaStreamsTest.StateListenerStub listener = new KafkaStreamsTest.StateListenerStub();
            streams.setStateListener(listener);
            listeners.add(listener);
            streamsList.add(streams);
        }

        try {
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamsList, Duration.ofSeconds(60L));
            verifyOffsetLagFetch(streamsList, stores, Arrays.asList(8, 8));

            waitUntilAtLeastNumRecordProcessed(this.outputTopicThree, 1);
            for (int i = 0; i < streamsList.size(); i++) {
                verifyAllKVKeys(streamsList, streamsList.get(i), listeners.get(i), this.inputValuesKeys, kvStoreName, DEFAULT_TIMEOUT_MS, false);
                verifyAllWindowedKeys(streamsList, streamsList.get(i), listeners.get(i), this.inputValuesKeys, windowStoreName, 0L, Long.valueOf(WINDOW_SIZE), DEFAULT_TIMEOUT_MS, false);
            }
            verifyOffsetLagFetch(streamsList, stores, Arrays.asList(8, 8));

            // Shut down every instance except the first one.
            for (int i = 1; i < streamsList.size(); i++) {
                final Duration closeTimeout = Duration.ofSeconds(60L);
                MatcherAssert.assertThat(String.format("Streams instance %s did not close in %d ms", i, closeTimeout.toMillis()), streamsList.get(i).close(closeTimeout));
            }
            IntegrationTestUtils.waitForApplicationState(streamsList.subList(1, numInstances), KafkaStreams.State.NOT_RUNNING, Duration.ofSeconds(60L));
            verifyOffsetLagFetch(streamsList, stores, Arrays.asList(8, 0));

            // The remaining instance must still be able to answer queries for every key.
            verifyAllKVKeys(streamsList, streamsList.get(0), listeners.get(0), this.inputValuesKeys, kvStoreName, DEFAULT_TIMEOUT_MS, false);
            verifyAllWindowedKeys(streamsList, streamsList.get(0), listeners.get(0), this.inputValuesKeys, windowStoreName, 0L, Long.valueOf(WINDOW_SIZE), DEFAULT_TIMEOUT_MS, false);
            TestUtils.retryOnExceptionWithTimeout(DEFAULT_TIMEOUT_MS, () -> {
                verifyOffsetLagFetch(streamsList, stores, Arrays.asList(8, 0));
            });
        } finally {
            for (final KafkaStreams streams : streamsList) {
                streams.close();
            }
        }
    }

    /**
     * Runs the shared queryable-state verification with a cache size of zero bytes.
     */
    @Test
    public void shouldBeAbleToQueryStateWithZeroSizedCache() throws Exception {
        final int cacheSizeBytes = 0;
        verifyCanQueryState(cacheSizeBytes);
    }

    /**
     * Runs the shared queryable-state verification with a 10 MiB cache.
     */
    @Test
    public void shouldBeAbleToQueryStateWithNonZeroSizedCache() throws Exception {
        final int cacheSizeBytes = 10 * 1024 * 1024;
        verifyCanQueryState(cacheSizeBytes);
    }

    /**
     * Verifies that the stores materialized by {@code filter} ("queryFilter") and {@code filterNot}
     * ("queryFilterNot") on the same table are both queryable and hold complementary contents: the record
     * whose key contains "kafka" is only in "queryFilter", all other records are only in "queryFilterNot".
     */
    @Test
    public void shouldBeAbleToQueryFilterState() throws Exception {
        this.streamsConfiguration.put("default.key.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("default.value.serde", Serdes.Long().getClass());
        final StreamsBuilder builder = new StreamsBuilder();
        final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
        // Plain literals index the key array; partition-count constants are unrelated to it.
        final Set<KeyValue<String, Long>> batch1 = new HashSet<>(Arrays.asList(
            new KeyValue<>(keys[0], 1L),
            new KeyValue<>(keys[1], 1L),
            new KeyValue<>(keys[2], 3L),
            new KeyValue<>(keys[3], 5L),
            new KeyValue<>(keys[4], 2L)));
        // Only the "kafka" record passes the predicate below.
        final Set<KeyValue<String, Long>> expectedBatch1 = new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L)));

        IntegrationTestUtils.produceKeyValuesSynchronously(this.streamOne, batch1, TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, LongSerializer.class, new Properties()), this.mockTime);

        final Predicate<String, Long> filterPredicate = (key, value) -> key.contains("kafka");
        final KTable<String, Long> table = builder.table(this.streamOne);
        final KTable<String, Long> filtered = table.filter(filterPredicate, Materialized.as("queryFilter"));
        table.filterNot(filterPredicate, Materialized.as("queryFilterNot"));
        filtered.toStream().to(this.outputTopic);

        this.kafkaStreams = new KafkaStreams(builder.build(), this.streamsConfiguration);
        StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(this.kafkaStreams);
        waitUntilAtLeastNumRecordProcessed(this.outputTopic, 1);

        final ReadOnlyKeyValueStore<String, Long> filterStore = IntegrationTestUtils.getStore("queryFilter", this.kafkaStreams, QueryableStoreTypes.keyValueStore());
        final ReadOnlyKeyValueStore<String, Long> filterNotStore = IntegrationTestUtils.getStore("queryFilterNot", this.kafkaStreams, QueryableStoreTypes.keyValueStore());

        // "queryFilter" holds exactly the matching records ...
        for (final KeyValue<String, Long> expected : expectedBatch1) {
            TestUtils.waitForCondition(() -> expected.value.equals(filterStore.get(expected.key)), "Cannot get expected result");
        }
        for (final KeyValue<String, Long> record : batch1) {
            if (!expectedBatch1.contains(record)) {
                TestUtils.waitForCondition(() -> filterStore.get(record.key) == null, "Cannot get null result");
            }
        }
        // ... and "queryFilterNot" holds exactly the remaining ones.
        for (final KeyValue<String, Long> expected : expectedBatch1) {
            TestUtils.waitForCondition(() -> filterNotStore.get(expected.key) == null, "Cannot get null result");
        }
        for (final KeyValue<String, Long> record : batch1) {
            if (!expectedBatch1.contains(record)) {
                TestUtils.waitForCondition(() -> record.value.equals(filterNotStore.get(record.key)), "Cannot get expected result");
            }
        }
    }

    /**
     * Verifies that the store materialized by {@code mapValues} ("queryMapValues") is queryable and holds,
     * for every produced record, the record's string value parsed as a {@code Long}.
     */
    @Test
    public void shouldBeAbleToQueryMapValuesState() throws Exception {
        this.streamsConfiguration.put("default.key.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("default.value.serde", Serdes.String().getClass());
        final StreamsBuilder builder = new StreamsBuilder();
        final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
        // Plain literals index the key array; partition-count constants are unrelated to it.
        final Set<KeyValue<String, String>> batch1 = new HashSet<>(Arrays.asList(
            new KeyValue<>(keys[0], "1"),
            new KeyValue<>(keys[1], "1"),
            new KeyValue<>(keys[2], "3"),
            new KeyValue<>(keys[3], "5"),
            new KeyValue<>(keys[4], "2")));

        IntegrationTestUtils.produceKeyValuesSynchronously(this.streamOne, batch1, TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()), this.mockTime);

        final KTable<String, String> table = builder.table(this.streamOne);
        // Explicit type arguments are required because the Materialized is configured through a method
        // chain; the store and Bytes types are fully qualified so that no extra imports are needed.
        table
            .mapValues(
                value -> Long.valueOf(value),
                Materialized.<String, Long, org.apache.kafka.streams.state.KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()))
            .toStream()
            .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.Long()));

        this.kafkaStreams = new KafkaStreams(builder.build(), this.streamsConfiguration);
        StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(this.kafkaStreams);
        waitUntilAtLeastNumRecordProcessed(this.outputTopic, 5);

        final ReadOnlyKeyValueStore<String, Long> store = IntegrationTestUtils.getStore("queryMapValues", this.kafkaStreams, QueryableStoreTypes.keyValueStore());
        for (final KeyValue<String, String> record : batch1) {
            Assert.assertEquals(Long.valueOf(record.value), store.get(record.key));
        }
    }

    /**
     * Verifies that a store materialized by {@code mapValues} applied after {@code filter} is queryable:
     * "queryMapValues" must hold the parsed value of the record whose key contains "kafka" and must hold
     * nothing for the records that were filtered out.
     */
    @Test
    public void shouldBeAbleToQueryMapValuesAfterFilterState() throws Exception {
        this.streamsConfiguration.put("default.key.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("default.value.serde", Serdes.String().getClass());
        final StreamsBuilder builder = new StreamsBuilder();
        final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
        // Plain literals index the key array; partition-count constants are unrelated to it.
        final Set<KeyValue<String, String>> batch1 = new HashSet<>(Arrays.asList(
            new KeyValue<>(keys[0], "1"),
            new KeyValue<>(keys[1], "1"),
            new KeyValue<>(keys[2], "3"),
            new KeyValue<>(keys[3], "5"),
            new KeyValue<>(keys[4], "2")));
        // Only the "kafka" record survives the filter; its value is stored as a Long after mapValues.
        final Set<KeyValue<String, Long>> expectedBatch1 = new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L)));

        IntegrationTestUtils.produceKeyValuesSynchronously(this.streamOne, batch1, TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()), this.mockTime);

        final KTable<String, String> table = builder.table(this.streamOne);
        final KTable<String, String> filtered = table.filter((key, value) -> key.contains("kafka"), Materialized.as("queryFilter"));
        // Explicit type arguments are required because the Materialized is configured through a method
        // chain; the store and Bytes types are fully qualified so that no extra imports are needed.
        filtered
            .mapValues(
                value -> Long.valueOf(value),
                Materialized.<String, Long, org.apache.kafka.streams.state.KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()))
            .toStream()
            .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.Long()));

        this.kafkaStreams = new KafkaStreams(builder.build(), this.streamsConfiguration);
        StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(this.kafkaStreams);
        waitUntilAtLeastNumRecordProcessed(this.outputTopic, 1);

        final ReadOnlyKeyValueStore<String, Long> store = IntegrationTestUtils.getStore("queryMapValues", this.kafkaStreams, QueryableStoreTypes.keyValueStore());
        for (final KeyValue<String, Long> expected : expectedBatch1) {
            Assert.assertEquals(expected.value, store.get(expected.key));
        }
        for (final KeyValue<String, String> record : batch1) {
            final KeyValue<String, Long> mapped = new KeyValue<>(record.key, Long.valueOf(record.value));
            if (!expectedBatch1.contains(mapped)) {
                Assert.assertNull(store.get(record.key));
            }
        }
    }

    /**
     * Builds a topology that counts records per key into a key-value store ("my-count") and a windowed
     * store ("windowed-count"), produces one record per key and verifies both stores are queryable.
     *
     * @param i value written to the {@code cache.max.bytes.buffering} configuration
     * @throws Exception if producing, starting the application or querying fails
     */
    private void verifyCanQueryState(int i) throws Exception {
        this.streamsConfiguration.put("cache.max.bytes.buffering", i);
        final StreamsBuilder builder = new StreamsBuilder();
        final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};

        // Plain literals index the key array; partition-count constants are unrelated to it.
        final Set<KeyValue<String, String>> batch1 = new TreeSet<>(this.stringComparator);
        batch1.addAll(Arrays.asList(
            new KeyValue<>(keys[0], "hello"),
            new KeyValue<>(keys[1], "goodbye"),
            new KeyValue<>(keys[2], "welcome"),
            new KeyValue<>(keys[3], "go"),
            new KeyValue<>(keys[4], "kafka")));

        // Every key is produced exactly once, so each count is expected to be 1.
        final Set<KeyValue<String, Long>> expectedCount = new TreeSet<>(this.stringLongComparator);
        for (final String key : keys) {
            expectedCount.add(new KeyValue<>(key, 1L));
        }

        IntegrationTestUtils.produceKeyValuesSynchronously(this.streamOne, batch1, TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()), this.mockTime);

        final KStream<String, String> stream = builder.stream(this.streamOne);
        // Non-windowed count, also forwarded to the output topic.
        stream.groupByKey().count(Materialized.as("my-count")).toStream().to(this.outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
        // Windowed count.
        stream.groupByKey().windowedBy(TimeWindows.of(Duration.ofMillis(WINDOW_SIZE))).count(Materialized.as("windowed-count"));

        this.kafkaStreams = new KafkaStreams(builder.build(), this.streamsConfiguration);
        StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(this.kafkaStreams);
        waitUntilAtLeastNumRecordProcessed(this.outputTopic, 1);

        final ReadOnlyKeyValueStore<String, Long> countStore = IntegrationTestUtils.getStore("my-count", this.kafkaStreams, QueryableStoreTypes.keyValueStore());
        final ReadOnlyWindowStore<String, Long> windowStore = IntegrationTestUtils.getStore("windowed-count", this.kafkaStreams, QueryableStoreTypes.windowStore());
        verifyCanGetByKey(keys, expectedCount, expectedCount, windowStore, countStore);
        verifyRangeAndAll(expectedCount, countStore);
    }

    /**
     * Counts eight identical records into the "count-by-key" store, then closes the application, starts a
     * new one on the same topology and waits until the store can be queried again. While fetching the
     * store throws {@link InvalidStateStoreException}, the condition is simply retried.
     */
    @Test
    public void shouldNotMakeStoreAvailableUntilAllStoresAvailable() throws Exception {
        final String storeName = "count-by-key";
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream(this.streamThree).groupByKey().count(Materialized.as(storeName));

        this.kafkaStreams = new KafkaStreams(builder.build(), this.streamsConfiguration);
        StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(this.kafkaStreams);

        // Eight copies of the same record, so the count for "hello" must reach 8.
        final KeyValue<String, String> hello = KeyValue.pair("hello", "hello");
        IntegrationTestUtils.produceKeyValuesSynchronously(
            this.streamThree,
            Collections.nCopies(8, hello),
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()),
            this.mockTime);

        final ReadOnlyKeyValueStore<String, Long> store = IntegrationTestUtils.getStore(storeName, this.kafkaStreams, QueryableStoreTypes.keyValueStore());
        TestUtils.waitForCondition(
            () -> Long.valueOf(8L).equals(store.get("hello")),
            30000L,
            "wait for count to be 8");

        // Restart: close the running instance and bring up a fresh one.
        this.kafkaStreams.close();
        this.kafkaStreams = new KafkaStreams(builder.build(), this.streamsConfiguration);
        StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(this.kafkaStreams, 30000L);

        TestUtils.waitForCondition(() -> {
            final ReadOnlyKeyValueStore<String, Long> restarted;
            try {
                restarted = IntegrationTestUtils.getStore(storeName, this.kafkaStreams, QueryableStoreTypes.keyValueStore());
                Assert.assertEquals(Long.valueOf(8L), restarted.get("hello"));
            } catch (InvalidStateStoreException e) {
                // Store not available yet; let waitForCondition try again.
                return false;
            }
            return true;
        }, 30000L, "waiting for store count-by-key");
    }

    /**
     * Verifies that the "store" state store stays queryable after the uncaught-exception handler has been
     * invoked for a stream thread.
     *
     * <p>The reducer throws exactly once: the first time it is called with an aggregate longer than one
     * character. With the records produced here that happens when {@code <a,5>} is reduced into "12".
     */
    @Test
    @Deprecated
    public void shouldAllowToQueryAfterThreadDied() throws Exception {
        final AtomicBoolean beforeFailure = new AtomicBoolean(true);
        final AtomicBoolean failed = new AtomicBoolean(false);

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> input = builder.stream(this.streamOne);
        input
            .groupByKey()
            .reduce((aggregate, value) -> {
                if (aggregate.length() > 1 && beforeFailure.compareAndSet(true, false)) {
                    throw new RuntimeException("Injected test exception");
                }
                return aggregate + value;
            }, Materialized.as("store"))
            .toStream()
            .to(this.outputTopic);

        this.streamsConfiguration.put("num.stream.threads", STREAM_TWO_PARTITIONS);
        this.kafkaStreams = new KafkaStreams(builder.build(), this.streamsConfiguration);
        this.kafkaStreams.setUncaughtExceptionHandler((thread, error) -> failed.set(true));
        StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(this.kafkaStreams, 30000L);

        IntegrationTestUtils.produceKeyValuesSynchronously(this.streamOne, Arrays.asList(KeyValue.pair("a", "1"), KeyValue.pair("a", "2"), KeyValue.pair("b", "3"), KeyValue.pair("b", "4")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()), this.mockTime);

        final ReadOnlyKeyValueStore<String, String> store = IntegrationTestUtils.getStore("store", this.kafkaStreams, QueryableStoreTypes.keyValueStore());
        TestUtils.waitForCondition(
            () -> "12".equals(store.get("a")) && "34".equals(store.get("b")),
            30000L,
            "wait for agg to be <a,12> and <b,34>");

        // This record makes the reducer throw.
        IntegrationTestUtils.produceKeyValuesSynchronously(this.streamOne, Collections.singleton(KeyValue.pair("a", "5")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()), this.mockTime);
        TestUtils.waitForCondition(failed::get, 30000L, "wait for thread to fail");

        final ReadOnlyKeyValueStore<String, String> storeAfterFailure = IntegrationTestUtils.getStore("store", this.kafkaStreams, QueryableStoreTypes.keyValueStore());
        try {
            // Several aggregates are accepted per key.
            // NOTE(review): presumably to tolerate records being reprocessed after the failure - confirm.
            TestUtils.waitForCondition(() -> {
                final String a = storeAfterFailure.get("a");
                final String b = storeAfterFailure.get("b");
                final boolean aAccepted = "125".equals(a) || "1225".equals(a) || "12125".equals(a);
                final boolean bAccepted = "34".equals(b) || "344".equals(b) || "3434".equals(b);
                return aAccepted && bAccepted;
            }, 30000L, "wait for agg to be <a,125>||<a,1225>||<a,12125> and <b,34>||<b,344>||<b,3434>");
        } catch (Throwable t) {
            // Report the actual store content to make a timeout diagnosable.
            throw new RuntimeException("Store content is a: " + storeAfterFailure.get("a") + "; b: " + storeAfterFailure.get("b"), t);
        }
    }

    /**
     * Verifies the results of {@code range("go", "kafka")} and {@code all()} on the given store.
     *
     * <p>The range query is expected to return the keys "go", "goodbye", "hello" and "kafka", each with
     * count 1; {@code all()} is expected to return exactly {@code set}. Both iterators are closed once
     * they have been drained, including when iteration throws.
     *
     * @param set                   the complete expected store content
     * @param readOnlyKeyValueStore the store to query
     */
    private void verifyRangeAndAll(Set<KeyValue<String, Long>> set, ReadOnlyKeyValueStore<String, Long> readOnlyKeyValueStore) {
        final Set<KeyValue<String, Long>> countRangeResults = new TreeSet<>(this.stringLongComparator);
        final Set<KeyValue<String, Long>> countAllResults = new TreeSet<>(this.stringLongComparator);
        final Set<KeyValue<String, Long>> expectedRangeResults = new TreeSet<>(this.stringLongComparator);
        expectedRangeResults.addAll(Arrays.asList(
            new KeyValue<>("hello", 1L),
            new KeyValue<>("go", 1L),
            new KeyValue<>("goodbye", 1L),
            new KeyValue<>("kafka", 1L)));

        try (final KeyValueIterator<String, Long> range = readOnlyKeyValueStore.range("go", "kafka")) {
            while (range.hasNext()) {
                countRangeResults.add(range.next());
            }
        }

        try (final KeyValueIterator<String, Long> all = readOnlyKeyValueStore.all()) {
            while (all.hasNext()) {
                countAllResults.add(all.next());
            }
        }

        MatcherAssert.assertThat(countRangeResults, IsEqual.equalTo(expectedRangeResults));
        MatcherAssert.assertThat(countAllResults, IsEqual.equalTo(set));
    }

    /**
     * Polls both stores until each has returned a value for every key, or until 30 seconds have elapsed,
     * and then asserts that the collected results equal the expected ones.
     *
     * @param strArr                the keys to look up
     * @param set                   expected content collected from the window store
     * @param set2                  expected content collected from the key-value store
     * @param readOnlyWindowStore   the window store to query
     * @param readOnlyKeyValueStore the key-value store to query
     * @throws Exception if the polling sleep is interrupted or a query fails
     */
    private void verifyCanGetByKey(String[] strArr, Set<KeyValue<String, Long>> set, Set<KeyValue<String, Long>> set2, ReadOnlyWindowStore<String, Long> readOnlyWindowStore, ReadOnlyKeyValueStore<String, Long> readOnlyKeyValueStore) throws Exception {
        final Set<KeyValue<String, Long>> windowState = new TreeSet<>(this.stringLongComparator);
        final Set<KeyValue<String, Long>> countState = new TreeSet<>(this.stringLongComparator);

        final long deadline = System.currentTimeMillis() + 30000;
        // Stop as soon as both result sets are complete or the deadline has passed; the assertions below
        // then report whatever is still missing.
        while ((windowState.size() < strArr.length || countState.size() < strArr.length)
                && System.currentTimeMillis() < deadline) {
            Thread.sleep(10L);
            for (final String key : strArr) {
                windowState.addAll(fetch(readOnlyWindowStore, key));
                final Long count = readOnlyKeyValueStore.get(key);
                if (count != null) {
                    countState.add(new KeyValue<>(key, count));
                }
            }
        }

        MatcherAssert.assertThat(windowState, IsEqual.equalTo(set));
        MatcherAssert.assertThat(countState, IsEqual.equalTo(set2));
    }

    /**
     * Reads the current value of every key from both stores and asserts that no value is lower than the
     * one recorded for the same key in the corresponding map. The maps are then updated with the values
     * that were just read, so repeated calls check that values never decrease between calls.
     *
     * @param strArr                the keys to look up
     * @param map                   last seen window-store values; updated in place
     * @param map2                  last seen key-value-store values; updated in place
     * @param readOnlyWindowStore   the window store to query
     * @param readOnlyKeyValueStore the key-value store to query
     */
    private void verifyGreaterOrEqual(String[] strArr, Map<String, Long> map, Map<String, Long> map2, ReadOnlyWindowStore<String, Long> readOnlyWindowStore, ReadOnlyKeyValueStore<String, Long> readOnlyKeyValueStore) {
        final Map<String, Long> windowState = new HashMap<>();
        final Map<String, Long> countState = new HashMap<>();
        for (final String key : strArr) {
            windowState.putAll(fetchMap(readOnlyWindowStore, key));
            final Long count = readOnlyKeyValueStore.get(key);
            if (count != null) {
                countState.put(key, count);
            }
        }
        assertNonDecreasingAndRecord(windowState, map);
        assertNonDecreasingAndRecord(countState, map2);
    }

    /**
     * Asserts that each observed value is at least the value last seen for its key (when there is one)
     * and then stores the observed value as the new last-seen value.
     */
    private static void assertNonDecreasingAndRecord(Map<String, Long> observed, Map<String, Long> lastSeen) {
        for (final Map.Entry<String, Long> entry : observed.entrySet()) {
            if (lastSeen.containsKey(entry.getKey())) {
                Assert.assertTrue(entry.getValue() >= lastSeen.get(entry.getKey()));
            }
            lastSeen.put(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Blocks until at least {@code i} records have been received from topic {@code str}, using a consumer
     * in group "queryable-state-consumer" that reads from the earliest offset with String keys and Long
     * values. The wait is bounded by {@code DEFAULT_TIMEOUT_MS}.
     *
     * @param str the topic to read from
     * @param i   the minimum number of records to wait for
     * @throws Exception if the wait fails
     */
    private void waitUntilAtLeastNumRecordProcessed(String str, int i) throws Exception {
        final Properties consumerConfig = new Properties();

        // Connection and group settings.
        consumerConfig.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerConfig.setProperty("group.id", "queryable-state-consumer");
        consumerConfig.setProperty("auto.offset.reset", "earliest");

        // Record format.
        final String keyDeserializer = StringDeserializer.class.getName();
        final String valueDeserializer = LongDeserializer.class.getName();
        consumerConfig.setProperty("key.deserializer", keyDeserializer);
        consumerConfig.setProperty("value.deserializer", valueDeserializer);

        IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig, str, i, DEFAULT_TIMEOUT_MS);
    }

    /**
     * Fetches the first windowed value stored for {@code str} between epoch 0 and the current time.
     *
     * @param readOnlyWindowStore the window store to query
     * @param str                 the key to look up
     * @return a singleton set with the key and the first value found, or an empty set if there is none
     */
    private Set<KeyValue<String, Long>> fetch(ReadOnlyWindowStore<String, Long> readOnlyWindowStore, String str) {
        // The iterator is closed on every path so that it does not keep store resources open.
        try (final WindowStoreIterator<Long> windows = readOnlyWindowStore.fetch(str, Instant.ofEpochMilli(0L), Instant.ofEpochMilli(System.currentTimeMillis()))) {
            if (windows.hasNext()) {
                return Collections.singleton(KeyValue.pair(str, windows.next().value));
            }
            return Collections.emptySet();
        }
    }

    /**
     * Fetches the first windowed value stored for {@code str} between epoch 0 and the current time.
     *
     * @param readOnlyWindowStore the window store to query
     * @param str                 the key to look up
     * @return a single-entry map from the key to the first value found, or an empty map if there is none
     */
    private Map<String, Long> fetchMap(ReadOnlyWindowStore<String, Long> readOnlyWindowStore, String str) {
        // The iterator is closed on every path so that it does not keep store resources open.
        try (final WindowStoreIterator<Long> windows = readOnlyWindowStore.fetch(str, Instant.ofEpochMilli(0L), Instant.ofEpochMilli(System.currentTimeMillis()))) {
            if (windows.hasNext()) {
                return Collections.singletonMap(str, windows.next().value);
            }
            return Collections.emptyMap();
        }
    }
}
