/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for the {@link KStream} DSL as built through {@link StreamsBuilder}.
 *
 * <p>The tests fall into three groups:
 * <ul>
 *   <li>topology-shape checks that inspect the built {@link ProcessorTopology};</li>
 *   <li>end-to-end checks that pipe records through a {@link TopologyTestDriver} and compare the
 *       {@code key:value} strings captured by a {@link MockProcessorSupplier};</li>
 *   <li>argument-validation checks asserting that {@code null} (or empty) arguments are rejected.</li>
 * </ul>
 */
public class KStreamImplTest {
    private final Serde<String> stringSerde = Serdes.String();
    private final Serde<Integer> intSerde = Serdes.Integer();
    private final Consumed<String, String> stringConsumed = Consumed.with(stringSerde, stringSerde);
    private final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
    private final ConsumerRecordFactory<String, String> recordFactory =
        new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());

    // Re-created before every test; used by the argument-validation and merge/pattern tests.
    private KStream<String, String> testStream;
    private StreamsBuilder builder;

    /** Creates a fresh builder and a stream reading from the topic {@code "source"}. */
    @Before
    public void before() {
        builder = new StreamsBuilder();
        testStream = builder.stream("source");
    }

    /**
     * Builds a topology out of filters, value mappers, branches, windowed joins, a sink, a
     * through-topic and a processor, then checks the total number of processor nodes.
     */
    @Test
    @SuppressWarnings("unchecked") // branch(...) takes a generic varargs array
    public void testNumProcesses() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> source1 = builder.stream(Arrays.asList("topic-1", "topic-2"), stringConsumed);
        final KStream<String, String> source2 = builder.stream(Arrays.asList("topic-3", "topic-4"), stringConsumed);

        final KStream<String, String> stream1 = source1
            .filter((key, value) -> true)
            .filterNot((key, value) -> false);
        final KStream<String, Integer> stream2 = stream1.mapValues(value -> Integer.valueOf(value));
        final KStream<String, Integer> stream3 =
            source2.flatMapValues(value -> Collections.singletonList(Integer.valueOf(value)));

        // Both branch calls use the same pair of predicates: even values first, everything else second.
        final Predicate<String, Integer> isEven = (key, value) -> value % 2 == 0;
        final Predicate<String, Integer> acceptAll = (key, value) -> true;
        final KStream<String, Integer>[] streams2 = stream2.branch(isEven, acceptAll);
        final KStream<String, Integer>[] streams3 = stream3.branch(isEven, acceptAll);

        final long anyWindowSize = 1L;
        final Joined<String, Integer, Integer> joined = Joined.with(stringSerde, intSerde, intSerde);
        final ValueJoiner<Integer, Integer, Integer> sum = (value1, value2) -> value1 + value2;
        final KStream<String, Integer> stream4 =
            streams2[0].join(streams3[0], sum, JoinWindows.of(anyWindowSize), joined);
        streams2[1].join(streams3[1], sum, JoinWindows.of(anyWindowSize), joined);

        stream4.to("topic-5");
        streams2[1].through("topic-6").process(new MockProcessorSupplier<String, Integer>());

        // NOTE(review): the per-operation breakdown below is an assumption about how the DSL expands
        // each call into processor nodes; only the total (26) is what this test has always asserted.
        final int expectedProcessors =
            2           // sources
            + 2         // stream1: filter + filterNot
            + 1         // stream2: mapValues
            + 1         // stream3: flatMapValues
            + 1 + 2     // streams2: branch + two children
            + 1 + 2     // streams3: branch + two children
            + 5 * 2     // two stream-stream joins
            + 1         // to
            + 2         // through
            + 1;        // process
        Assert.assertEquals(
            expectedProcessors,
            TopologyWrapper.getInternalTopologyBuilder(builder.build())
                .setApplicationId("X")
                .build(null)
                .processors()
                .size());
    }

    /**
     * The source created for a {@code through} topic must use {@link FailOnInvalidTimestamp},
     * while the user-declared sources carry no explicit timestamp extractor.
     */
    @Test
    public void shouldUseRecordMetadataTimestampExtractorWithThrough() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> stream1 = builder.stream(Arrays.asList("topic-1", "topic-2"), stringConsumed);
        final KStream<String, String> stream2 = builder.stream(Arrays.asList("topic-3", "topic-4"), stringConsumed);
        stream1.to("topic-5");
        stream2.through("topic-6");

        final ProcessorTopology processorTopology =
            TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build(null);

        Assert.assertThat(
            processorTopology.source("topic-6").getTimestampExtractor(),
            IsInstanceOf.instanceOf(FailOnInvalidTimestamp.class));
        for (final String topic : Arrays.asList("topic-4", "topic-3", "topic-2", "topic-1")) {
            Assert.assertNull(
                "no timestamp extractor expected for source " + topic,
                processorTopology.source(topic).getTimestampExtractor());
        }
    }

    /** A record piped into the input topic must reach a processor attached after {@code through}. */
    @Test
    public void shouldSendDataThroughTopicUsingProduced() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String input = "topic";
        final KStream<String, String> stream = builder.stream(input, stringConsumed);
        stream.through("through-topic", Produced.with(stringSerde, stringSerde)).process(processorSupplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            driver.pipeInput(recordFactory.create(input, "a", "b"));
        }

        Assert.assertThat(
            processorSupplier.theCapturedProcessor().processed,
            CoreMatchers.equalTo(Collections.singletonList("a:b")));
    }

    /** A record written with {@code to} must be readable by a second stream on the target topic. */
    @Test
    public void shouldSendDataToTopicUsingProduced() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String input = "topic";
        final KStream<String, String> stream = builder.stream(input, stringConsumed);
        stream.to("to-topic", Produced.with(stringSerde, stringSerde));
        builder.stream("to-topic", stringConsumed).process(processorSupplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            driver.pipeInput(recordFactory.create(input, "e", "f"));
        }

        Assert.assertThat(
            processorSupplier.theCapturedProcessor().processed,
            CoreMatchers.equalTo(Collections.singletonList("e:f")));
    }

    /**
     * Routes each record to a topic named {@code <source-topic>-<key>-<first char of value>} and
     * checks that the two resulting topics receive the expected records.
     */
    @Test
    public void shouldSendDataToDynamicTopics() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String input = "topic";
        final KStream<String, String> stream = builder.stream(input, stringConsumed);
        stream.to(
            (key, value, context) -> context.topic() + "-" + key + "-" + value.substring(0, 1),
            Produced.with(stringSerde, stringSerde));
        builder.stream(input + "-a-v", stringConsumed).process(processorSupplier);
        builder.stream(input + "-b-v", stringConsumed).process(processorSupplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            driver.pipeInput(recordFactory.create(input, "a", "v1"));
            driver.pipeInput(recordFactory.create(input, "a", "v2"));
            driver.pipeInput(recordFactory.create(input, "b", "v1"));
        }

        final List<MockProcessor<String, String>> mockProcessors = processorSupplier.capturedProcessors(2);
        Assert.assertThat(mockProcessors.get(0).processed, CoreMatchers.equalTo(Utils.mkList("a:v1", "a:v2")));
        Assert.assertThat(mockProcessors.get(1).processed, CoreMatchers.equalTo(Collections.singletonList("b:v1")));
    }

    /**
     * After a key-changing {@code map} followed by a join, every source other than the original
     * {@code "topic-1"} source must use {@link FailOnInvalidTimestamp}.
     */
    @Test
    public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> kStream = builder.stream("topic-1", stringConsumed);
        final ValueJoiner<String, String, String> valueJoiner = MockValueJoiner.instance(":");
        final long windowSize = TimeUnit.MILLISECONDS.convert(1L, TimeUnit.DAYS);

        // Kept as an anonymous class so the wildcard-typed KeyValue return type stays explicit.
        final KStream<String, String> stream = kStream.map(
            new KeyValueMapper<String, String, KeyValue<? extends String, ? extends String>>() {
                @Override
                public KeyValue<? extends String, ? extends String> apply(final String key, final String value) {
                    return KeyValue.pair(value, value);
                }
            });
        stream
            .join(
                kStream,
                valueJoiner,
                JoinWindows.of(windowSize).until(3L * windowSize),
                Joined.with(stringSerde, stringSerde, stringSerde))
            .to("output-topic", Produced.with(stringSerde, stringSerde));

        final ProcessorTopology topology =
            TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build();
        final SourceNode originalSourceNode = topology.source("topic-1");

        for (final SourceNode sourceNode : topology.sources()) {
            if (sourceNode.name().equals(originalSourceNode.name())) {
                Assert.assertNull(sourceNode.getTimestampExtractor());
            } else {
                Assert.assertThat(
                    sourceNode.getTimestampExtractor(),
                    IsInstanceOf.instanceOf(FailOnInvalidTimestamp.class));
            }
        }
    }

    /**
     * Declaring a sink must not throw.
     *
     * <p>NOTE(review): despite the test name, both serdes passed to {@code Produced.with} are
     * non-null here — confirm whether a {@code null} value serde was intended.
     */
    @Test
    public void testToWithNullValueSerdeDoesntNPE() {
        final StreamsBuilder builder = new StreamsBuilder();
        final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
        final KStream<String, String> inputStream = builder.stream(Collections.singleton("input"), consumed);
        inputStream.to("output", Produced.with(stringSerde, stringSerde));
    }

    // ---------------------------------------------------------------------------------------------
    // Argument validation: each DSL method below must reject the null (or empty) argument it is given.
    // Raw-type casts on null are only there to select the intended overload.
    // ---------------------------------------------------------------------------------------------

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullPredicateOnFilter() {
        testStream.filter(null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullPredicateOnFilterNot() {
        testStream.filterNot(null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullMapperOnSelectKey() {
        testStream.selectKey(null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullMapperOnMap() {
        testStream.map(null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullMapperOnMapValues() {
        testStream.mapValues((ValueMapper) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullMapperOnMapValuesWithKey() {
        testStream.mapValues((ValueMapperWithKey) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullMapperOnFlatMap() {
        testStream.flatMap(null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullMapperOnFlatMapValues() {
        testStream.flatMapValues((ValueMapper) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullMapperOnFlatMapValuesWithKey() {
        testStream.flatMapValues((ValueMapperWithKey) null);
    }

    /** Branching with no predicates at all is an {@link IllegalArgumentException}, not an NPE. */
    @Test(expected = IllegalArgumentException.class)
    public void shouldHaveAtLeastOnPredicateWhenBranching() {
        testStream.branch();
    }

    @Test(expected = NullPointerException.class)
    public void shouldCantHaveNullPredicate() {
        // A one-element array holding null, i.e. a single null predicate rather than a null array.
        testStream.branch(new Predicate[]{null});
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullTopicOnThrough() {
        testStream.through(null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullTopicOnTo() {
        testStream.to((String) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullTopicChooserOnTo() {
        testStream.to((TopicNameExtractor) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullTransformSupplierOnTransform() {
        testStream.transform(null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullTransformSupplierOnTransformValues() {
        testStream.transformValues((ValueTransformerSupplier) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullTransformSupplierOnTransformValuesWithKey() {
        testStream.transformValues((ValueTransformerWithKeySupplier) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullProcessSupplier() {
        testStream.process(null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullOtherStreamOnJoin() {
        testStream.join(null, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(10L));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullValueJoinerOnJoin() {
        testStream.join(testStream, null, JoinWindows.of(10L));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullJoinWindowsOnJoin() {
        testStream.join(testStream, MockValueJoiner.TOSTRING_JOINER, null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullTableOnTableJoin() {
        testStream.leftJoin((KTable) null, MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullValueMapperOnTableJoin() {
        testStream.leftJoin(builder.table("topic", stringConsumed), null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullSelectorOnGroupBy() {
        testStream.groupBy(null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullActionOnForEach() {
        testStream.foreach(null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullTableOnJoinWithGlobalTable() {
        testStream.join((GlobalKTable) null, MockMapper.selectValueMapper(), MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullMapperOnJoinWithGlobalTable() {
        testStream.join(builder.globalTable("global", stringConsumed), null, MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullJoinerOnJoinWithGlobalTable() {
        testStream.join(builder.globalTable("global", stringConsumed), MockMapper.selectValueMapper(), null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullTableOnJLeftJoinWithGlobalTable() {
        testStream.leftJoin((GlobalKTable) null, MockMapper.selectValueMapper(), MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullMapperOnLeftJoinWithGlobalTable() {
        testStream.leftJoin(builder.globalTable("global", stringConsumed), null, MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullJoinerOnLeftJoinWithGlobalTable() {
        testStream.leftJoin(builder.globalTable("global", stringConsumed), MockMapper.selectValueMapper(), null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnPrintIfPrintedIsNull() {
        testStream.print(null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnThroughWhenProducedIsNull() {
        testStream.through("topic", null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnToWhenProducedIsNull() {
        testStream.to("topic", null);
    }

    /**
     * Uses try/catch instead of {@code @Test(expected = ...)} so that only an exception raised by
     * {@code leftJoin} itself — not by the table set-up — satisfies the test.
     */
    @Test
    public void shouldThrowNullPointerOnLeftJoinWithTableWhenJoinedIsNull() {
        final KTable<String, String> table = builder.table("blah", stringConsumed);
        try {
            testStream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, null);
            Assert.fail("Should have thrown NullPointerException");
        } catch (final NullPointerException expected) {
            // expected: a null Joined must be rejected
        }
    }

    /**
     * Uses try/catch instead of {@code @Test(expected = ...)} so that only an exception raised by
     * {@code join} itself — not by the table set-up — satisfies the test.
     */
    @Test
    public void shouldThrowNullPointerOnJoinWithTableWhenJoinedIsNull() {
        final KTable<String, String> table = builder.table("blah", stringConsumed);
        try {
            testStream.join(table, MockValueJoiner.TOSTRING_JOINER, null);
            Assert.fail("Should have thrown NullPointerException");
        } catch (final NullPointerException expected) {
            // expected: a null Joined must be rejected
        }
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnJoinWithStreamWhenJoinedIsNull() {
        testStream.join(testStream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(10L), null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnOuterJoinJoinedIsNull() {
        testStream.outerJoin(testStream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(10L), null);
    }

    // ---------------------------------------------------------------------------------------------
    // Merge and pattern-subscription tests
    // ---------------------------------------------------------------------------------------------

    /** Records from two merged streams must reach the processor in the order they were piped. */
    @Test
    public void shouldMergeTwoStreams() {
        final String topic1 = "topic-1";
        final String topic2 = "topic-2";
        final KStream<String, String> source1 = builder.stream(topic1);
        final KStream<String, String> source2 = builder.stream(topic2);
        final KStream<String, String> merged = source1.merge(source2);
        merged.process(processorSupplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
            driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
            driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
            driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
        }

        Assert.assertEquals(
            Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"),
            processorSupplier.theCapturedProcessor().processed);
    }

    /** Same as {@link #shouldMergeTwoStreams()} but with four streams merged by chaining. */
    @Test
    public void shouldMergeMultipleStreams() {
        final String topic1 = "topic-1";
        final String topic2 = "topic-2";
        final String topic3 = "topic-3";
        final String topic4 = "topic-4";
        final KStream<String, String> source1 = builder.stream(topic1);
        final KStream<String, String> source2 = builder.stream(topic2);
        final KStream<String, String> source3 = builder.stream(topic3);
        final KStream<String, String> source4 = builder.stream(topic4);
        final KStream<String, String> merged = source1.merge(source2).merge(source3).merge(source4);
        merged.process(processorSupplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
            driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
            driver.pipeInput(recordFactory.create(topic3, "C", "cc"));
            driver.pipeInput(recordFactory.create(topic4, "D", "dd"));
            driver.pipeInput(recordFactory.create(topic4, "E", "ee"));
            driver.pipeInput(recordFactory.create(topic3, "F", "ff"));
            driver.pipeInput(recordFactory.create(topic2, "G", "gg"));
            driver.pipeInput(recordFactory.create(topic1, "H", "hh"));
        }

        Assert.assertEquals(
            Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee", "F:ff", "G:gg", "H:hh"),
            processorSupplier.theCapturedProcessor().processed);
    }

    /** A stream subscribed by regex must process records from every topic piped here. */
    @Test
    public void shouldProcessFromSourceThatMatchPattern() {
        final KStream<String, String> pattern2Source = builder.stream(Pattern.compile("topic-\\d"));
        pattern2Source.process(processorSupplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            driver.pipeInput(recordFactory.create("topic-3", "A", "aa"));
            driver.pipeInput(recordFactory.create("topic-4", "B", "bb"));
            driver.pipeInput(recordFactory.create("topic-5", "C", "cc"));
            driver.pipeInput(recordFactory.create("topic-6", "D", "dd"));
            driver.pipeInput(recordFactory.create("topic-7", "E", "ee"));
        }

        Assert.assertEquals(
            Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"),
            processorSupplier.theCapturedProcessor().processed);
    }

    /** Two regex-subscribed streams merged with a plain-topic stream must all feed the processor. */
    @Test
    public void shouldProcessFromSourcesThatMatchMultiplePattern() {
        final String topic3 = "topic-without-pattern";
        final KStream<String, String> pattern2Source1 = builder.stream(Pattern.compile("topic-\\d"));
        final KStream<String, String> pattern2Source2 = builder.stream(Pattern.compile("topic-[A-Z]"));
        final KStream<String, String> source3 = builder.stream(topic3);
        final KStream<String, String> merged = pattern2Source1.merge(pattern2Source2).merge(source3);
        merged.process(processorSupplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            driver.pipeInput(recordFactory.create("topic-3", "A", "aa"));
            driver.pipeInput(recordFactory.create("topic-4", "B", "bb"));
            driver.pipeInput(recordFactory.create("topic-A", "C", "cc"));
            driver.pipeInput(recordFactory.create("topic-Z", "D", "dd"));
            driver.pipeInput(recordFactory.create(topic3, "E", "ee"));
        }

        Assert.assertEquals(
            Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"),
            processorSupplier.theCapturedProcessor().processed);
    }
}

