package org.apache.kafka.streams.integration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Exercises {@code KStream#transform} and {@code KStream#flatTransform} with a
 * key-value state store attached, driving the topology through a
 * {@link TopologyTestDriver}.
 *
 * <p>Each test builds a topology that ends in a {@code foreach} collecting every
 * emitted record into {@link #results}; {@link #verifyResult(List)} then pipes a
 * fixed set of six input records and compares the collected output with the
 * expected list.
 */
@Category({IntegrationTest.class})
public class KStreamTransformIntegrationTest {
    private StreamsBuilder builder;
    private final String topic = "stream";
    private final String stateStoreName = "myTransformState";
    private final List<KeyValue<Integer, Integer>> results = new ArrayList<>();
    // Terminal action of every topology under test: records each emitted pair in arrival order.
    private final ForeachAction<Integer, Integer> action = (key, value) -> results.add(KeyValue.pair(key, value));
    private KStream<Integer, Integer> stream;

    /**
     * Creates a fresh builder with the Integer/Integer state store registered and
     * opens the input stream on {@code topic}.
     */
    @Before
    public void before() throws InterruptedException {
        builder = new StreamsBuilder();
        builder.addStateStore(
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(stateStoreName),
                Serdes.Integer(),
                Serdes.Integer()));
        stream = builder.stream(topic, Consumed.with(Serdes.Integer(), Serdes.Integer()));
    }

    /**
     * Builds the topology, pipes the records (1,1), (2,2), (3,3), (1,4), (2,5), (3,6)
     * into {@code topic}, and asserts that the collected results equal {@code list}.
     *
     * @param list the records the topology is expected to emit, in order
     */
    private void verifyResult(List<KeyValue<Integer, Integer>> list) {
        final ConsumerRecordFactory<Integer, Integer> recordFactory =
            new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());

        // try-with-resources closes the driver exactly once, and keeps a close()
        // failure as a suppressed exception instead of masking the primary one.
        try (TopologyTestDriver driver = new TopologyTestDriver(
                builder.build(),
                StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer()))) {
            driver.pipeInput(recordFactory.create(
                topic,
                Arrays.asList(
                    new KeyValue<>(1, 1),
                    new KeyValue<>(2, 2),
                    new KeyValue<>(3, 3),
                    new KeyValue<>(1, 4),
                    new KeyValue<>(2, 5),
                    new KeyValue<>(3, 6))));
        }

        // Assert only after the driver is closed so an assertion failure cannot
        // trigger a second close() on it.
        MatcherAssert.assertThat(results, IsEqual.equalTo(list));
    }

    /**
     * For each input record the transformer emits three records
     * {@code (key + i, value + counter)} for {@code i = 0..2}, where the per-key
     * counter is read from the state store, incremented after each emission, and
     * written back afterwards.
     */
    @Test
    public void shouldFlatTransform() throws Exception {
        stream.flatTransform(() -> new Transformer<Integer, Integer, Iterable<KeyValue<Integer, Integer>>>() {
            private KeyValueStore<Integer, Integer> state;

            // The store registered in before() is built with Integer serdes for key and value.
            @SuppressWarnings("unchecked")
            @Override
            public void init(ProcessorContext processorContext) {
                state = (KeyValueStore<Integer, Integer>) processorContext.getStateStore(stateStoreName);
            }

            @Override
            public Iterable<KeyValue<Integer, Integer>> transform(Integer key, Integer value) {
                final List<KeyValue<Integer, Integer>> emitted = new ArrayList<>();
                state.putIfAbsent(key, 0);
                int storedValue = state.get(key);
                for (int i = 0; i < 3; i++) {
                    emitted.add(new KeyValue<>(key + i, value + storedValue));
                    storedValue++;
                }
                state.put(key, storedValue);
                return emitted;
            }

            @Override
            public void close() {
            }
        }, stateStoreName).foreach(action);

        verifyResult(Arrays.asList(
            KeyValue.pair(1, 1), KeyValue.pair(2, 2), KeyValue.pair(3, 3),
            KeyValue.pair(2, 2), KeyValue.pair(3, 3), KeyValue.pair(4, 4),
            KeyValue.pair(3, 3), KeyValue.pair(4, 4), KeyValue.pair(5, 5),
            KeyValue.pair(1, 7), KeyValue.pair(2, 8), KeyValue.pair(3, 9),
            KeyValue.pair(2, 8), KeyValue.pair(3, 9), KeyValue.pair(4, 10),
            KeyValue.pair(3, 9), KeyValue.pair(4, 10), KeyValue.pair(5, 11)));
    }

    /**
     * For each input record the transformer emits {@code (key + 1, value + counter)},
     * where the per-key counter is read from the state store and stored back
     * incremented by one.
     */
    @Test
    public void shouldTransform() throws Exception {
        stream.transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
            private KeyValueStore<Integer, Integer> state;

            // The store registered in before() is built with Integer serdes for key and value.
            @SuppressWarnings("unchecked")
            @Override
            public void init(ProcessorContext processorContext) {
                state = (KeyValueStore<Integer, Integer>) processorContext.getStateStore(stateStoreName);
            }

            @Override
            public KeyValue<Integer, Integer> transform(Integer key, Integer value) {
                state.putIfAbsent(key, 0);
                final int storedValue = state.get(key);
                final KeyValue<Integer, Integer> result = new KeyValue<>(key + 1, value + storedValue);
                state.put(key, storedValue + 1);
                return result;
            }

            @Override
            public void close() {
            }
        }, stateStoreName).foreach(action);

        verifyResult(Arrays.asList(
            KeyValue.pair(2, 1), KeyValue.pair(3, 2), KeyValue.pair(4, 3),
            KeyValue.pair(2, 5), KeyValue.pair(3, 6), KeyValue.pair(4, 7)));
    }
}
