/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={IntegrationTest.class})
public class EosIntegrationTest {
    private static final int NUM_BROKERS = 3;
    private static final int MAX_POLL_INTERVAL_MS = 5000;
    private static final int MAX_WAIT_TIME_MS = 60000;

    /** Embedded cluster shared by every test of this class; broker-side topic auto-creation is switched off. */
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, new Properties(){
        {
            this.put("auto.create.topics.enable", (Object)false);
        }
    });

    /** Application id of the test currently running; reassigned before each test. */
    private static String applicationId;
    private static final int NUM_TOPIC_PARTITIONS = 2;
    private static final String CONSUMER_GROUP_ID = "readCommitted";
    private static final String SINGLE_PARTITION_INPUT_TOPIC = "singlePartitionInputTopic";
    private static final String SINGLE_PARTITION_THROUGH_TOPIC = "singlePartitionThroughTopic";
    private static final String SINGLE_PARTITION_OUTPUT_TOPIC = "singlePartitionOutputTopic";
    private static final String MULTI_PARTITION_INPUT_TOPIC = "multiPartitionInputTopic";
    private static final String MULTI_PARTITION_THROUGH_TOPIC = "multiPartitionThroughTopic";
    private static final String MULTI_PARTITION_OUTPUT_TOPIC = "multiPartitionOutputTopic";
    private final String storeName = "store";

    /** One-shot switch: the next transformed record throws an injected exception. */
    private AtomicBoolean errorInjected;
    /** One-shot switch: the next transformed record stalls its thread for as long as {@link #doGC} is true. */
    private AtomicBoolean gcInjected;
    private volatile boolean doGC = true;
    /** Number of commits requested by the transformer so far. */
    private AtomicInteger commitRequested;

    // Written by the uncaught-exception handler (which runs on another thread) and polled by the
    // test thread, so it must be volatile for the write to become visible.
    private volatile Throwable uncaughtException;

    // Static on purpose: JUnit creates a fresh instance of this class for every test method, so an
    // instance counter would restart at zero and every test would end up with the same application id.
    private static int testNumber = 0;

    /**
     * Picks the application id for the upcoming test and recreates every topic the tests use, so
     * each test starts from empty input, through and output topics.
     *
     * @throws InterruptedException if waiting for topic deletion or creation is interrupted
     */
    @Before
    public void createTopics() throws InterruptedException {
        final int currentTest = ++testNumber;
        applicationId = "appId-" + currentTest;

        CLUSTER.deleteTopicsAndWait(
            SINGLE_PARTITION_INPUT_TOPIC,
            MULTI_PARTITION_INPUT_TOPIC,
            SINGLE_PARTITION_THROUGH_TOPIC,
            MULTI_PARTITION_THROUGH_TOPIC,
            SINGLE_PARTITION_OUTPUT_TOPIC,
            MULTI_PARTITION_OUTPUT_TOPIC);

        // Single-partition topics first, then the two-partition ones with replication factor 1.
        CLUSTER.createTopics(
            SINGLE_PARTITION_INPUT_TOPIC,
            SINGLE_PARTITION_THROUGH_TOPIC,
            SINGLE_PARTITION_OUTPUT_TOPIC);
        CLUSTER.createTopic(MULTI_PARTITION_INPUT_TOPIC, 2, 1);
        CLUSTER.createTopic(MULTI_PARTITION_THROUGH_TOPIC, 2, 1);
        CLUSTER.createTopic(MULTI_PARTITION_OUTPUT_TOPIC, 2, 1);
    }

    /** Single run of the copy topology from the single-partition input to the single-partition output topic. */
    @Test
    public void shouldBeAbleToRunWithEosEnabled() throws Exception {
        final int runs = 1;
        final String noThroughTopic = null;
        runSimpleCopyTest(runs, SINGLE_PARTITION_INPUT_TOPIC, noThroughTopic, SINGLE_PARTITION_OUTPUT_TOPIC);
    }

    /** Runs the copy topology twice in a row with the same application id, closing the instance in between. */
    @Test
    public void shouldBeAbleToRestartAfterClose() throws Exception {
        final int runs = 2;
        final String noThroughTopic = null;
        runSimpleCopyTest(runs, SINGLE_PARTITION_INPUT_TOPIC, noThroughTopic, SINGLE_PARTITION_OUTPUT_TOPIC);
    }

    /** Single run that copies from the single-partition input topic into the multi-partition output topic. */
    @Test
    public void shouldBeAbleToCommitToMultiplePartitions() throws Exception {
        final int runs = 1;
        final String noThroughTopic = null;
        runSimpleCopyTest(runs, SINGLE_PARTITION_INPUT_TOPIC, noThroughTopic, MULTI_PARTITION_OUTPUT_TOPIC);
    }

    /** Single run that copies from the multi-partition input topic into the single-partition output topic. */
    @Test
    public void shouldBeAbleToCommitMultiplePartitionOffsets() throws Exception {
        final int runs = 1;
        final String noThroughTopic = null;
        runSimpleCopyTest(runs, MULTI_PARTITION_INPUT_TOPIC, noThroughTopic, SINGLE_PARTITION_OUTPUT_TOPIC);
    }

    /** Single run of the copy topology routed via the single-partition through topic. */
    @Test
    public void shouldBeAbleToRunWithTwoSubtopologies() throws Exception {
        final int runs = 1;
        runSimpleCopyTest(
            runs,
            SINGLE_PARTITION_INPUT_TOPIC,
            SINGLE_PARTITION_THROUGH_TOPIC,
            SINGLE_PARTITION_OUTPUT_TOPIC);
    }

    /** Single run of the copy topology routed via a through topic, with every topic multi-partitioned. */
    @Test
    public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws Exception {
        final int runs = 1;
        runSimpleCopyTest(
            runs,
            MULTI_PARTITION_INPUT_TOPIC,
            MULTI_PARTITION_THROUGH_TOPIC,
            MULTI_PARTITION_OUTPUT_TOPIC);
    }

    /**
     * Builds a topology that forwards {@code inputTopic} to {@code outputTopic}, optionally routed
     * via {@code throughTopic}, and runs it {@code numberOfRestarts} times with exactly-once
     * processing enabled. Every run produces ten values for each of the keys 0 and 1, reads the
     * output topic with a read-committed consumer and compares the result with the input per key.
     *
     * @param numberOfRestarts number of times a fresh {@link KafkaStreams} instance is started and closed
     * @param inputTopic topic the test records are produced to
     * @param throughTopic intermediate topic, or {@code null} to write directly to the output topic
     * @param outputTopic topic the results are read from
     * @throws Exception if producing, consuming or running the Streams instance fails
     */
    private void runSimpleCopyTest(int numberOfRestarts, String inputTopic, String throughTopic, String outputTopic) throws Exception {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<Long, Long> source = builder.stream(inputTopic);
        final KStream<Long, Long> sink = throughTopic == null ? source : source.through(throughTopic);
        sink.to(outputTopic);

        int run = 0;
        while (run < numberOfRestarts) {
            final Properties eosOverrides = new Properties();
            eosOverrides.put(StreamsConfig.consumerPrefix("max.poll.records"), 1);
            eosOverrides.put("processing.guarantee", "exactly_once");

            try (final KafkaStreams streams = new KafkaStreams(
                    builder.build(),
                    StreamsTestUtils.getStreamsConfig(
                        applicationId,
                        CLUSTER.bootstrapServers(),
                        Serdes.LongSerde.class.getName(),
                        Serdes.LongSerde.class.getName(),
                        eosOverrides))) {
                streams.start();

                // Each run uses its own value range (0-9, 100-109, ...) for keys 0 and 1.
                final long firstValue = run * 100;
                final List<KeyValue<Long, Long>> inputData = prepareData(firstValue, firstValue + 10L, 0L, 1L);
                IntegrationTestUtils.produceKeyValuesSynchronously(
                    inputTopic,
                    inputData,
                    TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
                    CLUSTER.time);

                final Properties readCommitted = new Properties();
                readCommitted.put("isolation.level", IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
                final List<KeyValue<Long, Long>> committedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                    TestUtils.consumerConfig(
                        CLUSTER.bootstrapServers(),
                        CONSUMER_GROUP_ID,
                        LongDeserializer.class,
                        LongDeserializer.class,
                        readCommitted),
                    outputTopic,
                    inputData.size());

                checkResultPerKey(committedRecords, inputData);
            }
            run++;
        }
    }

    /**
     * Asserts that, for every key occurring in either list, the records of that key appear in
     * {@code result} exactly as they do in {@code expectedResult} (same values, same order).
     * The relative order of records with different keys is not compared.
     *
     * @param result records actually read
     * @param expectedResult records that should have been read
     */
    private void checkResultPerKey(List<KeyValue<Long, Long>> result, List<KeyValue<Long, Long>> expectedResult) {
        final Set<Long> keys = new HashSet<>();
        addAllKeys(keys, result);
        addAllKeys(keys, expectedResult);

        for (final Long key : keys) {
            final List<KeyValue<Long, Long>> actualForKey = getAllRecordPerKey(key, result);
            final List<KeyValue<Long, Long>> expectedForKey = getAllRecordPerKey(key, expectedResult);
            MatcherAssert.assertThat(actualForKey, CoreMatchers.equalTo(expectedForKey));
        }
    }

    /**
     * Adds the key of every record in {@code records} to {@code allKeys}.
     *
     * @param allKeys set that collects the keys; modified in place
     * @param records records whose keys are collected
     */
    private void addAllKeys(Set<Long> allKeys, List<KeyValue<Long, Long>> records) {
        final int count = records.size();
        for (int index = 0; index < count; index++) {
            final KeyValue<Long, Long> current = records.get(index);
            allKeys.add(current.key);
        }
    }

    /**
     * Returns the records of {@code records} whose key equals {@code key}, in their original order.
     *
     * @param key key to select
     * @param records records to filter
     * @return a new list holding only the matching records
     */
    private List<KeyValue<Long, Long>> getAllRecordPerKey(Long key, List<KeyValue<Long, Long>> records) {
        final List<KeyValue<Long, Long>> matching = new ArrayList<>(records.size());
        for (final KeyValue<Long, Long> candidate : records) {
            if (candidate.key.equals(key)) {
                matching.add(candidate);
            }
        }
        return matching;
    }

    /**
     * Starts a pass-through topology with exactly-once processing and feeds it two bursts of
     * records for key 0 (values 0-4, then 5-7). After each burst a read-committed consumer must
     * return exactly that burst from the output topic.
     */
    @Test
    public void shouldBeAbleToPerformMultipleTransactions() throws Exception {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream(SINGLE_PARTITION_INPUT_TOPIC).to(SINGLE_PARTITION_OUTPUT_TOPIC);

        final Properties eosOverrides = new Properties();
        eosOverrides.put("processing.guarantee", "exactly_once");

        try (final KafkaStreams streams = new KafkaStreams(
                builder.build(),
                StreamsTestUtils.getStreamsConfig(
                    applicationId,
                    CLUSTER.bootstrapServers(),
                    Serdes.LongSerde.class.getName(),
                    Serdes.LongSerde.class.getName(),
                    eosOverrides))) {
            streams.start();

            final List<List<KeyValue<Long, Long>>> bursts = new ArrayList<>();
            bursts.add(prepareData(0L, 5L, 0L));
            bursts.add(prepareData(5L, 8L, 0L));

            // Produce, read back and verify one burst at a time.
            for (final List<KeyValue<Long, Long>> burst : bursts) {
                IntegrationTestUtils.produceKeyValuesSynchronously(
                    SINGLE_PARTITION_INPUT_TOPIC,
                    burst,
                    TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
                    CLUSTER.time);

                final Properties readCommitted = new Properties();
                readCommitted.put("isolation.level", IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
                final List<KeyValue<Long, Long>> committedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                    TestUtils.consumerConfig(
                        CLUSTER.bootstrapServers(),
                        CONSUMER_GROUP_ID,
                        LongDeserializer.class,
                        LongDeserializer.class,
                        readCommitted),
                    SINGLE_PARTITION_OUTPUT_TOPIC,
                    burst.size());

                MatcherAssert.assertThat(committedRecords, CoreMatchers.equalTo(burst));
            }
        }
    }

    /**
     * Runs the stateless topology with two stream threads and injects an exception after part of
     * the input has been committed. Verifies, per key, what read-committed and read-uncommitted
     * consumers see before the failure, and that after the failure the committed output matches
     * the complete input.
     */
    @Test
    public void shouldNotViolateEosIfOneTaskFails() throws Exception {
        try (final KafkaStreams streams = getKafkaStreams(false, "appDir", 2)) {
            streams.start();

            final List<KeyValue<Long, Long>> committedBatch = prepareData(0L, 10L, 0L, 1L);
            final List<KeyValue<Long, Long>> pendingBatch = prepareData(10L, 15L, 0L, 1L);
            final List<KeyValue<Long, Long>> inputBeforeFailure = new ArrayList<>(committedBatch);
            inputBeforeFailure.addAll(pendingBatch);
            final List<KeyValue<Long, Long>> batchAfterFailure = prepareData(15L, 20L, 0L, 1L);

            writeInputData(committedBatch);

            // The transformer requests a commit when it sees value 9, once per key.
            final TestCondition bothCommitsRequested = new TestCondition() {
                @Override
                public boolean conditionMet() {
                    return commitRequested.get() == 2;
                }
            };
            TestUtils.waitForCondition(bothCommitsRequested, 60000L, "SteamsTasks did not request commit.");

            writeInputData(pendingBatch);

            final List<KeyValue<Long, Long>> uncommittedRecords = readResult(inputBeforeFailure.size(), null);
            final List<KeyValue<Long, Long>> committedRecords = readResult(committedBatch.size(), CONSUMER_GROUP_ID);
            checkResultPerKey(committedRecords, committedBatch);
            checkResultPerKey(uncommittedRecords, inputBeforeFailure);

            errorInjected.set(true);
            writeInputData(batchAfterFailure);

            final TestCondition exceptionSeen = new TestCondition() {
                @Override
                public boolean conditionMet() {
                    return uncaughtException != null;
                }
            };
            TestUtils.waitForCondition(exceptionSeen, 60000L, "Should receive uncaught exception from one StreamThread.");

            final int totalInput = inputBeforeFailure.size() + batchAfterFailure.size();
            final List<KeyValue<Long, Long>> allCommittedRecords = readResult(totalInput, CONSUMER_GROUP_ID + "_ALL");
            final List<KeyValue<Long, Long>> committedRecordsAfterFailure =
                readResult(pendingBatch.size() + batchAfterFailure.size(), CONSUMER_GROUP_ID);

            final List<KeyValue<Long, Long>> expectedAll = new ArrayList<>(inputBeforeFailure);
            expectedAll.addAll(batchAfterFailure);
            final List<KeyValue<Long, Long>> expectedAfterFailure = new ArrayList<>(pendingBatch);
            expectedAfterFailure.addAll(batchAfterFailure);

            checkResultPerKey(allCommittedRecords, expectedAll);
            checkResultPerKey(committedRecordsAfterFailure, expectedAfterFailure);
        }
    }

    /**
     * Stateful variant of the task-failure test: the transformer keeps a running sum per key in a
     * state store and emits that sum. Besides the committed and uncommitted output, the content of
     * the state store is verified before and after the injected failure.
     */
    @Test
    public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
        try (final KafkaStreams streams = getKafkaStreams(true, "appDir", 2)) {
            streams.start();

            final List<KeyValue<Long, Long>> committedBatch = prepareData(0L, 10L, 0L, 1L);
            final List<KeyValue<Long, Long>> pendingBatch = prepareData(10L, 15L, 0L, 1L, 2L, 3L);
            final List<KeyValue<Long, Long>> inputBeforeFailure = new ArrayList<>(committedBatch);
            inputBeforeFailure.addAll(pendingBatch);
            final List<KeyValue<Long, Long>> batchAfterFailure = prepareData(15L, 20L, 0L, 1L);

            writeInputData(committedBatch);

            // The transformer requests a commit when it sees value 9, once per key.
            final TestCondition bothCommitsRequested = new TestCondition() {
                @Override
                public boolean conditionMet() {
                    return commitRequested.get() == 2;
                }
            };
            TestUtils.waitForCondition(bothCommitsRequested, 60000L, "SteamsTasks did not request commit.");

            writeInputData(pendingBatch);

            final List<KeyValue<Long, Long>> uncommittedRecords = readResult(inputBeforeFailure.size(), null);
            final List<KeyValue<Long, Long>> committedRecords = readResult(committedBatch.size(), CONSUMER_GROUP_ID);

            final List<KeyValue<Long, Long>> expectedBeforeFailure = computeExpectedResult(inputBeforeFailure);
            checkResultPerKey(committedRecords, computeExpectedResult(committedBatch));
            checkResultPerKey(uncommittedRecords, expectedBeforeFailure);
            verifyStateStore(streams, getMaxPerKey(expectedBeforeFailure));

            errorInjected.set(true);
            writeInputData(batchAfterFailure);

            final TestCondition exceptionSeen = new TestCondition() {
                @Override
                public boolean conditionMet() {
                    return uncaughtException != null;
                }
            };
            TestUtils.waitForCondition(exceptionSeen, 60000L, "Should receive uncaught exception from one StreamThread.");

            final int totalInput = inputBeforeFailure.size() + batchAfterFailure.size();
            final List<KeyValue<Long, Long>> allCommittedRecords = readResult(totalInput, CONSUMER_GROUP_ID + "_ALL");
            final List<KeyValue<Long, Long>> committedRecordsAfterFailure =
                readResult(pendingBatch.size() + batchAfterFailure.size(), CONSUMER_GROUP_ID);

            final List<KeyValue<Long, Long>> completeInput = new ArrayList<>(inputBeforeFailure);
            completeInput.addAll(batchAfterFailure);
            final List<KeyValue<Long, Long>> expectedResult = computeExpectedResult(completeInput);

            checkResultPerKey(allCommittedRecords, expectedResult);
            // Compare only the expected entries that follow the first committed batch.
            final List<KeyValue<Long, Long>> expectedAfterFailure =
                expectedResult.subList(committedBatch.size(), expectedResult.size());
            checkResultPerKey(committedRecordsAfterFailure, expectedAfterFailure);

            verifyStateStore(streams, getMaxPerKey(expectedResult));
        }
    }

    /**
     * Runs two single-threaded application instances, stalls one of them inside the transformer
     * (the injected "GC" pause) until a rebalance has moved both input partitions to a single
     * instance, then releases the stall and waits for the partitions to be spread over both
     * instances again. The committed output is compared per key with the input after each phase.
     *
     * <p>The stall is always released in the {@code finally} block before the instances are
     * closed. Otherwise a failure between injecting and releasing the pause would leave a stream
     * thread sleeping in the transformer, and closing that instance could then block indefinitely.
     */
    @Test
    public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() throws Exception {
        final KafkaStreams streams1 = getKafkaStreams(false, "appDir1", 1);
        final KafkaStreams streams2 = getKafkaStreams(false, "appDir2", 1);
        try {
            streams1.start();
            streams2.start();

            final List<KeyValue<Long, Long>> committedDataBeforeGC = prepareData(0L, 10L, 0L, 1L);
            final List<KeyValue<Long, Long>> uncommittedDataBeforeGC = prepareData(10L, 15L, 0L, 1L);
            final List<KeyValue<Long, Long>> dataBeforeGC = new ArrayList<>();
            dataBeforeGC.addAll(committedDataBeforeGC);
            dataBeforeGC.addAll(uncommittedDataBeforeGC);
            final List<KeyValue<Long, Long>> dataToTriggerFirstRebalance = prepareData(15L, 20L, 0L, 1L);
            final List<KeyValue<Long, Long>> dataAfterSecondRebalance = prepareData(20L, 30L, 0L, 1L);

            writeInputData(committedDataBeforeGC);

            // The transformer requests a commit when it sees value 9, once per key.
            TestUtils.waitForCondition(new TestCondition() {
                @Override
                public boolean conditionMet() {
                    return commitRequested.get() == 2;
                }
            }, 60000L, "SteamsTasks did not request commit.");

            writeInputData(uncommittedDataBeforeGC);

            final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeGC.size(), null);
            final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeGC.size(), CONSUMER_GROUP_ID);
            checkResultPerKey(committedRecords, committedDataBeforeGC);
            checkResultPerKey(uncommittedRecords, dataBeforeGC);

            // The next record to reach a transformer stalls its thread until doGC is cleared.
            gcInjected.set(true);
            writeInputData(dataToTriggerFirstRebalance);

            // Wait until one instance reports both topic partitions.
            TestUtils.waitForCondition(new TestCondition() {
                @Override
                public boolean conditionMet() {
                    return streams1.allMetadata().size() == 1
                        && streams2.allMetadata().size() == 1
                        && (streams1.allMetadata().iterator().next().topicPartitions().size() == 2
                            || streams2.allMetadata().iterator().next().topicPartitions().size() == 2);
                }
            }, 60000L, "Should have rebalanced.");

            final List<KeyValue<Long, Long>> committedRecordsAfterRebalance = readResult(
                uncommittedDataBeforeGC.size() + dataToTriggerFirstRebalance.size(),
                CONSUMER_GROUP_ID);
            final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRebalance = new ArrayList<>();
            expectedCommittedRecordsAfterRebalance.addAll(uncommittedDataBeforeGC);
            expectedCommittedRecordsAfterRebalance.addAll(dataToTriggerFirstRebalance);
            checkResultPerKey(committedRecordsAfterRebalance, expectedCommittedRecordsAfterRebalance);

            // Release the stalled thread and wait until each instance reports one topic partition.
            doGC = false;
            TestUtils.waitForCondition(new TestCondition() {
                @Override
                public boolean conditionMet() {
                    return streams1.allMetadata().size() == 1
                        && streams2.allMetadata().size() == 1
                        && streams1.allMetadata().iterator().next().topicPartitions().size() == 1
                        && streams2.allMetadata().iterator().next().topicPartitions().size() == 1;
                }
            }, 60000L, "Should have rebalanced.");

            writeInputData(dataAfterSecondRebalance);

            final List<KeyValue<Long, Long>> allCommittedRecords = readResult(
                committedDataBeforeGC.size() + uncommittedDataBeforeGC.size()
                    + dataToTriggerFirstRebalance.size() + dataAfterSecondRebalance.size(),
                CONSUMER_GROUP_ID + "_ALL");
            final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>();
            allExpectedCommittedRecordsAfterRecovery.addAll(committedDataBeforeGC);
            allExpectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeGC);
            allExpectedCommittedRecordsAfterRecovery.addAll(dataToTriggerFirstRebalance);
            allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterSecondRebalance);
            checkResultPerKey(allCommittedRecords, allExpectedCommittedRecordsAfterRecovery);
        } finally {
            // Release a possibly stalled transformer first so that closing cannot wait on it forever.
            doGC = false;
            try {
                streams1.close();
            } finally {
                // Close the second instance even if closing the first one throws.
                streams2.close();
            }
        }
    }

    /**
     * Creates test records: for each key, in the order given, one record per value in
     * {@code [fromInclusive, toExclusive)} in ascending order.
     *
     * @param fromInclusive first value to generate
     * @param toExclusive value at which generation stops (not included)
     * @param keys keys to generate records for
     * @return a new list with all records of the first key, then all records of the second key, and so on
     */
    private List<KeyValue<Long, Long>> prepareData(long fromInclusive, long toExclusive, Long ... keys) {
        final List<KeyValue<Long, Long>> records = new ArrayList<>();
        for (int keyIndex = 0; keyIndex < keys.length; keyIndex++) {
            final Long key = keys[keyIndex];
            long value = fromInclusive;
            while (value < toExclusive) {
                records.add(new KeyValue<Long, Long>(key, value));
                value++;
            }
        }
        return records;
    }

    /**
     * Creates (but does not start) a {@link KafkaStreams} instance that reads the multi-partition
     * input topic, passes every record through a test transformer and writes the result to the
     * single-partition output topic with exactly-once processing enabled.
     *
     * <p>The transformer, in this order: stalls while {@code doGC} is true if a pause was injected,
     * requests a commit whenever {@code (value + 1) % 10 == 0}, updates and flushes the per-key
     * running sum when a state store is used, throws if an error was injected, and finally emits
     * either the running sum (with state) or the unchanged value (without state).
     *
     * <p>Calling this method also resets the commit counter and both injection switches.
     *
     * @param withState whether the transformer keeps a running sum per key in a persistent store
     * @param appDir sub-directory name appended to a fresh temporary directory to form the state directory
     * @param numberOfStreamsThreads number of stream threads of the instance
     * @return the configured, not yet started instance
     */
    private KafkaStreams getKafkaStreams(final boolean withState, final String appDir, final int numberOfStreamsThreads) {
        commitRequested = new AtomicInteger(0);
        errorInjected = new AtomicBoolean(false);
        gcInjected = new AtomicBoolean(false);

        final StreamsBuilder builder = new StreamsBuilder();

        // Stays null for the stateless topology, exactly as it is handed to transform() below.
        String[] storeNames = null;
        if (withState) {
            storeNames = new String[]{"store"};
            final StoreBuilder<KeyValueStore<Long, Long>> storeBuilder = Stores
                .keyValueStoreBuilder(Stores.persistentKeyValueStore("store"), Serdes.Long(), Serdes.Long())
                .withCachingEnabled();
            builder.addStateStore(storeBuilder);
        }

        final KStream<Long, Long> input = builder.stream(MULTI_PARTITION_INPUT_TOPIC);
        input.transform(new TransformerSupplier<Long, Long, KeyValue<Long, Long>>() {
            @Override
            public Transformer<Long, Long, KeyValue<Long, Long>> get() {
                return new Transformer<Long, Long, KeyValue<Long, Long>>() {
                    ProcessorContext context;
                    KeyValueStore<Long, Long> state = null;

                    @Override
                    public void init(ProcessorContext context) {
                        this.context = context;
                        if (withState) {
                            this.state = (KeyValueStore<Long, Long>) context.getStateStore("store");
                        }
                    }

                    @Override
                    public KeyValue<Long, Long> transform(Long key, Long value) {
                        // Injected pause: only the first record after the switch is set stalls.
                        if (gcInjected.compareAndSet(true, false)) {
                            while (doGC) {
                                try {
                                    Thread.sleep(100L);
                                } catch (InterruptedException e) {
                                    throw new RuntimeException(e);
                                }
                            }
                        }

                        final boolean endOfBatch = (value + 1L) % 10L == 0L;
                        if (endOfBatch) {
                            context.commit();
                            commitRequested.incrementAndGet();
                        }

                        final boolean stateful = state != null;
                        if (stateful) {
                            final Long previous = state.get(key);
                            final Long updated;
                            if (previous == null) {
                                updated = value;
                            } else {
                                updated = Long.valueOf(previous + value);
                            }
                            state.put(key, updated);
                            state.flush();
                        }

                        // Injected failure: thrown after the state update, before anything is emitted.
                        if (errorInjected.compareAndSet(true, false)) {
                            throw new RuntimeException("Injected test exception.");
                        }

                        if (stateful) {
                            return new KeyValue<Long, Long>(key, state.get(key));
                        }
                        return new KeyValue<Long, Long>(key, value);
                    }

                    @Override
                    public void close() {
                    }
                };
            }
        }, storeNames).to(SINGLE_PARTITION_OUTPUT_TOPIC);

        final Properties overrides = new Properties();
        overrides.put("processing.guarantee", "exactly_once");
        overrides.put("num.stream.threads", numberOfStreamsThreads);
        overrides.put("commit.interval.ms", -1);
        overrides.put(StreamsConfig.consumerPrefix("request.timeout.ms"), 5000);
        overrides.put(StreamsConfig.consumerPrefix("session.timeout.ms"), 4999);
        overrides.put(StreamsConfig.consumerPrefix("max.poll.interval.ms"), 5000);
        overrides.put("cache.max.bytes.buffering", 0);
        overrides.put("state.dir", TestUtils.tempDirectory().getPath() + File.separator + appDir);
        overrides.put("application.server", "dummy:2142");

        final KafkaStreams streams = new KafkaStreams(
            builder.build(),
            StreamsTestUtils.getStreamsConfig(
                applicationId,
                CLUSTER.bootstrapServers(),
                Serdes.LongSerde.class.getName(),
                Serdes.LongSerde.class.getName(),
                overrides));

        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                // Only the first failure is recorded; a second one is reported as a test failure.
                final boolean alreadyRecorded = EosIntegrationTest.this.uncaughtException != null;
                if (alreadyRecorded) {
                    e.printStackTrace(System.err);
                    Assert.fail("Should only get one uncaught exception from Streams.");
                }
                EosIntegrationTest.this.uncaughtException = e;
            }
        });
        return streams;
    }

    /**
     * Synchronously produces {@code records} to the multi-partition input topic.
     *
     * @param records records to write, in order
     * @throws Exception if producing fails
     */
    private void writeInputData(List<KeyValue<Long, Long>> records) throws Exception {
        final Properties producerProps =
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class);
        IntegrationTestUtils.produceKeyValuesSynchronously(
            MULTI_PARTITION_INPUT_TOPIC,
            records,
            producerProps,
            CLUSTER.time);
    }

    /**
     * Waits until at least {@code numberOfRecords} records have been read from the
     * single-partition output topic and returns them.
     *
     * @param numberOfRecords minimum number of records to wait for
     * @param groupId consumer group for a read-committed consumer, or {@code null} to read with
     *        the default consumer configuration and without an explicit group id
     * @return the records read
     * @throws InterruptedException if waiting is interrupted
     */
    private List<KeyValue<Long, Long>> readResult(int numberOfRecords, String groupId) throws InterruptedException {
        final Properties consumerProps;
        if (groupId == null) {
            consumerProps = TestUtils.consumerConfig(
                CLUSTER.bootstrapServers(),
                LongDeserializer.class,
                LongDeserializer.class);
        } else {
            final Properties readCommitted = new Properties();
            readCommitted.put("isolation.level", IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
            consumerProps = TestUtils.consumerConfig(
                CLUSTER.bootstrapServers(),
                groupId,
                LongDeserializer.class,
                LongDeserializer.class,
                readCommitted);
        }
        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
            consumerProps,
            SINGLE_PARTITION_OUTPUT_TOPIC,
            numberOfRecords);
    }

    /**
     * Computes the output expected from the stateful topology: for every input record, a record
     * with the same key whose value is the sum of all values seen for that key so far, including
     * the current one.
     *
     * @param input input records in processing order
     * @return a new list with one running-sum record per input record, in the same order
     */
    private List<KeyValue<Long, Long>> computeExpectedResult(List<KeyValue<Long, Long>> input) {
        final Map<Long, Long> runningSums = new HashMap<>();
        final List<KeyValue<Long, Long>> expected = new ArrayList<>(input.size());
        for (final KeyValue<Long, Long> record : input) {
            final Long previous = runningSums.get(record.key);
            final Long current;
            if (previous == null) {
                current = record.value;
            } else {
                current = Long.valueOf(previous + record.value);
            }
            runningSums.put(record.key, current);
            expected.add(new KeyValue<Long, Long>(record.key, current));
        }
        return expected;
    }

    /**
     * Reduces {@code input} to one record per key that carries the largest value seen for that key.
     *
     * @param input records to reduce
     * @return a new set with exactly one (key, maximum value) record per distinct key
     */
    private Set<KeyValue<Long, Long>> getMaxPerKey(List<KeyValue<Long, Long>> input) {
        final Map<Long, Long> maxima = new HashMap<>();
        for (final KeyValue<Long, Long> record : input) {
            final Long known = maxima.get(record.key);
            // Keep the first value for a key, afterwards only strictly larger ones.
            if (known == null || record.value > known) {
                maxima.put(record.key, record.value);
            }
        }

        final Set<KeyValue<Long, Long>> result = new HashSet<>(input.size());
        for (final Map.Entry<Long, Long> entry : maxima.entrySet()) {
            result.add(new KeyValue<Long, Long>(entry.getKey(), entry.getValue()));
        }
        return result;
    }

    /**
     * Asserts that the state store named "store" holds exactly the entries in
     * {@code expectedStoreContent}.
     *
     * <p>The store may be temporarily unavailable, so the lookup is retried every five seconds for
     * up to five minutes. If the waiting thread is interrupted, its interrupt flag is restored and
     * the retries stop, which makes the not-null assertion below fail.
     *
     * @param streams running instance that hosts the store
     * @param expectedStoreContent expected entries; matched entries are removed from this set, so
     *        it is empty after a successful verification
     */
    private void verifyStateStore(KafkaStreams streams, Set<KeyValue<Long, Long>> expectedStoreContent) {
        ReadOnlyKeyValueStore<Long, Long> store = null;
        final long deadline = System.currentTimeMillis() + 300000L;
        while (System.currentTimeMillis() < deadline) {
            try {
                store = streams.store("store", QueryableStoreTypes.<Long, Long>keyValueStore());
                break;
            } catch (InvalidStateStoreException okJustRetry) {
                try {
                    Thread.sleep(5000L);
                } catch (InterruptedException interrupted) {
                    // Keep the interrupt visible to the caller and give up instead of spinning.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        Assert.assertNotNull("State store \"store\" did not become queryable.", store);

        // The iterator holds store resources and must be closed, also when an assertion fails.
        try (KeyValueIterator<Long, Long> it = store.all()) {
            while (it.hasNext()) {
                final KeyValue<Long, Long> entry = it.next();
                Assert.assertTrue("Unexpected entry in state store: " + entry, expectedStoreContent.remove(entry));
            }
        }
        Assert.assertTrue("Entries missing from state store: " + expectedStoreContent, expectedStoreContent.isEmpty());
    }
}

