/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.io.network.partition.PartitionTracker;
import org.apache.flink.runtime.io.network.partition.PartitionTrackerFactory;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskexecutor.partition.PartitionTable;
import org.apache.flink.util.Preconditions;

public class PartitionTrackerImpl
implements PartitionTracker {
    private final JobID jobId;
    private final PartitionTable<ResourceID> partitionTable = new PartitionTable();
    private final Map<ResultPartitionID, PartitionInfo> partitionInfos = new HashMap<ResultPartitionID, PartitionInfo>();
    private final ShuffleMaster<?> shuffleMaster;
    private final PartitionTrackerFactory.TaskExecutorGatewayLookup taskExecutorGatewayLookup;

    public PartitionTrackerImpl(JobID jobId, ShuffleMaster<?> shuffleMaster, PartitionTrackerFactory.TaskExecutorGatewayLookup taskExecutorGatewayLookup) {
        this.jobId = (JobID)Preconditions.checkNotNull((Object)jobId);
        this.shuffleMaster = (ShuffleMaster)Preconditions.checkNotNull(shuffleMaster);
        this.taskExecutorGatewayLookup = taskExecutorGatewayLookup;
    }

    @Override
    public void startTrackingPartition(ResourceID producingTaskExecutorId, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) {
        Preconditions.checkNotNull((Object)producingTaskExecutorId);
        Preconditions.checkNotNull((Object)resultPartitionDeploymentDescriptor);
        if (!resultPartitionDeploymentDescriptor.getPartitionType().isBlocking()) {
            return;
        }
        ResultPartitionID resultPartitionId = resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID();
        this.partitionInfos.put(resultPartitionId, new PartitionInfo(producingTaskExecutorId, resultPartitionDeploymentDescriptor));
        this.partitionTable.startTrackingPartitions(producingTaskExecutorId, Collections.singletonList(resultPartitionId));
    }

    @Override
    public void stopTrackingPartitionsFor(ResourceID producingTaskExecutorId) {
        Preconditions.checkNotNull((Object)producingTaskExecutorId);
        Collection<ResultPartitionID> resultPartitionIds = this.partitionTable.stopTrackingPartitions(producingTaskExecutorId);
        for (ResultPartitionID resultPartitionId : resultPartitionIds) {
            this.internalStopTrackingPartition(resultPartitionId);
        }
    }

    @Override
    public void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> resultPartitionIds) {
        Preconditions.checkNotNull(resultPartitionIds);
        Map partitionsToReleaseByResourceId = resultPartitionIds.stream().map(this::internalStopTrackingPartition).filter(Optional::isPresent).map(Optional::get).collect(Collectors.groupingBy(partitionMetaData -> partitionMetaData.producingTaskExecutorResourceId, Collectors.mapping(partitionMetaData -> partitionMetaData.resultPartitionDeploymentDescriptor, Collectors.toList())));
        partitionsToReleaseByResourceId.forEach(this::internalReleasePartitions);
    }

    @Override
    public void stopTrackingPartitions(Collection<ResultPartitionID> resultPartitionIds) {
        Preconditions.checkNotNull(resultPartitionIds);
        resultPartitionIds.forEach(this::internalStopTrackingPartition);
    }

    @Override
    public void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId) {
        Preconditions.checkNotNull((Object)producingTaskExecutorId);
        Collection<ResultPartitionID> resultPartitionIds = this.partitionTable.stopTrackingPartitions(producingTaskExecutorId);
        this.stopTrackingAndReleasePartitions(resultPartitionIds);
    }

    @Override
    public boolean isTrackingPartitionsFor(ResourceID producingTaskExecutorId) {
        Preconditions.checkNotNull((Object)producingTaskExecutorId);
        return this.partitionTable.hasTrackedPartitions(producingTaskExecutorId);
    }

    @Override
    public boolean isPartitionTracked(ResultPartitionID resultPartitionID) {
        Preconditions.checkNotNull((Object)resultPartitionID);
        return this.partitionInfos.containsKey(resultPartitionID);
    }

    private Optional<PartitionInfo> internalStopTrackingPartition(ResultPartitionID resultPartitionId) {
        PartitionInfo partitionInfo = this.partitionInfos.remove(resultPartitionId);
        if (partitionInfo == null) {
            return Optional.empty();
        }
        this.partitionTable.stopTrackingPartitions(partitionInfo.producingTaskExecutorResourceId, Collections.singletonList(resultPartitionId));
        return Optional.of(partitionInfo);
    }

    private void internalReleasePartitions(ResourceID potentialPartitionLocation, Collection<ResultPartitionDeploymentDescriptor> partitionDeploymentDescriptors) {
        this.internalReleasePartitionsOnTaskExecutor(potentialPartitionLocation, partitionDeploymentDescriptors);
        this.internalReleasePartitionsOnShuffleMaster(partitionDeploymentDescriptors);
    }

    private void internalReleasePartitionsOnTaskExecutor(ResourceID potentialPartitionLocation, Collection<ResultPartitionDeploymentDescriptor> partitionDeploymentDescriptors) {
        List partitionsRequiringRpcReleaseCalls = partitionDeploymentDescriptors.stream().map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor).filter(descriptor -> descriptor.storesLocalResourcesOn().isPresent()).map(ShuffleDescriptor::getResultPartitionID).collect(Collectors.toList());
        if (!partitionsRequiringRpcReleaseCalls.isEmpty()) {
            this.taskExecutorGatewayLookup.lookup(potentialPartitionLocation).ifPresent(taskExecutorGateway -> taskExecutorGateway.releasePartitions(this.jobId, partitionsRequiringRpcReleaseCalls));
        }
    }

    private void internalReleasePartitionsOnShuffleMaster(Collection<ResultPartitionDeploymentDescriptor> partitionDeploymentDescriptors) {
        partitionDeploymentDescriptors.stream().map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor).forEach(this.shuffleMaster::releasePartitionExternally);
    }

    private static final class PartitionInfo {
        public final ResourceID producingTaskExecutorResourceId;
        public final ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor;

        private PartitionInfo(ResourceID producingTaskExecutorResourceId, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) {
            this.producingTaskExecutorResourceId = producingTaskExecutorResourceId;
            this.resultPartitionDeploymentDescriptor = resultPartitionDeploymentDescriptor;
        }
    }
}

