package org.apache.kafka.streams.integration;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/BranchedMultiLevelRepartitionConnectedTopologyTest.class */
public class BranchedMultiLevelRepartitionConnectedTopologyTest {
    private static String inputStream;
    private KafkaStreams kafkaStreams;
    private Properties streamsConfiguration;
    private final MockTime mockTime = CLUSTER.time;
    private static final Logger log = LoggerFactory.getLogger(BranchedMultiLevelRepartitionConnectedTopologyTest.class);

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

    @Before
    public void setUp() throws Exception {
        Properties properties = new Properties();
        properties.put("auto.offset.reset", "earliest");
        properties.put("topology.optimization", "none");
        this.streamsConfiguration = StreamsTestUtils.getStreamsConfig("branched-repartition-topic-test", CLUSTER.bootstrapServers(), Serdes.ByteArray().getClass().getName(), Serdes.ByteArray().getClass().getName(), properties);
        inputStream = "input-stream";
        CLUSTER.createTopic(inputStream, 3, 1);
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    @Test
    public void testTopologyBuild() throws InterruptedException, ExecutionException {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream[] branch = streamsBuilder.stream(inputStream).flatMapValues(bArr -> {
            return Collections.singletonList(new byte[0]);
        }).branch(new Predicate[]{(bArr2, bArr3) -> {
            return true;
        }, (bArr4, bArr5) -> {
            return false;
        }});
        branch[0].map((v1, v2) -> {
            return new KeyValue(v1, v2);
        }).groupByKey().reduce((bArr6, bArr7) -> {
            return bArr7;
        }, Materialized.as("odd_store")).toStream().peek((bArr8, bArr9) -> {
        }).map((v1, v2) -> {
            return new KeyValue(v1, v2);
        }).groupByKey().reduce((bArr10, bArr11) -> {
            return bArr11;
        }, Materialized.as("odd_store_2")).join(branch[1].map((v1, v2) -> {
            return new KeyValue(v1, v2);
        }).groupByKey().reduce((bArr12, bArr13) -> {
            return bArr13;
        }, Materialized.as("even_store")).toStream().peek((bArr14, bArr15) -> {
        }).map((v1, v2) -> {
            return new KeyValue(v1, v2);
        }).groupByKey().reduce((bArr16, bArr17) -> {
            return bArr17;
        }, Materialized.as("even_store_2")), (bArr18, bArr19) -> {
            return bArr18;
        }, Materialized.as("joined_store")).toStream();
        Topology build = streamsBuilder.build(this.streamsConfiguration);
        log.info("Built topology: {}", build.describe());
        IntegrationTestUtils.produceKeyValuesSynchronously(inputStream, Collections.singletonList(KeyValue.pair(new byte[1], new byte[1])), TestUtils.producerConfig(CLUSTER.bootstrapServers(), ByteArraySerializer.class, ByteArraySerializer.class), this.mockTime);
        this.kafkaStreams = new KafkaStreams(build, this.streamsConfiguration);
        this.kafkaStreams.cleanUp();
        this.kafkaStreams.start();
        TestUtils.waitForCondition(() -> {
            return this.kafkaStreams.state() == KafkaStreams.State.RUNNING;
        }, "Failed to observe stream transits to RUNNING");
        this.kafkaStreams.close();
    }
}
