/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class EdgeManagerTest {
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    EdgeManagerTest() {
    }

    @Test
    void testGetConsumedPartitionGroup() throws Exception {
        JobVertex v1 = new JobVertex("source");
        JobVertex v2 = new JobVertex("sink");
        ExecutionGraph eg = this.buildExecutionGraph(v1, v2, 2, 2, DistributionPattern.ALL_TO_ALL);
        ConsumedPartitionGroup groupRetrievedByDownstreamVertex = (ConsumedPartitionGroup)Objects.requireNonNull(eg.getJobVertex(v2.getID())).getTaskVertices()[0].getAllConsumedPartitionGroups().get(0);
        IntermediateResultPartition consumedPartition = Objects.requireNonNull(eg.getJobVertex(v1.getID())).getProducedDataSets()[0].getPartitions()[0];
        ConsumedPartitionGroup groupRetrievedByIntermediateResultPartition = (ConsumedPartitionGroup)consumedPartition.getConsumedPartitionGroups().get(0);
        Assertions.assertThat((Iterable)groupRetrievedByIntermediateResultPartition).isEqualTo((Object)groupRetrievedByDownstreamVertex);
        ConsumedPartitionGroup groupRetrievedByScheduledResultPartition = (ConsumedPartitionGroup)eg.getSchedulingTopology().getResultPartition(consumedPartition.getPartitionId()).getConsumedPartitionGroups().get(0);
        Assertions.assertThat((Iterable)groupRetrievedByScheduledResultPartition).isEqualTo((Object)groupRetrievedByDownstreamVertex);
    }

    @Test
    public void testCalculateNumberOfConsumers() throws Exception {
        this.testCalculateNumberOfConsumers(5, 2, DistributionPattern.ALL_TO_ALL, new int[]{2, 2});
        this.testCalculateNumberOfConsumers(5, 2, DistributionPattern.POINTWISE, new int[]{1, 1});
        this.testCalculateNumberOfConsumers(2, 5, DistributionPattern.ALL_TO_ALL, new int[]{5, 5, 5, 5, 5});
        this.testCalculateNumberOfConsumers(2, 5, DistributionPattern.POINTWISE, new int[]{3, 3, 3, 2, 2});
        this.testCalculateNumberOfConsumers(5, 5, DistributionPattern.ALL_TO_ALL, new int[]{5, 5, 5, 5, 5});
        this.testCalculateNumberOfConsumers(5, 5, DistributionPattern.POINTWISE, new int[]{1, 1, 1, 1, 1});
    }

    private void testCalculateNumberOfConsumers(int producerParallelism, int consumerParallelism, DistributionPattern distributionPattern, int[] expectedConsumers) throws Exception {
        JobVertex producer = new JobVertex("producer");
        JobVertex consumer = new JobVertex("consumer");
        ExecutionGraph eg = this.buildExecutionGraph(producer, consumer, producerParallelism, consumerParallelism, distributionPattern);
        List partitionGroups = Arrays.stream(((ExecutionJobVertex)Preconditions.checkNotNull((Object)eg.getJobVertex(consumer.getID()))).getTaskVertices()).flatMap(ev -> ev.getAllConsumedPartitionGroups().stream()).collect(Collectors.toList());
        int index = 0;
        for (ConsumedPartitionGroup partitionGroup : partitionGroups) {
            Assertions.assertThat((int)partitionGroup.getNumConsumers()).isEqualTo(expectedConsumers[index++]);
        }
    }

    private ExecutionGraph buildExecutionGraph(JobVertex producer, JobVertex consumer, int producerParallelism, int consumerParallelism, DistributionPattern distributionPattern) throws Exception {
        producer.setParallelism(producerParallelism);
        consumer.setParallelism(consumerParallelism);
        producer.setInvokableClass(NoOpInvokable.class);
        consumer.setInvokableClass(NoOpInvokable.class);
        consumer.connectNewDataSetAsInput(producer, distributionPattern, ResultPartitionType.BLOCKING);
        JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(producer, consumer);
        DefaultScheduler scheduler = SchedulerTestingUtils.createScheduler(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        return scheduler.getExecutionGraph();
    }
}

