package org.apache.kafka.streams.integration;

import java.io.File;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/EosIntegrationTest.class */
public class EosIntegrationTest {
    // NOTE(review): not referenced by name in the visible part of this class — confirm it is used further down.
    private static final int MAX_POLL_INTERVAL_MS = 5000;
    // Same value as the 60000L timeouts handed to TestUtils.waitForCondition in the tests;
    // presumably those literals are this constant inlined by the compiler — TODO confirm.
    private static final int MAX_WAIT_TIME_MS = 60000;
    // Streams application id; reassigned by createTopics() before every test.
    private static String applicationId;
    // Partition count used when creating the three multi-partition topics.
    private static final int NUM_TOPIC_PARTITIONS = 2;
    // Consumer group id passed to the consumer configs that verify output.
    private static final String CONSUMER_GROUP_ID = "readCommitted";
    // Topic names; all six topics are deleted and recreated by createTopics().
    private static final String SINGLE_PARTITION_INPUT_TOPIC = "singlePartitionInputTopic";
    private static final String SINGLE_PARTITION_THROUGH_TOPIC = "singlePartitionThroughTopic";
    private static final String SINGLE_PARTITION_OUTPUT_TOPIC = "singlePartitionOutputTopic";
    private static final String MULTI_PARTITION_INPUT_TOPIC = "multiPartitionInputTopic";
    private static final String MULTI_PARTITION_THROUGH_TOPIC = "multiPartitionThroughTopic";
    private static final String MULTI_PARTITION_OUTPUT_TOPIC = "multiPartitionOutputTopic";
    // Test-to-transformer switches; all three are re-created by getKafkaStreams().
    // errorInjected: set to true by a test to make the transformer throw once.
    private AtomicBoolean errorInjected;
    // gcInjected: set to true by a test to make the transformer stall while doGC stays true.
    private AtomicBoolean gcInjected;
    // commitRequested: incremented by the transformer each time it requests a commit.
    private AtomicInteger commitRequested;
    // Polled by the failure tests until non-null; the code that assigns it is outside the visible part of the file.
    private Throwable uncaughtException;
    // Number of brokers in the embedded cluster.
    private static final int NUM_BROKERS = 3;

    // Embedded cluster shared by all tests of this class; broker-side topic auto-creation is
    // switched off, so every topic has to be created explicitly (see createTopics()).
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, new Properties() { // from class: org.apache.kafka.streams.integration.EosIntegrationTest.1
        {
            put("auto.create.topics.enable", false);
        }
    });
    // State store name; the visible code uses the literal "store" rather than this field.
    private final String storeName = "store";
    // While true, a transformer stalled through gcInjected keeps sleeping; a test sets it to
    // false to release it. Volatile because it is written by the test and read by the transformer.
    private volatile boolean doGC = true;
    /**
     * Counter behind the per-test application id.
     *
     * <p>It is static on purpose: JUnit 4 creates a fresh instance of the test class for every
     * test method, so an instance counter would restart at zero each time and every test would
     * end up with the same application id.
     */
    private static int testNumber = 0;

    /**
     * Assigns a new application id and recreates all topics before each test.
     *
     * <p>All six topics are deleted first so that no records from a previous test remain. The
     * multi-partition topics are created with {@code NUM_TOPIC_PARTITIONS} partitions and a
     * replication factor of 1.
     *
     * @throws InterruptedException if interrupted while waiting on the cluster
     */
    @Before
    public void createTopics() throws InterruptedException {
        testNumber++;
        applicationId = "appId-" + testNumber;

        CLUSTER.deleteTopicsAndWait(
            SINGLE_PARTITION_INPUT_TOPIC, MULTI_PARTITION_INPUT_TOPIC,
            SINGLE_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_THROUGH_TOPIC,
            SINGLE_PARTITION_OUTPUT_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC);

        CLUSTER.createTopics(SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC);
        CLUSTER.createTopic(MULTI_PARTITION_INPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1);
        CLUSTER.createTopic(MULTI_PARTITION_THROUGH_TOPIC, NUM_TOPIC_PARTITIONS, 1);
        CLUSTER.createTopic(MULTI_PARTITION_OUTPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1);
    }

    /**
     * Runs the copy scenario once, from SINGLE_PARTITION_INPUT_TOPIC directly to
     * SINGLE_PARTITION_OUTPUT_TOPIC.
     */
    @Test
    public void shouldBeAbleToRunWithEosEnabled() throws Exception {
        final int numberOfRuns = 1;
        final String noThroughTopic = null;
        runSimpleCopyTest(numberOfRuns, SINGLE_PARTITION_INPUT_TOPIC, noThroughTopic, SINGLE_PARTITION_OUTPUT_TOPIC);
    }

    /**
     * Runs the copy scenario twice in a row, so the application has to start again after
     * having been closed.
     */
    @Test
    public void shouldBeAbleToRestartAfterClose() throws Exception {
        // Number of start/close rounds. This is unrelated to the partition count, so it is
        // spelled out here instead of borrowing NUM_TOPIC_PARTITIONS (which merely shares the value).
        final int numberOfRuns = 2;
        runSimpleCopyTest(numberOfRuns, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC);
    }

    /**
     * Runs the copy scenario once, from SINGLE_PARTITION_INPUT_TOPIC directly to
     * MULTI_PARTITION_OUTPUT_TOPIC.
     */
    @Test
    public void shouldBeAbleToCommitToMultiplePartitions() throws Exception {
        final int numberOfRuns = 1;
        final String noThroughTopic = null;
        runSimpleCopyTest(numberOfRuns, SINGLE_PARTITION_INPUT_TOPIC, noThroughTopic, MULTI_PARTITION_OUTPUT_TOPIC);
    }

    /**
     * Runs the copy scenario once, from MULTI_PARTITION_INPUT_TOPIC directly to
     * SINGLE_PARTITION_OUTPUT_TOPIC.
     */
    @Test
    public void shouldBeAbleToCommitMultiplePartitionOffsets() throws Exception {
        final int numberOfRuns = 1;
        final String noThroughTopic = null;
        runSimpleCopyTest(numberOfRuns, MULTI_PARTITION_INPUT_TOPIC, noThroughTopic, SINGLE_PARTITION_OUTPUT_TOPIC);
    }

    /**
     * Runs the copy scenario once with SINGLE_PARTITION_THROUGH_TOPIC inserted between the
     * single-partition input and output topics.
     */
    @Test
    public void shouldBeAbleToRunWithTwoSubtopologies() throws Exception {
        final int numberOfRuns = 1;
        runSimpleCopyTest(
            numberOfRuns,
            SINGLE_PARTITION_INPUT_TOPIC,
            SINGLE_PARTITION_THROUGH_TOPIC,
            SINGLE_PARTITION_OUTPUT_TOPIC);
    }

    /**
     * Runs the copy scenario once with MULTI_PARTITION_THROUGH_TOPIC inserted between the
     * multi-partition input and output topics.
     */
    @Test
    public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws Exception {
        final int numberOfRuns = 1;
        runSimpleCopyTest(
            numberOfRuns,
            MULTI_PARTITION_INPUT_TOPIC,
            MULTI_PARTITION_THROUGH_TOPIC,
            MULTI_PARTITION_OUTPUT_TOPIC);
    }

    /**
     * Builds a pass-through topology and checks, for a number of consecutive application runs,
     * that everything written to the input topic shows up on the output topic.
     *
     * <p>Each run starts a new {@link KafkaStreams} instance configured with
     * {@code processing.guarantee=exactly_once} and a consumer {@code max.poll.records} of 1,
     * produces ten values for each of the keys 0 and 1, reads the output with a
     * {@code read_committed} consumer and compares the result per key. The instance is closed
     * at the end of every run, whether the run passed or failed.
     *
     * @param i    number of consecutive runs
     * @param str  topic the topology reads from
     * @param str2 topic to route the stream through before writing the output, or {@code null}
     *             to write to the output topic directly
     * @param str3 topic the topology writes to
     * @throws Exception if producing, consuming or verification fails
     */
    private void runSimpleCopyTest(int i, String str, String str2, String str3) throws Exception {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<Long, Long> input = builder.stream(str);
        final KStream<Long, Long> output = str2 == null ? input : input.through(str2);
        output.to(str3);

        for (int run = 0; run < i; run++) {
            final Properties streamsOverrides = new Properties() {
                {
                    put(StreamsConfig.consumerPrefix("max.poll.records"), 1);
                    put("processing.guarantee", "exactly_once");
                }
            };
            final KafkaStreams streams = new KafkaStreams(
                builder.build(),
                StreamsTestUtils.getStreamsConfig(
                    applicationId,
                    CLUSTER.bootstrapServers(),
                    Serdes.LongSerde.class.getName(),
                    Serdes.LongSerde.class.getName(),
                    streamsOverrides));

            // finally guarantees exactly one close() per instance, on success and on failure alike.
            try {
                streams.start();

                // Every run uses its own value range so runs do not produce identical records.
                final long firstValue = run * 100L;
                final List<KeyValue<Long, Long>> inputData = prepareData(firstValue, firstValue + 10L, 0L, 1L);

                IntegrationTestUtils.produceKeyValuesSynchronously(
                    str,
                    inputData,
                    TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
                    CLUSTER.time);

                final Properties readCommitted = new Properties() {
                    {
                        put("isolation.level", IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
                    }
                };
                final List<KeyValue<Long, Long>> committedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                    TestUtils.consumerConfig(
                        CLUSTER.bootstrapServers(),
                        CONSUMER_GROUP_ID,
                        LongDeserializer.class,
                        LongDeserializer.class,
                        readCommitted),
                    str3,
                    inputData.size());

                checkResultPerKey(committedRecords, inputData);
            } finally {
                streams.close();
            }
        }
    }

    /**
     * Asserts that two record lists agree key by key.
     *
     * <p>For every key that occurs in either list, the records carrying that key must be equal
     * in both lists, including their order. The relative order of records with different keys
     * is not compared.
     *
     * @param list  records that were read back
     * @param list2 records that are expected
     */
    private void checkResultPerKey(List<KeyValue<Long, Long>> list, List<KeyValue<Long, Long>> list2) {
        // Typed set: a raw HashSet cannot be iterated as Long.
        final Set<Long> allKeys = new HashSet<>();
        addAllKeys(allKeys, list);
        addAllKeys(allKeys, list2);

        for (final Long key : allKeys) {
            MatcherAssert.assertThat(getAllRecordPerKey(key, list), CoreMatchers.equalTo(getAllRecordPerKey(key, list2)));
        }
    }

    /**
     * Adds the key of every record in {@code list} to {@code set}.
     *
     * @param set  collection receiving the keys
     * @param list records whose keys are collected
     */
    private void addAllKeys(Set<Long> set, List<KeyValue<Long, Long>> list) {
        for (final KeyValue<Long, Long> record : list) {
            set.add(record.key);
        }
    }

    /**
     * Returns the records of {@code list} whose key equals {@code l}, in their original order.
     *
     * @param l    key to select
     * @param list records to filter
     * @return a new list holding only the matching records
     */
    private List<KeyValue<Long, Long>> getAllRecordPerKey(Long l, List<KeyValue<Long, Long>> list) {
        final List<KeyValue<Long, Long>> matching = new ArrayList<>(list.size());
        for (final KeyValue<Long, Long> record : list) {
            if (record.key.equals(l)) {
                matching.add(record);
            }
        }
        return matching;
    }

    /**
     * Produces two batches one after the other and verifies after each batch that a
     * {@code read_committed} consumer sees exactly that batch on the output topic.
     *
     * <p>Both reads use the consumer group CONSUMER_GROUP_ID. The application runs with
     * {@code processing.guarantee=exactly_once} and is closed when the test ends, whether it
     * passed or failed.
     */
    @Test
    public void shouldBeAbleToPerformMultipleTransactions() throws Exception {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream(SINGLE_PARTITION_INPUT_TOPIC).to(SINGLE_PARTITION_OUTPUT_TOPIC);

        final Properties streamsOverrides = new Properties() {
            {
                put("processing.guarantee", "exactly_once");
            }
        };
        final KafkaStreams streams = new KafkaStreams(
            builder.build(),
            StreamsTestUtils.getStreamsConfig(
                applicationId,
                CLUSTER.bootstrapServers(),
                Serdes.LongSerde.class.getName(),
                Serdes.LongSerde.class.getName(),
                streamsOverrides));

        // finally guarantees exactly one close(), on success and on failure alike.
        try {
            streams.start();

            final List<KeyValue<Long, Long>> firstBatch = prepareData(0L, 5L, 0L);
            final List<KeyValue<Long, Long>> secondBatch = prepareData(5L, 8L, 0L);

            IntegrationTestUtils.produceKeyValuesSynchronously(
                SINGLE_PARTITION_INPUT_TOPIC,
                firstBatch,
                TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
                CLUSTER.time);

            final List<KeyValue<Long, Long>> firstCommitted = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                TestUtils.consumerConfig(
                    CLUSTER.bootstrapServers(),
                    CONSUMER_GROUP_ID,
                    LongDeserializer.class,
                    LongDeserializer.class,
                    new Properties() {
                        {
                            put("isolation.level", IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
                        }
                    }),
                SINGLE_PARTITION_OUTPUT_TOPIC,
                firstBatch.size());
            MatcherAssert.assertThat(firstCommitted, CoreMatchers.equalTo(firstBatch));

            IntegrationTestUtils.produceKeyValuesSynchronously(
                SINGLE_PARTITION_INPUT_TOPIC,
                secondBatch,
                TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
                CLUSTER.time);

            final List<KeyValue<Long, Long>> secondCommitted = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                TestUtils.consumerConfig(
                    CLUSTER.bootstrapServers(),
                    CONSUMER_GROUP_ID,
                    LongDeserializer.class,
                    LongDeserializer.class,
                    new Properties() {
                        {
                            put("isolation.level", IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
                        }
                    }),
                SINGLE_PARTITION_OUTPUT_TOPIC,
                secondBatch.size());
            MatcherAssert.assertThat(secondCommitted, CoreMatchers.equalTo(secondBatch));
        } finally {
            streams.close();
        }
    }

    /**
     * Verifies the output when the transformer throws an injected exception while processing.
     *
     * <p>Sequence: write a first batch and wait until the transformers have requested
     * NUM_TOPIC_PARTITIONS commits; write a second batch; read and check both views of the
     * output; arm {@code errorInjected}; write a third batch and wait until
     * {@code uncaughtException} is set; finally read and check the output again.
     *
     * <p>The application is closed when the test ends, whether it passed or failed.
     */
    @Test
    public void shouldNotViolateEosIfOneTaskFails() throws Exception {
        final KafkaStreams streams = getKafkaStreams(false, "appDir", NUM_TOPIC_PARTITIONS);

        // finally guarantees exactly one close(), on success and on failure alike.
        try {
            streams.start();

            final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
            final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L);

            final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
            dataBeforeFailure.addAll(committedDataBeforeFailure);
            dataBeforeFailure.addAll(uncommittedDataBeforeFailure);

            final List<KeyValue<Long, Long>> dataAfterFailure = prepareData(15L, 20L, 0L, 1L);

            writeInputData(committedDataBeforeFailure);

            TestUtils.waitForCondition(new TestCondition() {
                @Override
                public boolean conditionMet() {
                    return EosIntegrationTest.this.commitRequested.get() == NUM_TOPIC_PARTITIONS;
                }
            }, 60000L, "SteamsTasks did not request commit.");

            writeInputData(uncommittedDataBeforeFailure);

            // The read without a group id is issued first, then the read with CONSUMER_GROUP_ID.
            final List<KeyValue<Long, Long>> allRecordsBeforeFailure = readResult(dataBeforeFailure.size(), null);
            final List<KeyValue<Long, Long>> committedRecordsBeforeFailure =
                readResult(committedDataBeforeFailure.size(), CONSUMER_GROUP_ID);

            checkResultPerKey(committedRecordsBeforeFailure, committedDataBeforeFailure);
            checkResultPerKey(allRecordsBeforeFailure, dataBeforeFailure);

            // Arm the one-shot failure; the transformer throws on the next record it handles.
            this.errorInjected.set(true);
            writeInputData(dataAfterFailure);

            TestUtils.waitForCondition(new TestCondition() {
                @Override
                public boolean conditionMet() {
                    return EosIntegrationTest.this.uncaughtException != null;
                }
            }, 60000L, "Should receive uncaught exception from one StreamThread.");

            final int totalRecordCount =
                committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size() + dataAfterFailure.size();
            final List<KeyValue<Long, Long>> allCommittedRecords = readResult(totalRecordCount, "readCommitted_ALL");
            final List<KeyValue<Long, Long>> committedRecordsAfterFailure =
                readResult(uncommittedDataBeforeFailure.size() + dataAfterFailure.size(), CONSUMER_GROUP_ID);

            final List<KeyValue<Long, Long>> allExpectedCommittedRecords = new ArrayList<>();
            allExpectedCommittedRecords.addAll(committedDataBeforeFailure);
            allExpectedCommittedRecords.addAll(uncommittedDataBeforeFailure);
            allExpectedCommittedRecords.addAll(dataAfterFailure);

            final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRecovery = new ArrayList<>();
            expectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeFailure);
            expectedCommittedRecordsAfterRecovery.addAll(dataAfterFailure);

            checkResultPerKey(allCommittedRecords, allExpectedCommittedRecords);
            checkResultPerKey(committedRecordsAfterFailure, expectedCommittedRecordsAfterRecovery);
        } finally {
            streams.close();
        }
    }

    /**
     * Stateful variant of the injected-failure test: the topology is built with the state store
     * enabled, expected output is derived through {@code computeExpectedResult}, and the store
     * content is verified before and after the failure.
     *
     * <p>Sequence: write a first batch and wait until the transformers have requested
     * NUM_TOPIC_PARTITIONS commits; write a second batch (keys 0 to 3); read and check the
     * output and the store; arm {@code errorInjected}; write a third batch and wait until
     * {@code uncaughtException} is set; read and check output and store again.
     *
     * <p>The application is closed when the test ends, whether it passed or failed.
     */
    @Test
    public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
        final KafkaStreams streams = getKafkaStreams(true, "appDir", NUM_TOPIC_PARTITIONS);

        // finally guarantees exactly one close(), on success and on failure alike.
        try {
            streams.start();

            final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
            final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L, 2L, 3L);

            final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
            dataBeforeFailure.addAll(committedDataBeforeFailure);
            dataBeforeFailure.addAll(uncommittedDataBeforeFailure);

            final List<KeyValue<Long, Long>> dataAfterFailure = prepareData(15L, 20L, 0L, 1L);

            writeInputData(committedDataBeforeFailure);

            TestUtils.waitForCondition(new TestCondition() {
                @Override
                public boolean conditionMet() {
                    return EosIntegrationTest.this.commitRequested.get() == NUM_TOPIC_PARTITIONS;
                }
            }, 60000L, "SteamsTasks did not request commit.");

            writeInputData(uncommittedDataBeforeFailure);

            // The read without a group id is issued first, then the read with CONSUMER_GROUP_ID.
            final List<KeyValue<Long, Long>> allRecordsBeforeFailure = readResult(dataBeforeFailure.size(), null);
            final List<KeyValue<Long, Long>> committedRecordsBeforeFailure =
                readResult(committedDataBeforeFailure.size(), CONSUMER_GROUP_ID);

            final List<KeyValue<Long, Long>> expectedResultBeforeFailure = computeExpectedResult(dataBeforeFailure);
            checkResultPerKey(committedRecordsBeforeFailure, computeExpectedResult(committedDataBeforeFailure));
            checkResultPerKey(allRecordsBeforeFailure, expectedResultBeforeFailure);
            verifyStateStore(streams, getMaxPerKey(expectedResultBeforeFailure));

            // Arm the one-shot failure; the transformer throws on the next record it handles.
            this.errorInjected.set(true);
            writeInputData(dataAfterFailure);

            TestUtils.waitForCondition(new TestCondition() {
                @Override
                public boolean conditionMet() {
                    return EosIntegrationTest.this.uncaughtException != null;
                }
            }, 60000L, "Should receive uncaught exception from one StreamThread.");

            final int totalRecordCount =
                committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size() + dataAfterFailure.size();
            final List<KeyValue<Long, Long>> allCommittedRecords = readResult(totalRecordCount, "readCommitted_ALL");
            final List<KeyValue<Long, Long>> committedRecordsAfterFailure =
                readResult(uncommittedDataBeforeFailure.size() + dataAfterFailure.size(), CONSUMER_GROUP_ID);

            final List<KeyValue<Long, Long>> allInputData = new ArrayList<>();
            allInputData.addAll(committedDataBeforeFailure);
            allInputData.addAll(uncommittedDataBeforeFailure);
            allInputData.addAll(dataAfterFailure);

            final List<KeyValue<Long, Long>> allExpectedCommittedRecords = computeExpectedResult(allInputData);
            // Skip the leading entries that correspond to the first batch.
            final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRecovery =
                allExpectedCommittedRecords.subList(committedDataBeforeFailure.size(), allExpectedCommittedRecords.size());

            checkResultPerKey(allCommittedRecords, allExpectedCommittedRecords);
            checkResultPerKey(committedRecordsAfterFailure, expectedCommittedRecordsAfterRecovery);
            verifyStateStore(streams, getMaxPerKey(allExpectedCommittedRecords));
        } finally {
            streams.close();
        }
    }

    /**
     * Runs two application instances and stalls one of them in the middle of processing to
     * verify the output across the resulting rebalances.
     *
     * <p>Sequence: write a first batch and wait for NUM_TOPIC_PARTITIONS commit requests; write
     * a second batch and check the output; arm {@code gcInjected} so that one transformer
     * blocks while {@code doGC} is true; write a third batch and wait until one instance
     * reports NUM_TOPIC_PARTITIONS topic partitions; check the output; clear {@code doGC} to
     * release the stalled transformer and wait until both instances report one topic partition
     * each; write a fourth batch and check the complete output.
     *
     * <p>Both instances are closed when the test ends. The second one is closed even if closing
     * the first one throws.
     */
    @Test
    public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() throws Exception {
        final KafkaStreams streams1 = getKafkaStreams(false, "appDir1", 1);
        final KafkaStreams streams2 = getKafkaStreams(false, "appDir2", 1);

        try {
            streams1.start();
            streams2.start();

            final List<KeyValue<Long, Long>> committedDataBeforeGC = prepareData(0L, 10L, 0L, 1L);
            final List<KeyValue<Long, Long>> uncommittedDataBeforeGC = prepareData(10L, 15L, 0L, 1L);

            final List<KeyValue<Long, Long>> dataBeforeGC = new ArrayList<>();
            dataBeforeGC.addAll(committedDataBeforeGC);
            dataBeforeGC.addAll(uncommittedDataBeforeGC);

            final List<KeyValue<Long, Long>> dataToTriggerFirstRebalance = prepareData(15L, 20L, 0L, 1L);
            final List<KeyValue<Long, Long>> dataAfterSecondRebalance = prepareData(20L, 30L, 0L, 1L);

            writeInputData(committedDataBeforeGC);

            TestUtils.waitForCondition(new TestCondition() {
                @Override
                public boolean conditionMet() {
                    return EosIntegrationTest.this.commitRequested.get() == NUM_TOPIC_PARTITIONS;
                }
            }, 60000L, "SteamsTasks did not request commit.");

            writeInputData(uncommittedDataBeforeGC);

            // The read without a group id is issued first, then the read with CONSUMER_GROUP_ID.
            final List<KeyValue<Long, Long>> allRecordsBeforeGC = readResult(dataBeforeGC.size(), null);
            final List<KeyValue<Long, Long>> committedRecordsBeforeGC =
                readResult(committedDataBeforeGC.size(), CONSUMER_GROUP_ID);

            checkResultPerKey(committedRecordsBeforeGC, committedDataBeforeGC);
            checkResultPerKey(allRecordsBeforeGC, dataBeforeGC);

            // Arm the one-shot stall; the transformer that picks it up sleeps while doGC is true.
            this.gcInjected.set(true);
            writeInputData(dataToTriggerFirstRebalance);

            TestUtils.waitForCondition(new TestCondition() {
                @Override
                public boolean conditionMet() {
                    if (streams1.allMetadata().size() != 1 || streams2.allMetadata().size() != 1) {
                        return false;
                    }
                    return streams1.allMetadata().iterator().next().topicPartitions().size() == NUM_TOPIC_PARTITIONS
                        || streams2.allMetadata().iterator().next().topicPartitions().size() == NUM_TOPIC_PARTITIONS;
                }
            }, 60000L, "Should have rebalanced.");

            final List<KeyValue<Long, Long>> committedRecordsAfterRebalance =
                readResult(uncommittedDataBeforeGC.size() + dataToTriggerFirstRebalance.size(), CONSUMER_GROUP_ID);

            final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRebalance = new ArrayList<>();
            expectedCommittedRecordsAfterRebalance.addAll(uncommittedDataBeforeGC);
            expectedCommittedRecordsAfterRebalance.addAll(dataToTriggerFirstRebalance);

            checkResultPerKey(committedRecordsAfterRebalance, expectedCommittedRecordsAfterRebalance);

            // Release the stalled transformer.
            this.doGC = false;

            TestUtils.waitForCondition(new TestCondition() {
                @Override
                public boolean conditionMet() {
                    return streams1.allMetadata().size() == 1
                        && streams2.allMetadata().size() == 1
                        && streams1.allMetadata().iterator().next().topicPartitions().size() == 1
                        && streams2.allMetadata().iterator().next().topicPartitions().size() == 1;
                }
            }, 60000L, "Should have rebalanced.");

            writeInputData(dataAfterSecondRebalance);

            final int totalRecordCount = committedDataBeforeGC.size()
                + uncommittedDataBeforeGC.size()
                + dataToTriggerFirstRebalance.size()
                + dataAfterSecondRebalance.size();
            final List<KeyValue<Long, Long>> allCommittedRecords = readResult(totalRecordCount, "readCommitted_ALL");

            final List<KeyValue<Long, Long>> allExpectedCommittedRecords = new ArrayList<>();
            allExpectedCommittedRecords.addAll(committedDataBeforeGC);
            allExpectedCommittedRecords.addAll(uncommittedDataBeforeGC);
            allExpectedCommittedRecords.addAll(dataToTriggerFirstRebalance);
            allExpectedCommittedRecords.addAll(dataAfterSecondRebalance);

            checkResultPerKey(allCommittedRecords, allExpectedCommittedRecords);
        } finally {
            // Nested finally: a failing close() on the first instance must not leak the second.
            try {
                streams1.close();
            } finally {
                streams2.close();
            }
        }
    }

    /**
     * Builds test records: for each key, one record per value in {@code [j, j2)}.
     *
     * <p>Records are grouped by key in the order the keys are given, with ascending values
     * inside each group. The result is empty when no keys are passed or when {@code j >= j2}.
     *
     * @param j    first value, inclusive
     * @param j2   end of the value range, exclusive
     * @param lArr keys to generate records for
     * @return a new mutable list of the generated records
     */
    private List<KeyValue<Long, Long>> prepareData(long j, long j2, Long... lArr) {
        final List<KeyValue<Long, Long>> data = new ArrayList<>();
        for (final Long key : lArr) {
            // Bounded loop: ends once the exclusive upper bound is reached.
            for (long value = j; value < j2; value++) {
                data.add(new KeyValue<>(key, value));
            }
        }
        return data;
    }

    private KafkaStreams getKafkaStreams(final boolean z, final String str, final int i) {
        this.commitRequested = new AtomicInteger(0);
        this.errorInjected = new AtomicBoolean(false);
        this.gcInjected = new AtomicBoolean(false);
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        String[] strArr = null;
        if (z) {
            strArr = new String[]{"store"};
            streamsBuilder.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store"), Serdes.Long(), Serdes.Long()).withCachingEnabled());
        }
        streamsBuilder.stream(MULTI_PARTITION_INPUT_TOPIC).transform(new TransformerSupplier<Long, Long, KeyValue<Long, Long>>() { // from class: org.apache.kafka.streams.integration.EosIntegrationTest.14
            public Transformer<Long, Long, KeyValue<Long, Long>> get() {
                return new Transformer<Long, Long, KeyValue<Long, Long>>() { // from class: org.apache.kafka.streams.integration.EosIntegrationTest.14.1
                    ProcessorContext context;
                    KeyValueStore<Long, Long> state = null;

                    /**
                     * Stores the processor context and, when the enclosing topology was built
                     * with a state store, looks up the store named "store".
                     *
                     * @param processorContext context supplied for this transformer
                     */
                    @SuppressWarnings("unchecked")
                    public void init(ProcessorContext processorContext) {
                        this.context = processorContext;
                        if (z) {
                            // Explicit cast to the concrete store type. NOTE(review): presumably
                            // getStateStore returns the base StateStore type in this Kafka version
                            // — confirm; the cast is harmless if the return type is already generic.
                            this.state = (KeyValueStore<Long, Long>) processorContext.getStateStore("store");
                        }
                    }

                    /**
                     * Forwards each record, optionally summing values per key in the state
                     * store, and provides the hooks the tests use to provoke stalls and failures.
                     *
                     * <p>Steps, in order:
                     * <ol>
                     *   <li>If {@code gcInjected} was armed, disarm it and sleep in 100 ms steps
                     *       for as long as {@code doGC} stays true.</li>
                     *   <li>If {@code (value + 1) % 10 == 0}, request a commit on the context
                     *       and increment {@code commitRequested}.</li>
                     *   <li>If a state store is present, add the value to the stored sum for the
                     *       key (or store the value itself when the key is new) and flush.</li>
                     *   <li>If {@code errorInjected} was armed, disarm it and throw.</li>
                     * </ol>
                     *
                     * @param key   record key
                     * @param value record value
                     * @return the key paired with the stored sum when a state store is present,
                     *         otherwise the key paired with the unchanged value
                     * @throws RuntimeException with message "Injected test exception." when a
                     *         failure was injected, or wrapping the InterruptedException when
                     *         the stall is interrupted
                     */
                    @Override
                    public KeyValue<Long, Long> transform(final Long key, final Long value) {
                        // One-shot stall: only the call that flips the flag blocks.
                        if (EosIntegrationTest.this.gcInjected.compareAndSet(true, false)) {
                            while (EosIntegrationTest.this.doGC) {
                                try {
                                    Thread.sleep(100L);
                                } catch (final InterruptedException e) {
                                    throw new RuntimeException(e);
                                }
                            }
                        }

                        if ((value.longValue() + 1L) % 10L == 0L) {
                            this.context.commit();
                            EosIntegrationTest.this.commitRequested.incrementAndGet();
                        }

                        if (this.state != null) {
                            final Long previousSum = this.state.get(key);
                            final Long newSum;
                            if (previousSum == null) {
                                newSum = value;
                            } else {
                                newSum = Long.valueOf(previousSum.longValue() + value.longValue());
                            }
                            this.state.put(key, newSum);
                            this.state.flush();
                        }

                        // One-shot failure, raised after the store was updated and flushed.
                        if (EosIntegrationTest.this.errorInjected.compareAndSet(true, false)) {
                            throw new RuntimeException("Injected test exception.");
                        }

                        if (this.state != null) {
                            return new KeyValue<>(key, this.state.get(key));
                        }
                        return new KeyValue<>(key, value);
                    }

                    public void close() {
                    }
                };
            }
        }, strArr).to(SINGLE_PARTITION_OUTPUT_TOPIC);
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), StreamsTestUtils.getStreamsConfig(applicationId, CLUSTER.bootstrapServers(), Serdes.LongSerde.class.getName(), Serdes.LongSerde.class.getName(), new Properties() { // from class: org.apache.kafka.streams.integration.EosIntegrationTest.15
            {
                put("processing.guarantee", "exactly_once");
                put("num.stream.threads", Integer.valueOf(i));
                put("commit.interval.ms", -1);
                put(StreamsConfig.consumerPrefix("request.timeout.ms"), Integer.valueOf(EosIntegrationTest.MAX_POLL_INTERVAL_MS));
                put(StreamsConfig.consumerPrefix("session.timeout.ms"), 4999);
                put(StreamsConfig.consumerPrefix("max.poll.interval.ms"), Integer.valueOf(EosIntegrationTest.MAX_POLL_INTERVAL_MS));
                put("cache.max.bytes.buffering", 0);
                put("state.dir", TestUtils.tempDirectory().getPath() + File.separator + str);
                put("application.server", "dummy:2142");
            }
        }));
        kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: org.apache.kafka.streams.integration.EosIntegrationTest.16
            @Override // java.lang.Thread.UncaughtExceptionHandler
            public void uncaughtException(Thread thread, Throwable th) {
                if (EosIntegrationTest.this.uncaughtException != null) {
                    th.printStackTrace(System.err);
                    Assert.fail("Should only get one uncaught exception from Streams.");
                }
                EosIntegrationTest.this.uncaughtException = th;
            }
        });
        return kafkaStreams;
    }

    /**
     * Sends the given records to {@code MULTI_PARTITION_INPUT_TOPIC} through
     * {@link IntegrationTestUtils#produceKeyValuesSynchronously}.
     *
     * <p>The producer is configured against the embedded cluster's bootstrap servers and uses
     * {@link LongSerializer} for both keys and values.
     *
     * @param list the key/value records to produce
     * @throws Exception propagated from the produce helper
     */
    private void writeInputData(List<KeyValue<Long, Long>> list) throws Exception {
        Properties producerProps = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            LongSerializer.class,
            LongSerializer.class);
        IntegrationTestUtils.produceKeyValuesSynchronously(
            MULTI_PARTITION_INPUT_TOPIC,
            list,
            producerProps,
            CLUSTER.time);
    }

    /**
     * Waits until at least {@code i} records are available on {@code SINGLE_PARTITION_OUTPUT_TOPIC}
     * and returns what was read.
     *
     * <p>When {@code str} is non-null it is passed through to the consumer configuration
     * (presumably as the consumer group id; confirm against {@code TestUtils.consumerConfig})
     * and the consumer is additionally configured with {@code isolation.level=read_committed}.
     * When {@code str} is null a consumer configuration without these extras is used.
     *
     * @param i   minimum number of records to wait for
     * @param str optional value selecting the read-committed consumer; may be {@code null}
     * @return the records returned by {@code waitUntilMinKeyValueRecordsReceived}
     * @throws InterruptedException propagated from the wait helper
     */
    private List<KeyValue<Long, Long>> readResult(int i, String str) throws InterruptedException {
        if (str == null) {
            Properties defaultConsumerProps = TestUtils.consumerConfig(
                CLUSTER.bootstrapServers(),
                LongDeserializer.class,
                LongDeserializer.class);
            return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                defaultConsumerProps,
                SINGLE_PARTITION_OUTPUT_TOPIC,
                i);
        }

        // Extra consumer setting merged in by TestUtils.consumerConfig for the non-null case.
        Properties isolationOverride = new Properties();
        isolationOverride.put("isolation.level", IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));

        Properties committedConsumerProps = TestUtils.consumerConfig(
            CLUSTER.bootstrapServers(),
            str,
            LongDeserializer.class,
            LongDeserializer.class,
            isolationOverride);
        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
            committedConsumerProps,
            SINGLE_PARTITION_OUTPUT_TOPIC,
            i);
    }

    /**
     * Computes, for each input record in order, the running sum of all values seen so far for
     * that record's key.
     *
     * <p>The returned list has one entry per input record: the record's key paired with the
     * cumulative sum for that key up to and including that record.
     *
     * @param list input records, processed in iteration order
     * @return a new list of (key, running sum) pairs, same length and order as {@code list}
     */
    private List<KeyValue<Long, Long>> computeExpectedResult(List<KeyValue<Long, Long>> list) {
        List<KeyValue<Long, Long>> runningSums = new ArrayList<>(list.size());
        Map<Long, Long> sumByKey = new HashMap<>();
        for (KeyValue<Long, Long> input : list) {
            Long previousSum = sumByKey.get(input.key);
            // Both branches are boxed Longs so the first value for a key is passed through as-is.
            Long updatedSum = previousSum == null
                ? input.value
                : Long.valueOf(previousSum.longValue() + input.value.longValue());
            sumByKey.put(input.key, updatedSum);
            runningSums.add(new KeyValue<>(input.key, updatedSum));
        }
        return runningSums;
    }

    /**
     * Reduces the given records to one entry per key holding the largest value seen for that key.
     *
     * @param list input records
     * @return a new set containing one (key, maximum value) pair for each distinct key in
     *         {@code list}
     */
    private Set<KeyValue<Long, Long>> getMaxPerKey(List<KeyValue<Long, Long>> list) {
        Map<Long, Long> maxByKey = new HashMap<>();
        for (KeyValue<Long, Long> input : list) {
            Long currentMax = maxByKey.get(input.key);
            if (currentMax == null || input.value.longValue() > currentMax.longValue()) {
                maxByKey.put(input.key, input.value);
            }
        }

        Set<KeyValue<Long, Long>> maxima = new HashSet<>(list.size());
        for (Map.Entry<Long, Long> entry : maxByKey.entrySet()) {
            maxima.add(new KeyValue<>(entry.getKey(), entry.getValue()));
        }
        return maxima;
    }

    /**
     * Asserts that the queryable key-value store named {@code "store"} contains exactly the
     * entries in {@code set}.
     *
     * <p>The store lookup is retried for up to 300000 ms, sleeping 5000 ms after each
     * {@link InvalidStateStoreException}. If the waiting thread is interrupted, its interrupt
     * flag is restored and waiting stops, so the subsequent not-null assertion fails.
     *
     * <p>Note: {@code set} is modified. Every entry found in the store is removed from it, so on
     * success the set is empty when this method returns.
     *
     * @param kafkaStreams the streams instance whose store is queried
     * @param set          the expected store content; consumed by this method
     */
    private void verifyStateStore(KafkaStreams kafkaStreams, Set<KeyValue<Long, Long>> set) {
        ReadOnlyKeyValueStore<Long, Long> store = null;
        long deadlineMs = System.currentTimeMillis() + 300000L;
        while (System.currentTimeMillis() < deadlineMs) {
            try {
                store = kafkaStreams.store("store", QueryableStoreTypes.<Long, Long>keyValueStore());
                break;
            } catch (InvalidStateStoreException storeNotReadyYet) {
                // The store is not queryable yet; back off and retry until the deadline.
                try {
                    Thread.sleep(5000L);
                } catch (InterruptedException interrupted) {
                    // Preserve the interrupt for callers and stop waiting instead of spinning.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        Assert.assertNotNull("State store \"store\" did not become queryable", store);

        // The iterator holds store resources and must be closed once iteration is done.
        try (KeyValueIterator<Long, Long> storeContent = store.all()) {
            while (storeContent.hasNext()) {
                KeyValue<Long, Long> actual = storeContent.next();
                Assert.assertTrue("Unexpected entry in state store: " + actual, set.remove(actual));
            }
        }
        Assert.assertTrue("Expected entries missing from state store: " + set, set.isEmpty());
    }
}
