/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;

public class KStreamKStreamJoinTest {
    /** Name of the first input topic. */
    private final String topic1 = "topic1";
    /** Name of the second input topic. */
    private final String topic2 = "topic2";
    /** Source configuration with Integer keys and String values. */
    private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
    /** Builds input records with Integer keys and String values. */
    private final ConsumerRecordFactory<Integer, String> recordFactory =
        new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
    /** Driver configuration obtained from the shared test utilities, created with String serdes. */
    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());

    /**
     * Pipes a record with key {@code "A"} and a {@code null} value into the left input of a
     * windowed stream-stream join and checks that the skip is both logged and counted in the
     * {@code skipped-records-total} metric.
     */
    @Test
    public void shouldLogAndMeterOnSkippedRecordsWithNullValue() {
        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
        final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));

        // This test uses String keys / Integer values, so it needs its own factory; it is named
        // differently from the class-level factory to avoid shadowing it.
        final ConsumerRecordFactory<String, Integer> stringIntegerFactory =
            new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer());

        left.join(
            right,
            new ValueJoiner<Integer, Integer, Integer>() {
                @Override
                public Integer apply(final Integer value1, final Integer value2) {
                    return value1 + value2;
                }
            },
            JoinWindows.of(100L),
            Joined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
        );

        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        try {
            try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
                // The typed factory plus the explicit (Integer) cast on the null value keep this
                // call bound to create(topic, key, value) rather than any other three-argument overload.
                driver.pipeInput(stringIntegerFactory.create("left", "A", (Integer) null));

                Assert.assertThat(
                    appender.getMessages(),
                    CoreMatchers.hasItem("Skipping record due to null key or value. key=[A] value=[null] topic=[left] partition=[0] offset=[0]"));
                Assert.assertEquals(
                    1.0,
                    StreamsTestUtils.getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
            }
        } finally {
            // Always detach the capturing appender, even when the driver or an assertion fails,
            // so it does not stay registered for tests that run afterwards.
            LogCaptureAppender.unregister(appender);
        }
    }

    /**
     * Exercises an inner join of {@code topic1} with {@code topic2} using
     * {@code JoinWindows.of(100L)} and records created without explicit timestamps.
     * Each round pipes records into one side and asserts the exact joined output
     * captured by the mock processor.
     */
    @Test
    public void testJoin() {
        final StreamsBuilder builder = new StreamsBuilder();
        final int[] expectedKeys = {0, 1, 2, 3};
        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();

        final KStream<Integer, String> stream1 = builder.stream(topic1, consumed);
        final KStream<Integer, String> stream2 = builder.stream(topic2, consumed);
        final KStream<Integer, String> joined = stream1.join(
            stream2,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.of(100L),
            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
        joined.process(supplier);

        // Both join inputs must end up in a single co-partition group.
        final Collection<Set<String>> copartitionGroups =
            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals(1, copartitionGroups.size());
        Assert.assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();

            // X0, X1 -> topic1; nothing has been piped into topic2 yet, so no output is expected.
            for (int i = 0; i < 2; i++) {
                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
            }
            processor.checkAndClearProcessResult();

            // Y0, Y1 -> topic2; each is expected to join with the X record of the same key.
            for (int i = 0; i < 2; i++) {
                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
            }
            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");

            // X0..X3 -> topic1; only keys 0 and 1 have been seen on topic2 so far.
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
            }
            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");

            // YY0..YY3 -> topic2; keys 0 and 1 were piped into topic1 twice, keys 2 and 3 once.
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
            }
            processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");

            // XX0..XX3 -> topic1; topic2 has received Y and YY for keys 0/1 and only YY for keys 2/3.
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
            }
            processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");

            // YYY0, YYY1 -> topic2; topic1 has received X twice and XX once for keys 0 and 1.
            for (int i = 0; i < 2; i++) {
                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "YYY" + expectedKeys[i]));
            }
            processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
        }
    }

    /**
     * Exercises an outer join of {@code topic1} with {@code topic2} using
     * {@code JoinWindows.of(100L)}, with the driver started at time 0 and records created
     * without explicit timestamps. Unlike the inner join, records with no partner on the
     * other side are expected to be emitted joined with {@code null}.
     */
    @Test
    public void testOuterJoin() {
        final StreamsBuilder builder = new StreamsBuilder();
        final int[] expectedKeys = {0, 1, 2, 3};
        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();

        final KStream<Integer, String> stream1 = builder.stream(topic1, consumed);
        final KStream<Integer, String> stream2 = builder.stream(topic2, consumed);
        final KStream<Integer, String> joined = stream1.outerJoin(
            stream2,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.of(100L),
            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
        joined.process(supplier);

        // Both join inputs must end up in a single co-partition group.
        final Collection<Set<String>> copartitionGroups =
            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals(1, copartitionGroups.size());
        Assert.assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();

            // X0, X1 -> topic1; nothing on topic2 yet, so each is expected paired with null.
            for (int i = 0; i < 2; i++) {
                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
            }
            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");

            // Y0, Y1 -> topic2; each is expected to join with the X record of the same key.
            for (int i = 0; i < 2; i++) {
                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
            }
            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");

            // X0..X3 -> topic1; keys 2 and 3 have no partner on topic2 and are paired with null.
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
            }
            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");

            // YY0..YY3 -> topic2; keys 0 and 1 were piped into topic1 twice, keys 2 and 3 once.
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
            }
            processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");

            // XX0..XX3 -> topic1; topic2 has received Y and YY for keys 0/1 and only YY for keys 2/3.
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
            }
            processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");

            // YYY0, YYY1 -> topic2; topic1 has received X twice and XX once for keys 0 and 1.
            for (int i = 0; i < 2; i++) {
                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "YYY" + expectedKeys[i]));
            }
            processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
        }
    }

    /**
     * Exercises the boundaries of a symmetric join window ({@code JoinWindows.of(100L)})
     * with explicitly timestamped records.
     *
     * <p>The assertions pin both edges of the window in both directions: records are expected
     * to join when their timestamps differ by exactly 100 and to stop joining at a difference
     * of 101. The first half probes with records on {@code topic2} against records already
     * piped into {@code topic1}; the second half swaps the roles of the two topics.
     */
    @Test
    public void testWindowing() {
        long time = 0L;

        final StreamsBuilder builder = new StreamsBuilder();
        final int[] expectedKeys = {0, 1, 2, 3};
        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();

        final KStream<Integer, String> stream1 = builder.stream(topic1, consumed);
        final KStream<Integer, String> stream2 = builder.stream(topic2, consumed);
        final KStream<Integer, String> joined = stream1.join(
            stream2,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.of(100L),
            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
        joined.process(supplier);

        // Both join inputs must end up in a single co-partition group.
        final Collection<Set<String>> copartitionGroups =
            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals(1, copartitionGroups.size());
        Assert.assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, time)) {
            // X0, X1 -> topic1 at t=0; nothing on topic2 yet.
            for (int i = 0; i < 2; i++) {
                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time));
            }
            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
            processor.checkAndClearProcessResult();

            // Y0, Y1 -> topic2 at t=0; same timestamp, so both join.
            for (int i = 0; i < 2; i++) {
                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time));
            }
            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");

            // ---- Probe with topic2 records against topic1 records at t=1000..1003 ----

            // X0..X3 -> topic1 at t=1000+i; the topic2 records at t=0 are too far away to join.
            time = 1000L;
            for (int i = 0; i < expectedKeys.length; i++) {
                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
            }
            processor.checkAndClearProcessResult();

            // t=1100: exactly 100 after the earliest X record, so all four keys join.
            time += 100L;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");

            // t=1101: X0 (t=1000) is now 101 away and drops out.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");

            // t=1102: X1 drops out as well.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");

            // t=1103: only X3 (t=1003) is still within 100.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("3:X3+YY3");

            // t=1104: every X record is more than 100 in the past.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult();

            // t=899: every X record is more than 100 in the future.
            time = 899L;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult();

            // t=900: exactly 100 before X0.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("0:X0+YY0");

            // t=901: X1 comes into range.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");

            // t=902: X2 comes into range.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");

            // t=903: all four X records are in range.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");

            // ---- Probe with topic1 records against topic2 records at t=2000..2003 ----

            // Y0..Y3 -> topic2 at t=2000+i; no topic1 record is within 100 of these.
            time = 2000L;
            for (int i = 0; i < expectedKeys.length; i++) {
                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time + i));
            }
            processor.checkAndClearProcessResult();

            // t=2100: exactly 100 after the earliest Y record, so all four keys join.
            time = 2100L;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");

            // t=2101: Y0 (t=2000) drops out.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");

            // t=2102: Y1 drops out as well.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3");

            // t=2103: only Y3 (t=2003) is still within 100.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("3:XX3+Y3");

            // t=2104: every Y record is more than 100 in the past.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
            }
            processor.checkAndClearProcessResult();

            // t=1899: every Y record is more than 100 in the future.
            time = 1899L;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
            }
            processor.checkAndClearProcessResult();

            // t=1900: exactly 100 before Y0.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("0:XX0+Y0");

            // t=1901: Y1 comes into range.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");

            // t=1902: Y2 comes into range.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2");

            // t=1903: all four Y records are in range.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
        }
    }

    /**
     * Exercises an asymmetric join window built with {@code JoinWindows.of(0L).after(100L)}.
     *
     * <p>Records X0..X3 are piped into {@code topic1} at t=1000..1003, then {@code topic2} is
     * probed at a series of timestamps. The assertions expect a topic2 record to join only
     * when its timestamp lies between the topic1 record's timestamp and that timestamp plus
     * 100, both inclusive.
     */
    @Test
    public void testAsymmetricWindowingAfter() {
        long time = 1000L;

        final StreamsBuilder builder = new StreamsBuilder();
        final int[] expectedKeys = {0, 1, 2, 3};
        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();

        final KStream<Integer, String> stream1 = builder.stream(topic1, consumed);
        final KStream<Integer, String> stream2 = builder.stream(topic2, consumed);
        final KStream<Integer, String> joined = stream1.join(
            stream2,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.of(0L).after(100L),
            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
        joined.process(supplier);

        // Both join inputs must end up in a single co-partition group.
        final Collection<Set<String>> copartitionGroups =
            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals(1, copartitionGroups.size());
        Assert.assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, time)) {
            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();

            // X0..X3 -> topic1 at t=1000+i; nothing on topic2 yet.
            for (int i = 0; i < expectedKeys.length; i++) {
                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
            }
            processor.checkAndClearProcessResult();

            // t=999: earlier than every X record, so nothing joins.
            time = 999L;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult();

            // t=1000: same timestamp as X0.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("0:X0+YY0");

            // t=1001: X1 becomes eligible.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");

            // t=1002: X2 becomes eligible.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");

            // t=1003: all four X records are eligible.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");

            // t=1100: exactly 100 after X0, still inside the window for all keys.
            time = 1100L;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");

            // t=1101: X0 (t=1000) is now 101 behind and drops out.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");

            // t=1102: X1 drops out as well.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");

            // t=1103: only X3 (t=1003) remains in range.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("3:X3+YY3");

            // t=1104: every X record is more than 100 behind.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult();
        }
    }

    /**
     * Exercises an asymmetric join window built with {@code JoinWindows.of(0L).before(100L)}.
     *
     * <p>Records X0..X3 are piped into {@code topic1} at t=1000..1003, then {@code topic2} is
     * probed at a series of timestamps. The assertions expect a topic2 record to join only
     * when its timestamp lies between the topic1 record's timestamp minus 100 and that
     * timestamp itself, both inclusive.
     */
    @Test
    public void testAsymmetricWindowingBefore() {
        long time = 1000L;

        final StreamsBuilder builder = new StreamsBuilder();
        final int[] expectedKeys = {0, 1, 2, 3};
        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();

        final KStream<Integer, String> stream1 = builder.stream(topic1, consumed);
        final KStream<Integer, String> stream2 = builder.stream(topic2, consumed);
        final KStream<Integer, String> joined = stream1.join(
            stream2,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.of(0L).before(100L),
            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
        joined.process(supplier);

        // Both join inputs must end up in a single co-partition group.
        final Collection<Set<String>> copartitionGroups =
            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals(1, copartitionGroups.size());
        Assert.assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, time)) {
            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();

            // X0..X3 -> topic1 at t=1000+i; nothing on topic2 yet.
            for (int i = 0; i < expectedKeys.length; i++) {
                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
            }
            processor.checkAndClearProcessResult();

            // t=899: more than 100 before every X record, so nothing joins.
            time = 899L;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult();

            // t=900: exactly 100 before X0.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("0:X0+YY0");

            // t=901: X1 becomes eligible.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");

            // t=902: X2 becomes eligible.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");

            // t=903: all four X records are eligible.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");

            // t=1000: same timestamp as X0, still inside the window for all keys.
            time = 1000L;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");

            // t=1001: later than X0 (t=1000), which drops out.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");

            // t=1002: X1 drops out as well.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");

            // t=1003: only X3 (t=1003) remains in range.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult("3:X3+YY3");

            // t=1004: later than every X record, so nothing joins.
            time++;
            for (final int expectedKey : expectedKeys) {
                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
            }
            processor.checkAndClearProcessResult();
        }
    }
}

