/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Stream;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.runtime.io.AlignedController;
import org.apache.flink.streaming.runtime.io.AlternatingController;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierBehaviourController;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierTracker;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.InputGateUtil;
import org.apache.flink.streaming.runtime.io.SingleCheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
import org.apache.flink.streaming.runtime.io.UnalignedController;
import org.apache.flink.streaming.runtime.io.UpstreamRecoveryTracker;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;

/**
 * Static factory helpers that wrap input gates into {@link CheckpointedInputGate}s and build the
 * {@link CheckpointBarrierHandler} that those gates share.
 */
@Internal
public class InputProcessorUtil {
    /** Gauge name bound to {@code CheckpointBarrierHandler#getAlignmentDurationNanos}. */
    private static final String ALIGNMENT_TIME_GAUGE = "checkpointAlignmentTime";

    /** Gauge name bound to {@code CheckpointBarrierHandler#getCheckpointStartDelayNanos}. */
    private static final String START_DELAY_GAUGE = "checkpointStartDelayNanos";

    /**
     * Creates a single {@link CheckpointedInputGate} covering all of the given input gates.
     *
     * <p>The gates are handed to the multiple-input variant as one group with no source inputs,
     * and the sole resulting element is returned.
     *
     * @param toNotifyOnCheckpoint invokable passed on to the barrier handler
     * @param config stream configuration consulted for checkpoint mode and related flags
     * @param checkpointCoordinator coordinator passed on to the barrier handler factory
     * @param inputGates gates that together form the one logical input
     * @param taskIOMetricGroup metric group on which the checkpoint gauges are registered
     * @param taskName task name passed on to the barrier handler factory
     * @param mailboxExecutor executor handed to the created gate
     * @return the only gate produced for the single group
     */
    @SuppressWarnings("unchecked") // generic array creation is not expressible without a raw type
    public static CheckpointedInputGate createCheckpointedInputGate(AbstractInvokable toNotifyOnCheckpoint, StreamConfig config, SubtaskCheckpointCoordinator checkpointCoordinator, IndexedInputGate[] inputGates, TaskIOMetricGroup taskIOMetricGroup, String taskName, MailboxExecutor mailboxExecutor) {
        List<IndexedInputGate>[] singleGroup = new List[]{Arrays.asList(inputGates)};
        CheckpointedInputGate[] created = InputProcessorUtil.createCheckpointedMultipleInputGate(
                toNotifyOnCheckpoint,
                config,
                checkpointCoordinator,
                taskIOMetricGroup,
                taskName,
                mailboxExecutor,
                singleGroup,
                Collections.emptyList());
        return Iterables.getOnlyElement(Arrays.asList(created));
    }

    /**
     * Creates one {@link CheckpointedInputGate} per group of input gates, all sharing a barrier
     * handler that is built from the gates and the given source inputs.
     *
     * @param toNotifyOnCheckpoint invokable passed on to the barrier handler
     * @param config stream configuration consulted for checkpoint mode and related flags
     * @param checkpointCoordinator coordinator passed on to the barrier handler factory
     * @param taskIOMetricGroup metric group on which the checkpoint gauges are registered
     * @param taskName task name passed on to the barrier handler factory
     * @param mailboxExecutor executor handed to every created gate
     * @param inputGates groups of gates; each group yields one element of the result
     * @param sourceInputs source inputs that take part in the barrier handler only
     * @return the checkpointed gates, in the order of {@code inputGates}
     */
    public static CheckpointedInputGate[] createCheckpointedMultipleInputGate(AbstractInvokable toNotifyOnCheckpoint, StreamConfig config, SubtaskCheckpointCoordinator checkpointCoordinator, TaskIOMetricGroup taskIOMetricGroup, String taskName, MailboxExecutor mailboxExecutor, List<IndexedInputGate>[] inputGates, List<StreamTaskSourceInput<?>> sourceInputs) {
        return InputProcessorUtil.createCheckpointedMultipleInputGate(
                mailboxExecutor,
                inputGates,
                taskIOMetricGroup,
                InputProcessorUtil.createCheckpointBarrierHandler(toNotifyOnCheckpoint, config, checkpointCoordinator, taskName, inputGates, sourceInputs),
                config);
    }

    /**
     * Creates one {@link CheckpointedInputGate} per group of input gates using an already built
     * barrier handler, after registering the handler's gauges on the metric group.
     *
     * @param mailboxExecutor executor handed to every created gate
     * @param inputGates groups of gates; each group is combined via {@link InputGateUtil}
     * @param taskIOMetricGroup metric group on which the checkpoint gauges are registered
     * @param barrierHandler handler shared by all created gates
     * @param config stream configuration; decides which {@link UpstreamRecoveryTracker} is used
     * @return the checkpointed gates, in the order of {@code inputGates}
     */
    public static CheckpointedInputGate[] createCheckpointedMultipleInputGate(MailboxExecutor mailboxExecutor, List<IndexedInputGate>[] inputGates, TaskIOMetricGroup taskIOMetricGroup, CheckpointBarrierHandler barrierHandler, StreamConfig config) {
        InputProcessorUtil.registerCheckpointMetrics(taskIOMetricGroup, barrierHandler);

        // First pass: combine every group into a single gate before any wrapper is created.
        InputGate[] combinedGates = new InputGate[inputGates.length];
        for (int i = 0; i < inputGates.length; i++) {
            combinedGates[i] = InputGateUtil.createInputGate(inputGates[i]);
        }

        // Second pass: wrap each combined gate together with its recovery tracker.
        CheckpointedInputGate[] wrapped = new CheckpointedInputGate[combinedGates.length];
        for (int i = 0; i < combinedGates.length; i++) {
            InputGate gate = combinedGates[i];
            UpstreamRecoveryTracker recoveryTracker;
            if (config.isGraphContainingLoops()) {
                recoveryTracker = UpstreamRecoveryTracker.NO_OP;
            } else {
                recoveryTracker = UpstreamRecoveryTracker.forInputGate(gate);
            }
            wrapped[i] = new CheckpointedInputGate(gate, barrierHandler, mailboxExecutor, recoveryTracker);
        }
        return wrapped;
    }

    /**
     * Builds the barrier handler matching the checkpoint mode found in {@code config}.
     *
     * <p>All gates and source inputs are flattened into one array ordered by
     * {@code getInputGateIndex()} before the handler is created.
     *
     * @param toNotifyOnCheckpoint invokable passed on to the handler
     * @param config stream configuration providing the checkpoint mode and unaligned flag
     * @param checkpointCoordinator coordinator used by the unaligned controller
     * @param taskName task name passed to the exactly-once handler
     * @param inputGates groups of gates to be covered by the handler
     * @param sourceInputs source inputs to be covered by the handler
     * @return the handler for the configured mode
     * @throws IllegalStateException if unaligned checkpoints are enabled in AT_LEAST_ONCE mode
     * @throws UnsupportedOperationException if the checkpoint mode is neither of the two handled
     */
    public static CheckpointBarrierHandler createCheckpointBarrierHandler(AbstractInvokable toNotifyOnCheckpoint, StreamConfig config, SubtaskCheckpointCoordinator checkpointCoordinator, String taskName, List<IndexedInputGate>[] inputGates, List<StreamTaskSourceInput<?>> sourceInputs) {
        Stream<IndexedInputGate> gateStream = Arrays.stream(inputGates).flatMap(Collection::stream);
        Stream<StreamTaskSourceInput<?>> sourceStream = sourceInputs.stream();
        CheckpointableInput[] inputs = Stream.<CheckpointableInput>concat(gateStream, sourceStream)
                .sorted(Comparator.comparing(CheckpointableInput::getInputGateIndex))
                .toArray(CheckpointableInput[]::new);

        switch (config.getCheckpointMode()) {
            case EXACTLY_ONCE:
                return InputProcessorUtil.createExactlyOnceHandler(toNotifyOnCheckpoint, config, checkpointCoordinator, taskName, inputs);
            case AT_LEAST_ONCE:
                return InputProcessorUtil.createAtLeastOnceHandler(toNotifyOnCheckpoint, config, inputs);
            default:
                throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + config.getCheckpointMode());
        }
    }

    /** Builds the handler for EXACTLY_ONCE mode, choosing the controller by the unaligned flag. */
    private static CheckpointBarrierHandler createExactlyOnceHandler(AbstractInvokable toNotifyOnCheckpoint, StreamConfig config, SubtaskCheckpointCoordinator checkpointCoordinator, String taskName, CheckpointableInput[] inputs) {
        // Channel count is taken from the channel infos reported by each input.
        long channelInfoCount = 0L;
        for (CheckpointableInput input : inputs) {
            channelInfoCount += input.getChannelInfos().stream().count();
        }
        int numberOfChannels = (int) channelInfoCount;

        CheckpointBarrierBehaviourController controller;
        if (config.isUnalignedCheckpointsEnabled()) {
            controller = new AlternatingController(new AlignedController(inputs), new UnalignedController(checkpointCoordinator, inputs));
        } else {
            controller = new AlignedController(inputs);
        }
        return new SingleCheckpointBarrierHandler(taskName, toNotifyOnCheckpoint, numberOfChannels, controller);
    }

    /** Builds the handler for AT_LEAST_ONCE mode; rejects the unaligned flag. */
    private static CheckpointBarrierHandler createAtLeastOnceHandler(AbstractInvokable toNotifyOnCheckpoint, StreamConfig config, CheckpointableInput[] inputs) {
        if (config.isUnalignedCheckpointsEnabled()) {
            throw new IllegalStateException("Cannot use unaligned checkpoints with AT_LEAST_ONCE checkpointing mode");
        }
        int numInputChannels = 0;
        for (CheckpointableInput input : inputs) {
            numInputChannels += input.getNumberOfInputChannels();
        }
        return new CheckpointBarrierTracker(numInputChannels, toNotifyOnCheckpoint);
    }

    /** Registers the barrier handler's alignment-duration and start-delay getters as gauges. */
    private static void registerCheckpointMetrics(TaskIOMetricGroup taskIOMetricGroup, CheckpointBarrierHandler barrierHandler) {
        taskIOMetricGroup.gauge(ALIGNMENT_TIME_GAUGE, barrierHandler::getAlignmentDurationNanos);
        taskIOMetricGroup.gauge(START_DELAY_GAUGE, barrierHandler::getCheckpointStartDelayNanos);
    }
}

