/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Timer;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteChannelStateChecker;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.UnknownInputChannel;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SingleInputGate
extends InputGate {
    private static final Logger LOG = LoggerFactory.getLogger(SingleInputGate.class);
    private final Object requestLock = new Object();
    private final String owningTaskName;
    private final IntermediateDataSetID consumedResultId;
    private final ResultPartitionType consumedPartitionType;
    private final int consumedSubpartitionIndex;
    private final int numberOfInputChannels;
    private final Map<IntermediateResultPartitionID, InputChannel> inputChannels;
    private final ArrayDeque<InputChannel> inputChannelsWithData = new ArrayDeque();
    private final BitSet enqueuedInputChannelsWithData;
    private final BitSet channelsWithEndOfPartitionEvents;
    private final PartitionProducerStateProvider partitionProducerStateProvider;
    private BufferPool bufferPool;
    private final boolean isCreditBased;
    private boolean hasReceivedAllEndOfPartitionEvents;
    private boolean requestedPartitionsFlag;
    private final List<TaskEvent> pendingEvents = new ArrayList<TaskEvent>();
    private int numberOfUninitializedChannels;
    private Timer retriggerLocalRequestTimer;
    private final SupplierWithException<BufferPool, IOException> bufferPoolFactory;
    private final CompletableFuture<Void> closeFuture;

    public SingleInputGate(String owningTaskName, IntermediateDataSetID consumedResultId, ResultPartitionType consumedPartitionType, int consumedSubpartitionIndex, int numberOfInputChannels, PartitionProducerStateProvider partitionProducerStateProvider, boolean isCreditBased, SupplierWithException<BufferPool, IOException> bufferPoolFactory) {
        this.owningTaskName = (String)Preconditions.checkNotNull((Object)owningTaskName);
        this.consumedResultId = (IntermediateDataSetID)((Object)Preconditions.checkNotNull((Object)((Object)consumedResultId)));
        this.consumedPartitionType = (ResultPartitionType)((Object)Preconditions.checkNotNull((Object)((Object)consumedPartitionType)));
        this.bufferPoolFactory = (SupplierWithException)Preconditions.checkNotNull(bufferPoolFactory);
        Preconditions.checkArgument((consumedSubpartitionIndex >= 0 ? 1 : 0) != 0);
        this.consumedSubpartitionIndex = consumedSubpartitionIndex;
        Preconditions.checkArgument((numberOfInputChannels > 0 ? 1 : 0) != 0);
        this.numberOfInputChannels = numberOfInputChannels;
        this.inputChannels = new HashMap<IntermediateResultPartitionID, InputChannel>(numberOfInputChannels);
        this.channelsWithEndOfPartitionEvents = new BitSet(numberOfInputChannels);
        this.enqueuedInputChannelsWithData = new BitSet(numberOfInputChannels);
        this.partitionProducerStateProvider = (PartitionProducerStateProvider)Preconditions.checkNotNull((Object)partitionProducerStateProvider);
        this.isCreditBased = isCreditBased;
        this.closeFuture = new CompletableFuture();
    }

    @Override
    public void setup() throws IOException, InterruptedException {
        Preconditions.checkState((this.bufferPool == null ? 1 : 0) != 0, (Object)"Bug in input gate setup logic: Already registered buffer pool.");
        if (this.isCreditBased) {
            this.assignExclusiveSegments();
        }
        BufferPool bufferPool = (BufferPool)this.bufferPoolFactory.get();
        this.setBufferPool(bufferPool);
        this.requestPartitions();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    void requestPartitions() throws IOException, InterruptedException {
        Object object = this.requestLock;
        synchronized (object) {
            if (!this.requestedPartitionsFlag) {
                if (this.closeFuture.isDone()) {
                    throw new IllegalStateException("Already released.");
                }
                if (this.numberOfInputChannels != this.inputChannels.size()) {
                    throw new IllegalStateException(String.format("Bug in input gate setup logic: mismatch between number of total input channels [%s] and the currently set number of input channels [%s].", this.inputChannels.size(), this.numberOfInputChannels));
                }
                for (InputChannel inputChannel : this.inputChannels.values()) {
                    inputChannel.requestSubpartition(this.consumedSubpartitionIndex);
                }
            }
            this.requestedPartitionsFlag = true;
        }
    }

    @Override
    public int getNumberOfInputChannels() {
        return this.numberOfInputChannels;
    }

    public IntermediateDataSetID getConsumedResultId() {
        return this.consumedResultId;
    }

    public ResultPartitionType getConsumedPartitionType() {
        return this.consumedPartitionType;
    }

    BufferProvider getBufferProvider() {
        return this.bufferPool;
    }

    public BufferPool getBufferPool() {
        return this.bufferPool;
    }

    public int getNumberOfQueuedBuffers() {
        for (int retry2 = 0; retry2 < 3; ++retry2) {
            try {
                int totalBuffers = 0;
                for (InputChannel channel : this.inputChannels.values()) {
                    totalBuffers += channel.unsynchronizedGetNumberOfQueuedBuffers();
                }
                return totalBuffers;
            }
            catch (Exception exception) {
                continue;
            }
        }
        return 0;
    }

    public CompletableFuture<Void> getCloseFuture() {
        return this.closeFuture;
    }

    public void setBufferPool(BufferPool bufferPool) {
        Preconditions.checkState((this.bufferPool == null ? 1 : 0) != 0, (Object)"Bug in input gate setup logic: buffer pool hasalready been set for this input gate.");
        this.bufferPool = (BufferPool)Preconditions.checkNotNull((Object)bufferPool);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    public void assignExclusiveSegments() throws IOException {
        Preconditions.checkState((boolean)this.isCreditBased, (Object)"Bug in input gate setup logic: exclusive buffers only exist with credit-based flow control.");
        Object object = this.requestLock;
        synchronized (object) {
            for (InputChannel inputChannel : this.inputChannels.values()) {
                if (!(inputChannel instanceof RemoteInputChannel)) continue;
                ((RemoteInputChannel)inputChannel).assignExclusiveSegments();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setInputChannel(IntermediateResultPartitionID partitionId, InputChannel inputChannel) {
        Object object = this.requestLock;
        synchronized (object) {
            if (this.inputChannels.put((IntermediateResultPartitionID)((Object)Preconditions.checkNotNull((Object)((Object)partitionId))), (InputChannel)Preconditions.checkNotNull((Object)inputChannel)) == null && inputChannel instanceof UnknownInputChannel) {
                ++this.numberOfUninitializedChannels;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void updateInputChannel(ResourceID localLocation, NettyShuffleDescriptor shuffleDescriptor) throws IOException, InterruptedException {
        Object object = this.requestLock;
        synchronized (object) {
            if (this.closeFuture.isDone()) {
                return;
            }
            IntermediateResultPartitionID partitionId = shuffleDescriptor.getResultPartitionID().getPartitionId();
            InputChannel current = this.inputChannels.get((Object)partitionId);
            if (current instanceof UnknownInputChannel) {
                InputChannel newChannel;
                UnknownInputChannel unknownChannel = (UnknownInputChannel)current;
                boolean isLocal = shuffleDescriptor.isLocalTo(localLocation);
                if (isLocal) {
                    newChannel = unknownChannel.toLocalInputChannel();
                } else {
                    RemoteInputChannel remoteInputChannel = unknownChannel.toRemoteInputChannel(shuffleDescriptor.getConnectionId());
                    if (this.isCreditBased) {
                        remoteInputChannel.assignExclusiveSegments();
                    }
                    newChannel = remoteInputChannel;
                }
                LOG.debug("{}: Updated unknown input channel to {}.", (Object)this.owningTaskName, (Object)newChannel);
                this.inputChannels.put(partitionId, newChannel);
                if (this.requestedPartitionsFlag) {
                    newChannel.requestSubpartition(this.consumedSubpartitionIndex);
                }
                for (TaskEvent event : this.pendingEvents) {
                    newChannel.sendTaskEvent(event);
                }
                if (--this.numberOfUninitializedChannels == 0) {
                    this.pendingEvents.clear();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void retriggerPartitionRequest(IntermediateResultPartitionID partitionId) throws IOException {
        Object object = this.requestLock;
        synchronized (object) {
            if (!this.closeFuture.isDone()) {
                InputChannel ch = this.inputChannels.get((Object)partitionId);
                Preconditions.checkNotNull((Object)ch, (String)("Unknown input channel with ID " + (Object)((Object)partitionId)));
                LOG.debug("{}: Retriggering partition request {}:{}.", new Object[]{this.owningTaskName, ch.partitionId, this.consumedSubpartitionIndex});
                if (ch.getClass() == RemoteInputChannel.class) {
                    RemoteInputChannel rch = (RemoteInputChannel)ch;
                    rch.retriggerSubpartitionRequest(this.consumedSubpartitionIndex);
                } else if (ch.getClass() == LocalInputChannel.class) {
                    LocalInputChannel ich = (LocalInputChannel)ch;
                    if (this.retriggerLocalRequestTimer == null) {
                        this.retriggerLocalRequestTimer = new Timer(true);
                    }
                    ich.retriggerSubpartitionRequest(this.retriggerLocalRequestTimer, this.consumedSubpartitionIndex);
                } else {
                    throw new IllegalStateException("Unexpected type of channel to retrigger partition: " + ch.getClass());
                }
            }
        }
    }

    @VisibleForTesting
    Timer getRetriggerLocalRequestTimer() {
        return this.retriggerLocalRequestTimer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() throws IOException {
        boolean released = false;
        ArrayDeque<InputChannel> arrayDeque = this.requestLock;
        synchronized (arrayDeque) {
            if (!this.closeFuture.isDone()) {
                try {
                    LOG.debug("{}: Releasing {}.", (Object)this.owningTaskName, (Object)this);
                    if (this.retriggerLocalRequestTimer != null) {
                        this.retriggerLocalRequestTimer.cancel();
                    }
                    for (InputChannel inputChannel : this.inputChannels.values()) {
                        try {
                            inputChannel.releaseAllResources();
                        }
                        catch (IOException e) {
                            LOG.warn("{}: Error during release of channel resources: {}.", new Object[]{this.owningTaskName, e.getMessage(), e});
                        }
                    }
                    if (this.bufferPool != null) {
                        this.bufferPool.lazyDestroy();
                    }
                }
                finally {
                    released = true;
                    this.closeFuture.complete(null);
                }
            }
        }
        if (released) {
            arrayDeque = this.inputChannelsWithData;
            synchronized (arrayDeque) {
                this.inputChannelsWithData.notifyAll();
            }
        }
    }

    @Override
    public boolean isFinished() {
        return this.hasReceivedAllEndOfPartitionEvents;
    }

    @Override
    public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException {
        return this.getNextBufferOrEvent(true);
    }

    @Override
    public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
        return this.getNextBufferOrEvent(false);
    }

    private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException {
        if (this.hasReceivedAllEndOfPartitionEvents) {
            return Optional.empty();
        }
        if (this.closeFuture.isDone()) {
            throw new IllegalStateException("Released");
        }
        Optional<InputGate.InputWithData<InputChannel, InputChannel.BufferAndAvailability>> next = this.waitAndGetNextData(blocking);
        if (!next.isPresent()) {
            return Optional.empty();
        }
        InputGate.InputWithData<InputChannel, InputChannel.BufferAndAvailability> inputWithData = next.get();
        return Optional.of(this.transformToBufferOrEvent(((InputChannel.BufferAndAvailability)inputWithData.data).buffer(), inputWithData.moreAvailable, (InputChannel)inputWithData.input));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Optional<InputGate.InputWithData<InputChannel, InputChannel.BufferAndAvailability>> waitAndGetNextData(boolean blocking) throws IOException, InterruptedException {
        Optional<InputChannel> inputChannel;
        while ((inputChannel = this.getChannel(blocking)).isPresent()) {
            Optional<InputChannel.BufferAndAvailability> result = inputChannel.get().getNextBuffer();
            ArrayDeque<InputChannel> arrayDeque = this.inputChannelsWithData;
            synchronized (arrayDeque) {
                if (result.isPresent() && result.get().moreAvailable()) {
                    this.inputChannelsWithData.add(inputChannel.get());
                    this.enqueuedInputChannelsWithData.set(inputChannel.get().getChannelIndex());
                }
                if (this.inputChannelsWithData.isEmpty()) {
                    this.resetIsAvailable();
                }
                if (result.isPresent()) {
                    return Optional.of(new InputGate.InputWithData<InputChannel, InputChannel.BufferAndAvailability>(inputChannel.get(), result.get(), !this.inputChannelsWithData.isEmpty()));
                }
            }
        }
        return Optional.empty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private BufferOrEvent transformToBufferOrEvent(Buffer buffer, boolean moreAvailable, InputChannel currentChannel) throws IOException, InterruptedException {
        AbstractEvent event;
        if (buffer.isBuffer()) {
            return new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable);
        }
        try {
            event = EventSerializer.fromBuffer(buffer, this.getClass().getClassLoader());
        }
        finally {
            buffer.recycleBuffer();
        }
        if (event.getClass() == EndOfPartitionEvent.class) {
            this.channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
            if (this.channelsWithEndOfPartitionEvents.cardinality() == this.numberOfInputChannels) {
                Preconditions.checkState((!moreAvailable || !this.pollNext().isPresent() ? 1 : 0) != 0);
                moreAvailable = false;
                this.hasReceivedAllEndOfPartitionEvents = true;
                this.markAvailable();
            }
            currentChannel.notifySubpartitionConsumed();
            currentChannel.releaseAllResources();
        }
        return new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable, buffer.getSize());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void markAvailable() {
        CompletableFuture toNotfiy;
        ArrayDeque<InputChannel> arrayDeque = this.inputChannelsWithData;
        synchronized (arrayDeque) {
            toNotfiy = this.isAvailable;
            this.isAvailable = AVAILABLE;
        }
        toNotfiy.complete(null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void sendTaskEvent(TaskEvent event) throws IOException {
        Object object = this.requestLock;
        synchronized (object) {
            for (InputChannel inputChannel : this.inputChannels.values()) {
                inputChannel.sendTaskEvent(event);
            }
            if (this.numberOfUninitializedChannels > 0) {
                this.pendingEvents.add(event);
            }
        }
    }

    void notifyChannelNonEmpty(InputChannel channel) {
        this.queueChannel((InputChannel)Preconditions.checkNotNull((Object)channel));
    }

    void triggerPartitionStateCheck(ResultPartitionID partitionId) {
        this.partitionProducerStateProvider.requestPartitionProducerState(this.consumedResultId, partitionId, responseHandle -> {
            boolean isProducingState = new RemoteChannelStateChecker(partitionId, this.owningTaskName).isProducerReadyOrAbortConsumption((PartitionProducerStateProvider.ResponseHandle)responseHandle);
            if (isProducingState) {
                try {
                    this.retriggerPartitionRequest(partitionId.getPartitionId());
                }
                catch (IOException t) {
                    responseHandle.failConsumption(t);
                }
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void queueChannel(InputChannel channel) {
        CompletableFuture toNotify = null;
        ArrayDeque<InputChannel> arrayDeque = this.inputChannelsWithData;
        synchronized (arrayDeque) {
            if (this.enqueuedInputChannelsWithData.get(channel.getChannelIndex())) {
                return;
            }
            int availableChannels = this.inputChannelsWithData.size();
            this.inputChannelsWithData.add(channel);
            this.enqueuedInputChannelsWithData.set(channel.getChannelIndex());
            if (availableChannels == 0) {
                this.inputChannelsWithData.notifyAll();
                toNotify = this.isAvailable;
                this.isAvailable = AVAILABLE;
            }
        }
        if (toNotify != null) {
            toNotify.complete(null);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Optional<InputChannel> getChannel(boolean blocking) throws InterruptedException {
        ArrayDeque<InputChannel> arrayDeque = this.inputChannelsWithData;
        synchronized (arrayDeque) {
            while (this.inputChannelsWithData.size() == 0) {
                if (this.closeFuture.isDone()) {
                    throw new IllegalStateException("Released");
                }
                if (blocking) {
                    this.inputChannelsWithData.wait();
                    continue;
                }
                this.resetIsAvailable();
                return Optional.empty();
            }
            InputChannel inputChannel = this.inputChannelsWithData.remove();
            this.enqueuedInputChannelsWithData.clear(inputChannel.getChannelIndex());
            return Optional.of(inputChannel);
        }
    }

    public Map<IntermediateResultPartitionID, InputChannel> getInputChannels() {
        return this.inputChannels;
    }
}

