/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.InputGateUtil;
import org.apache.flink.streaming.runtime.io.InputProcessorUtil;
import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link StreamInputProcessor} for a {@link TwoInputStreamOperator} that also implements
 * {@link InputSelectable}.
 *
 * <p>On every {@link #processInput()} call the processor picks one of the two inputs based on the
 * operator's current {@link InputSelection} and on which inputs are known to have data, polls at
 * most one {@link StreamElement} from it and dispatches that element to the operator (records and
 * latency markers) or to the per-input {@link StatusWatermarkValve} (watermarks and stream
 * statuses). All calls into the operator are made while holding the {@code lock} passed to the
 * constructor.
 *
 * @param <IN1> element type of the first input
 * @param <IN2> element type of the second input
 */
@Internal
public final class StreamTwoInputSelectableProcessor<IN1, IN2> implements StreamInputProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputSelectableProcessor.class);

    /** Never-completed sentinel future that stands in for an input that is already finished. */
    private static final CompletableFuture<?> UNAVAILABLE = new CompletableFuture<>();

    private final TwoInputStreamOperator<IN1, IN2, ?> streamOperator;

    /** The same object as {@link #streamOperator}, viewed through its {@link InputSelectable} side. */
    private final InputSelectable inputSelector;

    private final Object lock;
    private final StreamTaskInput input1;
    private final StreamTaskInput input2;
    private final OperatorChain<?, ?> operatorChain;
    private final StatusWatermarkValve statusWatermarkValve1;
    private final StatusWatermarkValve statusWatermarkValve2;

    /** Last stream status reported by the valve of the first input. */
    private StreamStatus firstStatus;

    /** Last stream status reported by the valve of the second input. */
    private StreamStatus secondStatus;

    /** Bit {@code i} is set while input index {@code i} (0 or 1) is considered to have data available. */
    private int availableInputsMask;

    /** Index (0 or 1) of the input chosen by the most recent {@link #processInput()} call. */
    private int lastReadInputIndex;

    /** Most recent selection obtained from {@link #inputSelector}; set lazily on the first call. */
    private InputSelection inputSelection;

    private Counter numRecordsIn;

    /** Whether {@link #prepareForProcessing()} has already run. */
    private boolean isPrepared;

    /**
     * Creates the processor and wires both logical inputs.
     *
     * @param inputGates1 input gates forming the first logical input; they are unioned into one gate
     * @param inputGates2 input gates forming the second logical input; they are unioned into one gate
     * @param inputSerializer1 serializer handed to the first {@link StreamTaskNetworkInput}
     * @param inputSerializer2 serializer handed to the second {@link StreamTaskNetworkInput}
     * @param streamTask task passed on when creating the checkpointed input gates
     * @param checkpointingMode checkpointing mode passed on when creating the checkpointed input gates
     * @param lock monitor held around every call into the operator; must not be null
     * @param ioManager I/O manager passed to the checkpointed gates and the network inputs
     * @param taskManagerConfig configuration passed on when creating the checkpointed input gates
     * @param streamStatusMaintainer maintainer toggled when the combined stream status changes
     * @param streamOperator operator to drive; must not be null and must implement {@link InputSelectable}
     * @param input1WatermarkGauge gauge updated with watermarks emitted by the first valve
     * @param input2WatermarkGauge gauge updated with watermarks emitted by the second valve
     * @param taskName task name passed on when creating the checkpointed input gates
     * @param operatorChain chain that is notified when an input ends; must not be null
     * @throws IOException declared by the helper calls used to set up the inputs
     * @throws IllegalStateException if {@code streamOperator} is not an {@link InputSelectable}
     *     (this includes {@code null}), or if the helper does not return exactly two gates
     */
    public StreamTwoInputSelectableProcessor(Collection<InputGate> inputGates1, Collection<InputGate> inputGates2, TypeSerializer<IN1> inputSerializer1, TypeSerializer<IN2> inputSerializer2, StreamTask<?, ?> streamTask, CheckpointingMode checkpointingMode, Object lock, IOManager ioManager, Configuration taskManagerConfig, StreamStatusMaintainer streamStatusMaintainer, TwoInputStreamOperator<IN1, IN2, ?> streamOperator, WatermarkGauge input1WatermarkGauge, WatermarkGauge input2WatermarkGauge, String taskName, OperatorChain<?, ?> operatorChain) throws IOException {
        Preconditions.checkState(streamOperator instanceof InputSelectable);
        this.streamOperator = Preconditions.checkNotNull(streamOperator);
        this.inputSelector = (InputSelectable) streamOperator;
        this.lock = Preconditions.checkNotNull(lock);

        InputGate unionedInputGate1 = InputGateUtil.createInputGate(inputGates1.toArray(new InputGate[0]));
        InputGate unionedInputGate2 = InputGateUtil.createInputGate(inputGates2.toArray(new InputGate[0]));

        CheckpointedInputGate[] checkpointedInputGates = InputProcessorUtil.createCheckpointedInputGatePair(streamTask, checkpointingMode, ioManager, unionedInputGate1, unionedInputGate2, taskManagerConfig, taskName);
        Preconditions.checkState(checkpointedInputGates.length == 2);

        this.input1 = new StreamTaskNetworkInput(checkpointedInputGates[0], inputSerializer1, ioManager, 0);
        this.input2 = new StreamTaskNetworkInput(checkpointedInputGates[1], inputSerializer2, ioManager, 1);

        this.statusWatermarkValve1 = new StatusWatermarkValve(unionedInputGate1.getNumberOfInputChannels(), new ForwardingValveOutputHandler(streamOperator, lock, streamStatusMaintainer, input1WatermarkGauge, 0));
        this.statusWatermarkValve2 = new StatusWatermarkValve(unionedInputGate2.getNumberOfInputChannels(), new ForwardingValveOutputHandler(streamOperator, lock, streamStatusMaintainer, input2WatermarkGauge, 1));

        this.operatorChain = Preconditions.checkNotNull(operatorChain);

        this.firstStatus = StreamStatus.ACTIVE;
        this.secondStatus = StreamStatus.ACTIVE;

        // Start with the mask of a selection that contains input ids 1 and 2.
        // NOTE(review): presumably this equals 0b11 (both inputs available) - confirm against InputSelection.
        this.availableInputsMask = (int) new InputSelection.Builder().select(1).select(2).build().getInputMask();
        // NOTE(review): presumably chosen so that the fair selection begins with input 0 - confirm.
        this.lastReadInputIndex = 1;
        this.isPrepared = false;
    }

    /**
     * Selects an input, polls at most one element from it and dispatches that element.
     *
     * <p>If the poll yields nothing, the chosen input is marked unavailable so that the next
     * selection has to wait for (or re-check) its availability.
     *
     * @return {@code false} if no input could be selected or if both inputs are finished after this
     *     step; {@code true} otherwise
     * @throws Exception anything thrown by the inputs, the valves or the operator
     */
    @Override
    public boolean processInput() throws Exception {
        if (!this.isPrepared) {
            // Lazy one-time setup on the first call instead of in the constructor.
            // NOTE(review): presumably because the operator must be opened first - confirm.
            this.prepareForProcessing();
        }

        int readingInputIndex = this.selectNextReadingInputIndex();
        if (readingInputIndex == -1) {
            return false;
        }
        this.lastReadInputIndex = readingInputIndex;

        StreamElement recordOrMark;
        if (readingInputIndex == 0) {
            recordOrMark = (StreamElement) this.input1.pollNextNullable();
            if (recordOrMark != null) {
                this.processElement1(recordOrMark, this.input1.getLastChannel());
            }
        } else {
            recordOrMark = (StreamElement) this.input2.pollNextNullable();
            if (recordOrMark != null) {
                this.processElement2(recordOrMark, this.input2.getLastChannel());
            }
        }

        if (recordOrMark == null) {
            this.setUnavailableInput(readingInputIndex);
        }

        return !this.checkFinished();
    }

    /**
     * Closes both inputs.
     *
     * <p>The second input is closed even if closing the first one fails, including with an unchecked
     * exception. The first failure is rethrown; a failure of the second close is attached to it as a
     * suppressed exception.
     *
     * @throws IOException if closing an input failed with an {@link IOException}
     */
    @Override
    public void close() throws IOException {
        Exception failure = null;
        try {
            this.input1.close();
        } catch (Exception e) {
            // Catch broadly so that an unchecked failure here cannot leak input2.
            failure = e;
        }
        try {
            this.input2.close();
        } catch (Exception e) {
            failure = ExceptionUtils.firstOrSuppressed(e, failure);
        }

        if (failure instanceof IOException) {
            throw (IOException) failure;
        }
        if (failure instanceof RuntimeException) {
            throw (RuntimeException) failure;
        }
        if (failure != null) {
            // Not expected for a close() that declares only IOException; wrap rather than swallow.
            throw new IOException(failure);
        }
    }

    /**
     * Determines the index of the input to read next, blocking until a selected input is available.
     *
     * @return 0 or 1, or -1 if waiting reported that no input can become available any more
     */
    private int selectNextReadingInputIndex() throws InterruptedException, ExecutionException, IOException {
        int readingInputIndex;
        while ((readingInputIndex = this.inputSelection.fairSelectNextIndexOutOf2(this.availableInputsMask, this.lastReadInputIndex)) == -1) {
            if (!this.waitForAvailableInput(this.inputSelection)) {
                return -1;
            }
        }

        // A mask below 3 means at least one of the two availability bits is cleared. If the operator
        // accepts both inputs, re-check the input that was not chosen so that it gets marked
        // available again as soon as it has data.
        if (this.availableInputsMask < 3 && this.inputSelection.isALLMaskOf2()) {
            this.checkAndSetAvailable(1 - readingInputIndex);
        }
        return readingInputIndex;
    }

    /**
     * Dispatches one element read from the first input.
     *
     * <p>Records and latency markers go to the operator under the lock; after a record the
     * operator is asked for its next input selection. Watermarks and stream statuses go to the
     * first valve.
     *
     * @param recordOrMark element to dispatch
     * @param channel channel the element was read from, forwarded to the valve
     * @throws UnsupportedOperationException if the element is of none of the four handled kinds
     */
    private void processElement1(StreamElement recordOrMark, int channel) throws Exception {
        if (recordOrMark.isRecord()) {
            StreamRecord<IN1> record = recordOrMark.asRecord();
            synchronized (this.lock) {
                this.numRecordsIn.inc();
                this.streamOperator.setKeyContextElement1(record);
                this.streamOperator.processElement1(record);
                this.inputSelection = this.inputSelector.nextSelection();
            }
        } else if (recordOrMark.isWatermark()) {
            this.statusWatermarkValve1.inputWatermark(recordOrMark.asWatermark(), channel);
        } else if (recordOrMark.isStreamStatus()) {
            this.statusWatermarkValve1.inputStreamStatus(recordOrMark.asStreamStatus(), channel);
        } else if (recordOrMark.isLatencyMarker()) {
            synchronized (this.lock) {
                this.streamOperator.processLatencyMarker1(recordOrMark.asLatencyMarker());
            }
        } else {
            throw new UnsupportedOperationException("Unknown type of StreamElement on input1");
        }
    }

    /**
     * Dispatches one element read from the second input.
     *
     * <p>Records and latency markers go to the operator under the lock; after a record the
     * operator is asked for its next input selection. Watermarks and stream statuses go to the
     * second valve.
     *
     * @param recordOrMark element to dispatch
     * @param channel channel the element was read from, forwarded to the valve
     * @throws UnsupportedOperationException if the element is of none of the four handled kinds
     */
    private void processElement2(StreamElement recordOrMark, int channel) throws Exception {
        if (recordOrMark.isRecord()) {
            StreamRecord<IN2> record = recordOrMark.asRecord();
            synchronized (this.lock) {
                this.numRecordsIn.inc();
                this.streamOperator.setKeyContextElement2(record);
                this.streamOperator.processElement2(record);
                this.inputSelection = this.inputSelector.nextSelection();
            }
        } else if (recordOrMark.isWatermark()) {
            this.statusWatermarkValve2.inputWatermark(recordOrMark.asWatermark(), channel);
        } else if (recordOrMark.isStreamStatus()) {
            this.statusWatermarkValve2.inputStreamStatus(recordOrMark.asStreamStatus(), channel);
        } else if (recordOrMark.isLatencyMarker()) {
            synchronized (this.lock) {
                this.streamOperator.processLatencyMarker2(recordOrMark.asLatencyMarker());
            }
        } else {
            throw new UnsupportedOperationException("Unknown type of StreamElement on input2");
        }
    }

    /**
     * One-time setup: fetches the initial input selection and the records-in counter.
     *
     * <p>If the counter cannot be obtained from the operator's metric group, a warning is logged
     * and a standalone {@link SimpleCounter} is used instead.
     */
    private void prepareForProcessing() {
        this.inputSelection = this.inputSelector.nextSelection();
        try {
            this.numRecordsIn = ((OperatorMetricGroup) this.streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
        } catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", e);
            this.numRecordsIn = new SimpleCounter();
        }
        this.isPrepared = true;
    }

    /** Marks the given input available if it is not finished and its availability future is done. */
    private void checkAndSetAvailable(int inputIndex) {
        StreamTaskInput input = this.getInput(inputIndex);
        if (!input.isFinished() && input.isAvailable().isDone()) {
            this.setAvailableInput(inputIndex);
        }
    }

    /**
     * Blocks until an input permitted by {@code inputSelection} becomes available.
     *
     * @return {@code false} only if both inputs are selected and both are finished; {@code true}
     *     otherwise
     * @throws IOException if a single input is selected and that input is already finished
     */
    private boolean waitForAvailableInput(InputSelection inputSelection) throws ExecutionException, InterruptedException, IOException {
        if (inputSelection.isALLMaskOf2()) {
            return this.waitForAvailableEitherInput();
        }
        // Any selection whose mask differs from FIRST's is treated as selecting the second input.
        boolean firstSelected = inputSelection.getInputMask() == InputSelection.FIRST.getInputMask();
        this.waitForOneInput(firstSelected ? this.input1 : this.input2);
        return true;
    }

    /**
     * Blocks until at least one unfinished input is available and marks every input whose
     * availability future is done.
     *
     * @return {@code false} if both inputs are finished, so there is nothing to wait for
     */
    private boolean waitForAvailableEitherInput() throws ExecutionException, InterruptedException {
        // A finished input is represented by the never-completing sentinel.
        CompletableFuture<?> future1 = this.input1.isFinished() ? UNAVAILABLE : this.input1.isAvailable();
        CompletableFuture<?> future2 = this.input2.isFinished() ? UNAVAILABLE : this.input2.isAvailable();

        if (future1 == UNAVAILABLE && future2 == UNAVAILABLE) {
            return false;
        }

        CompletableFuture.anyOf(future1, future2).get();

        if (future1.isDone()) {
            this.setAvailableInput(this.input1.getInputIndex());
        }
        if (future2.isDone()) {
            this.setAvailableInput(this.input2.getInputIndex());
        }
        return true;
    }

    /**
     * Blocks until the given input is available and marks it available.
     *
     * @throws IOException if the input is already finished
     */
    private void waitForOneInput(StreamTaskInput input) throws IOException, ExecutionException, InterruptedException {
        if (input.isFinished()) {
            throw new IOException("Could not read the finished input: input" + (input.getInputIndex() + 1) + ".");
        }
        input.isAvailable().get();
        this.setAvailableInput(input.getInputIndex());
    }

    /**
     * If the input read last is finished, notifies the operator chain of the end of that input and
     * refreshes the input selection, both under the lock.
     *
     * @return {@code true} if both inputs are finished
     */
    private boolean checkFinished() throws Exception {
        if (this.getInput(this.lastReadInputIndex).isFinished()) {
            synchronized (this.lock) {
                this.operatorChain.endInput(this.getInputId(this.lastReadInputIndex));
                this.inputSelection = this.inputSelector.nextSelection();
            }
        }
        return this.input1.isFinished() && this.input2.isFinished();
    }

    /** Sets the availability bit of the given input index. */
    private void setAvailableInput(int inputIndex) {
        this.availableInputsMask |= 1 << inputIndex;
    }

    /** Clears the availability bit of the given input index. */
    private void setUnavailableInput(int inputIndex) {
        this.availableInputsMask &= ~(1 << inputIndex);
    }

    /** Returns the first input for index 0 and the second input for any other index. */
    private StreamTaskInput getInput(int inputIndex) {
        return inputIndex == 0 ? this.input1 : this.input2;
    }

    /** Converts a 0-based input index into the 1-based input id passed to the operator chain. */
    private int getInputId(int inputIndex) {
        return inputIndex + 1;
    }

    /**
     * Valve output handler of one input: forwards watermarks to the matching operator method and
     * folds the per-input stream status into the shared {@link StreamStatusMaintainer}.
     *
     * <p>This is an inner (non-static) class because it reads and writes the enclosing processor's
     * {@code firstStatus} and {@code secondStatus} fields.
     */
    private class ForwardingValveOutputHandler implements StatusWatermarkValve.ValveOutputHandler {
        private final TwoInputStreamOperator<IN1, IN2, ?> operator;
        private final Object lock;
        private final StreamStatusMaintainer streamStatusMaintainer;
        private final WatermarkGauge inputWatermarkGauge;

        /** 0 for the first input; any other value is handled as the second input. */
        private final int inputIndex;

        private ForwardingValveOutputHandler(TwoInputStreamOperator<IN1, IN2, ?> operator, Object lock, StreamStatusMaintainer streamStatusMaintainer, WatermarkGauge inputWatermarkGauge, int inputIndex) {
            this.operator = Preconditions.checkNotNull(operator);
            this.lock = Preconditions.checkNotNull(lock);
            this.streamStatusMaintainer = Preconditions.checkNotNull(streamStatusMaintainer);
            this.inputWatermarkGauge = inputWatermarkGauge;
            this.inputIndex = inputIndex;
        }

        /**
         * Records the watermark in the gauge and passes it to the operator under the lock.
         *
         * @throws RuntimeException wrapping any exception raised while doing so
         */
        @Override
        public void handleWatermark(Watermark watermark) {
            try {
                synchronized (this.lock) {
                    this.inputWatermarkGauge.setCurrentWatermark(watermark.getTimestamp());
                    if (this.inputIndex == 0) {
                        this.operator.processWatermark1(watermark);
                    } else {
                        this.operator.processWatermark2(watermark);
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output watermark of input" + (this.inputIndex + 1) + ": ", e);
            }
        }

        /**
         * Stores the new status of this input and, if it differs from the maintainer's current
         * status, toggles the maintainer: to ACTIVE when this input became active, to IDLE only when
         * the other input is idle as well.
         *
         * @throws RuntimeException wrapping any exception raised while doing so
         */
        @Override
        public void handleStreamStatus(StreamStatus streamStatus) {
            try {
                synchronized (this.lock) {
                    final StreamStatus anotherStreamStatus;
                    if (this.inputIndex == 0) {
                        StreamTwoInputSelectableProcessor.this.firstStatus = streamStatus;
                        anotherStreamStatus = StreamTwoInputSelectableProcessor.this.secondStatus;
                    } else {
                        StreamTwoInputSelectableProcessor.this.secondStatus = streamStatus;
                        anotherStreamStatus = StreamTwoInputSelectableProcessor.this.firstStatus;
                    }

                    if (!streamStatus.equals(this.streamStatusMaintainer.getStreamStatus())) {
                        if (streamStatus.isActive()) {
                            this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
                        } else if (anotherStreamStatus.isIdle()) {
                            this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
                        }
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output stream status of input" + (this.inputIndex + 1) + ": ", e);
            }
        }
    }
}

