/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotAndLocality;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Scheduler} implementation that serves logical slot requests from a {@link SlotPool}.
 *
 * <p>A request whose {@link ScheduledUnit} carries no slot sharing group id is served by a
 * dedicated physical slot wrapped in a {@link SingleLogicalSlot}. A request with a slot sharing
 * group id is served through the {@link SlotSharingManager} registered for that group, which is
 * created on first use.
 *
 * <p>The public entry points assert, via the configured {@link ComponentMainThreadExecutor}, that
 * they are running in the main thread. {@link #start(ComponentMainThreadExecutor)} must be called
 * before use to replace the placeholder executor installed by the constructor.
 */
public class SchedulerImpl
implements Scheduler {
    private static final Logger log = LoggerFactory.getLogger(SchedulerImpl.class);

    /** Initial capacity of the map holding one {@link SlotSharingManager} per slot sharing group. */
    private static final int DEFAULT_SLOT_SHARING_MANAGERS_MAP_SIZE = 128;

    /** Strategy used to pick the best candidate slot for a given {@link SlotProfile}. */
    @Nonnull
    private final SlotSelectionStrategy slotSelectionStrategy;

    /** Pool from which physical slots are taken and to which they are released. */
    @Nonnull
    private final SlotPool slotPool;

    /** Main thread executor; a placeholder until {@link #start(ComponentMainThreadExecutor)} is called. */
    @Nonnull
    private ComponentMainThreadExecutor componentMainThreadExecutor;

    /** Slot sharing managers keyed by the id of the slot sharing group they serve. */
    @Nonnull
    private final Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagers;

    /**
     * Creates a scheduler with an empty, pre-sized map of slot sharing managers.
     *
     * @param slotSelectionStrategy strategy used to select slots for a profile
     * @param slotPool pool that provides the physical slots
     */
    public SchedulerImpl(@Nonnull SlotSelectionStrategy slotSelectionStrategy, @Nonnull SlotPool slotPool) {
        this(slotSelectionStrategy, slotPool, new HashMap<>(DEFAULT_SLOT_SHARING_MANAGERS_MAP_SIZE));
    }

    /**
     * Creates a scheduler that uses the given map to keep track of its slot sharing managers.
     *
     * @param slotSelectionStrategy strategy used to select slots for a profile
     * @param slotPool pool that provides the physical slots
     * @param slotSharingManagers map used to register slot sharing managers per group
     */
    @VisibleForTesting
    public SchedulerImpl(@Nonnull SlotSelectionStrategy slotSelectionStrategy, @Nonnull SlotPool slotPool, @Nonnull Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagers) {
        this.slotSelectionStrategy = slotSelectionStrategy;
        this.slotSharingManagers = slotSharingManagers;
        this.slotPool = slotPool;
        // Placeholder until start(...) supplies the real executor.
        this.componentMainThreadExecutor = new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor("Scheduler is not initialized with proper main thread executor. Call to Scheduler.start(...) required.");
    }

    /**
     * Installs the main thread executor used for the main-thread assertions of this scheduler.
     *
     * @param mainThreadExecutor executor replacing the placeholder set by the constructor
     */
    @Override
    public void start(@Nonnull ComponentMainThreadExecutor mainThreadExecutor) {
        this.componentMainThreadExecutor = mainThreadExecutor;
    }

    /**
     * Allocates a logical slot for the given scheduled unit.
     *
     * <p>If the internal allocation fails, the slot request is cancelled with the failure before
     * the returned future is completed exceptionally with that same failure.
     *
     * @param slotRequestId id identifying this slot request
     * @param scheduledUnit unit to schedule; its slot sharing group id selects the allocation path
     * @param slotProfile profile the slot has to match
     * @param allowQueuedScheduling whether a new slot may be requested if none is available right now
     * @param allocationTimeout timeout passed to the slot pool when requesting a new slot
     * @return future completed with the logical slot or with the allocation failure
     */
    @Override
    public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) {
        log.debug("Received slot request [{}] for task: {}", slotRequestId, scheduledUnit.getTaskToExecute());
        this.componentMainThreadExecutor.assertRunningInMainThread();

        final CompletableFuture<LogicalSlot> allocationResultFuture = new CompletableFuture<>();

        final CompletableFuture<LogicalSlot> allocationFuture;
        if (scheduledUnit.getSlotSharingGroupId() == null) {
            allocationFuture = this.allocateSingleSlot(slotRequestId, slotProfile, allowQueuedScheduling, allocationTimeout);
        } else {
            allocationFuture = this.allocateSharedSlot(slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling, allocationTimeout);
        }

        allocationFuture.whenComplete((slot, failure) -> {
            if (failure != null) {
                // Clean up the request first, then surface the failure to the caller.
                this.cancelSlotRequest(slotRequestId, scheduledUnit.getSlotSharingGroupId(), failure);
                allocationResultFuture.completeExceptionally(failure);
            } else {
                allocationResultFuture.complete(slot);
            }
        });
        return allocationResultFuture;
    }

    /**
     * Cancels a slot request by releasing whatever is registered under the given request id.
     *
     * @param slotRequestId id of the request to cancel
     * @param slotSharingGroupId slot sharing group of the request, or {@code null} if the request
     *     was made without slot sharing
     * @param cause reason passed on to the release call
     */
    @Override
    public void cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
        this.componentMainThreadExecutor.assertRunningInMainThread();
        if (slotSharingGroupId != null) {
            this.releaseSharedSlot(slotRequestId, slotSharingGroupId, cause);
        } else {
            this.slotPool.releaseSlot(slotRequestId, cause);
        }
    }

    /**
     * Returns a logical slot by cancelling its slot request with a descriptive cause.
     *
     * @param logicalSlot slot to return
     */
    @Override
    public void returnLogicalSlot(LogicalSlot logicalSlot) {
        final SlotRequestId slotRequestId = logicalSlot.getSlotRequestId();
        final SlotSharingGroupId slotSharingGroupId = logicalSlot.getSlotSharingGroupId();
        final FlinkException cause = new FlinkException("Slot is being returned to the SlotPool.");
        this.cancelSlotRequest(slotRequestId, slotSharingGroupId, cause);
    }

    /**
     * Allocates a logical slot that is not shared: first from the currently available slots and,
     * if queued scheduling is allowed, by requesting a new slot from the pool.
     *
     * @return future with the logical slot, or a future failed with
     *     {@link NoResourceAvailableException} if nothing is available and queueing is not allowed
     */
    private CompletableFuture<LogicalSlot> allocateSingleSlot(SlotRequestId slotRequestId, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) {
        final Optional<SlotAndLocality> slotAndLocality = this.tryAllocateFromAvailable(slotRequestId, slotProfile);

        if (slotAndLocality.isPresent()) {
            // An available slot matched; the payload can be assigned right away.
            try {
                return CompletableFuture.completedFuture(this.completeAllocationByAssigningPayload(slotRequestId, slotAndLocality.get()));
            }
            catch (FlinkException e) {
                return FutureUtils.completedExceptionally(e);
            }
        }

        if (allowQueuedScheduling) {
            // Nothing available right now: request a new slot and assign the payload once it arrives.
            return this.slotPool.requestNewAllocatedSlot(slotRequestId, slotProfile.getResourceProfile(), allocationTimeout).thenApply(allocatedSlot -> {
                try {
                    return this.completeAllocationByAssigningPayload(slotRequestId, new SlotAndLocality(allocatedSlot, Locality.UNKNOWN));
                }
                catch (FlinkException e) {
                    // Checked exceptions cannot leave the lambda; wrap to fail the resulting future.
                    throw new CompletionException(e);
                }
            });
        }

        return FutureUtils.completedExceptionally(new NoResourceAvailableException("Could not allocate a simple slot for " + slotRequestId + '.'));
    }

    /**
     * Wraps the allocated physical slot in a {@link SingleLogicalSlot} and assigns it as payload.
     *
     * @return the logical slot that was assigned as payload
     * @throws FlinkException if the payload could not be assigned; the slot request is released
     *     at the slot pool before the exception is thrown
     */
    @Nonnull
    private LogicalSlot completeAllocationByAssigningPayload(@Nonnull SlotRequestId slotRequestId, @Nonnull SlotAndLocality slotAndLocality) throws FlinkException {
        final PhysicalSlot allocatedSlot = slotAndLocality.getSlot();
        final SingleLogicalSlot singleTaskSlot = new SingleLogicalSlot(slotRequestId, allocatedSlot, null, slotAndLocality.getLocality(), this);

        if (allocatedSlot.tryAssignPayload(singleTaskSlot)) {
            return singleTaskSlot;
        }

        final FlinkException flinkException = new FlinkException("Could not assign payload to allocated slot " + allocatedSlot.getAllocationId() + '.');
        this.slotPool.releaseSlot(slotRequestId, flinkException);
        throw flinkException;
    }

    /**
     * Lets the selection strategy pick the best currently available slot for the profile and, if
     * one was picked, tries to allocate it from the slot pool under the given request id.
     *
     * @return the allocated slot with the locality reported by the strategy, or empty if no slot
     *     was selected or the pool did not hand out the selected slot
     */
    private Optional<SlotAndLocality> tryAllocateFromAvailable(@Nonnull SlotRequestId slotRequestId, @Nonnull SlotProfile slotProfile) {
        final Collection<SlotInfo> slotInfoList = this.slotPool.getAvailableSlotsInformation();
        final Optional<SlotSelectionStrategy.SlotInfoAndLocality> selectedAvailableSlot = this.slotSelectionStrategy.selectBestSlotForProfile(slotInfoList, slotProfile);

        return selectedAvailableSlot.flatMap(slotInfoAndLocality -> {
            final Optional<PhysicalSlot> optionalAllocatedSlot = this.slotPool.allocateAvailableSlot(slotRequestId, slotInfoAndLocality.getSlotInfo().getAllocationId());
            return optionalAllocatedSlot.map(allocatedSlot -> new SlotAndLocality(allocatedSlot, slotInfoAndLocality.getLocality()));
        });
    }

    /**
     * Allocates a logical slot inside the slot sharing group of the scheduled unit.
     *
     * <p>A multi task slot is obtained first (honouring the co-location constraint if one is
     * set); then a single task slot for the unit's job vertex is allocated in it.
     *
     * @return the logical slot future of the newly allocated single task slot, or a future failed
     *     with {@link NoResourceAvailableException} if no multi task slot could be obtained
     */
    private CompletableFuture<LogicalSlot> allocateSharedSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) {
        // One manager per slot sharing group, created lazily on the first request for the group.
        final SlotSharingManager multiTaskSlotManager = this.slotSharingManagers.computeIfAbsent(scheduledUnit.getSlotSharingGroupId(), id -> new SlotSharingManager(id, this.slotPool, this));

        final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality;
        try {
            if (scheduledUnit.getCoLocationConstraint() != null) {
                multiTaskSlotLocality = this.allocateCoLocatedMultiTaskSlot(scheduledUnit.getCoLocationConstraint(), multiTaskSlotManager, slotProfile, allowQueuedScheduling, allocationTimeout);
            } else {
                multiTaskSlotLocality = this.allocateMultiTaskSlot(scheduledUnit.getJobVertexId(), multiTaskSlotManager, slotProfile, allowQueuedScheduling, allocationTimeout);
            }
        }
        catch (NoResourceAvailableException noResourceException) {
            return FutureUtils.completedExceptionally(noResourceException);
        }

        // The chosen multi task slot must not already contain a task of this job vertex.
        Preconditions.checkState(!multiTaskSlotLocality.getMultiTaskSlot().contains(scheduledUnit.getJobVertexId()));

        final SlotSharingManager.SingleTaskSlot leaf = multiTaskSlotLocality.getMultiTaskSlot().allocateSingleTaskSlot(slotRequestId, scheduledUnit.getJobVertexId(), multiTaskSlotLocality.getLocality());
        return leaf.getLogicalSlotFuture();
    }

    /**
     * Obtains the multi task slot that belongs to the given co-location constraint.
     *
     * <p>If the constraint already references a slot request that the manager still knows, that
     * slot is reused. Otherwise a multi task slot is allocated for the constraint's group (pinned
     * to the constraint's location if the constraint is already assigned), a nested multi task
     * slot is created in it for the constraint, and the constraint's location is locked once the
     * slot context becomes available.
     *
     * @return the co-location multi task slot together with its locality
     * @throws NoResourceAvailableException if no slot could be allocated, or if the constraint is
     *     assigned and the allocated slot is not {@link Locality#LOCAL}
     */
    private SlotSharingManager.MultiTaskSlotLocality allocateCoLocatedMultiTaskSlot(CoLocationConstraint coLocationConstraint, SlotSharingManager multiTaskSlotManager, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) throws NoResourceAvailableException {
        final SlotRequestId coLocationSlotRequestId = coLocationConstraint.getSlotRequestId();
        if (coLocationSlotRequestId != null) {
            final SlotSharingManager.TaskSlot taskSlot = multiTaskSlotManager.getTaskSlot(coLocationSlotRequestId);
            if (taskSlot != null) {
                Preconditions.checkState(taskSlot instanceof SlotSharingManager.MultiTaskSlot);
                return SlotSharingManager.MultiTaskSlotLocality.of((SlotSharingManager.MultiTaskSlot)taskSlot, Locality.LOCAL);
            }
            // The referenced slot is no longer known to the manager; drop the stale request id.
            coLocationConstraint.setSlotRequestId(null);
        }

        if (coLocationConstraint.isAssigned()) {
            // Restrict the preferred locations to the location the constraint is bound to.
            slotProfile = new SlotProfile(slotProfile.getResourceProfile(), Collections.singleton(coLocationConstraint.getLocation()), slotProfile.getPreferredAllocations());
        }

        final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = this.allocateMultiTaskSlot(coLocationConstraint.getGroupId(), multiTaskSlotManager, slotProfile, allowQueuedScheduling, allocationTimeout);

        if (coLocationConstraint.isAssigned() && multiTaskSlotLocality.getLocality() != Locality.LOCAL) {
            multiTaskSlotLocality.getMultiTaskSlot().release(new FlinkException("Multi task slot is not local and, thus, does not fulfill the co-location constraint."));
            throw new NoResourceAvailableException("Could not allocate a local multi task slot for the co location constraint " + coLocationConstraint + '.');
        }

        final SlotRequestId slotRequestId = new SlotRequestId();
        final SlotSharingManager.MultiTaskSlot coLocationSlot = multiTaskSlotLocality.getMultiTaskSlot().allocateMultiTaskSlot(slotRequestId, coLocationConstraint.getGroupId());
        coLocationConstraint.setSlotRequestId(slotRequestId);

        coLocationSlot.getSlotContextFuture().whenComplete((slotContext, throwable) -> {
            if (throwable == null) {
                // Only lock the location if the constraint still points at this very request.
                if (Objects.equals(coLocationConstraint.getSlotRequestId(), slotRequestId)) {
                    coLocationConstraint.lockLocation(slotContext.getTaskManagerLocation());
                } else {
                    log.debug("Failed to lock colocation constraint {} because assigned slot request {} differs from fulfilled slot request {}.", coLocationConstraint.getGroupId(), coLocationConstraint.getSlotRequestId(), slotRequestId);
                }
            } else {
                // The throwable is the trailing argument beyond the two placeholders.
                log.debug("Failed to lock colocation constraint {} because the slot allocation for slot request {} failed.", coLocationConstraint.getGroupId(), coLocationConstraint.getSlotRequestId(), throwable);
            }
        });

        return SlotSharingManager.MultiTaskSlotLocality.of(coLocationSlot, multiTaskSlotLocality.getLocality());
    }

    /**
     * Obtains a root multi task slot for the given group. Candidates are tried in this order:
     *
     * <ol>
     *   <li>the best resolved root slot of the sharing manager, if its locality is LOCAL;
     *   <li>a slot allocated from the pool's available slots, if it is LOCAL or if there is no
     *       resolved root slot candidate;
     *   <li>the (non-local) resolved root slot candidate from step 1;
     *   <li>if queued scheduling is allowed, the manager's unresolved root slot for the group, or
     *       a new root slot backed by a newly requested pool slot.
     * </ol>
     *
     * @return the multi task slot with its locality ({@link Locality#UNKNOWN} for step 4)
     * @throws NoResourceAvailableException if none of the steps yields a slot
     */
    private SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot(AbstractID groupId, SlotSharingManager slotSharingManager, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) throws NoResourceAvailableException {
        final Collection<SlotInfo> resolvedRootSlotsInfo = slotSharingManager.listResolvedRootSlotInfo(groupId);
        final SlotSelectionStrategy.SlotInfoAndLocality bestResolvedRootSlotWithLocality = this.slotSelectionStrategy.selectBestSlotForProfile(resolvedRootSlotsInfo, slotProfile).orElse(null);

        final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality;
        if (bestResolvedRootSlotWithLocality != null) {
            multiTaskSlotLocality = new SlotSharingManager.MultiTaskSlotLocality(slotSharingManager.getResolvedRootSlot(bestResolvedRootSlotWithLocality.getSlotInfo()), bestResolvedRootSlotWithLocality.getLocality());
        } else {
            multiTaskSlotLocality = null;
        }

        if (multiTaskSlotLocality != null && multiTaskSlotLocality.getLocality() == Locality.LOCAL) {
            return multiTaskSlotLocality;
        }

        final SlotRequestId allocatedSlotRequestId = new SlotRequestId();
        final SlotRequestId multiTaskSlotRequestId = new SlotRequestId();

        final Optional<SlotAndLocality> optionalPoolSlotAndLocality = this.tryAllocateFromAvailable(allocatedSlotRequestId, slotProfile);

        if (optionalPoolSlotAndLocality.isPresent()) {
            final SlotAndLocality poolSlotAndLocality = optionalPoolSlotAndLocality.get();
            // Use the pool slot only if it is local or there is no resolved root slot to fall back to.
            if (poolSlotAndLocality.getLocality() == Locality.LOCAL || bestResolvedRootSlotWithLocality == null) {
                final PhysicalSlot availableSlot = poolSlotAndLocality.getSlot();
                final SlotSharingManager.MultiTaskSlot rootSlot = slotSharingManager.createRootSlot(multiTaskSlotRequestId, CompletableFuture.completedFuture(poolSlotAndLocality.getSlot()), allocatedSlotRequestId);

                if (availableSlot.tryAssignPayload(rootSlot)) {
                    return SlotSharingManager.MultiTaskSlotLocality.of(rootSlot, poolSlotAndLocality.getLocality());
                }
                rootSlot.release(new FlinkException("Could not assign payload to allocated slot " + availableSlot.getAllocationId() + '.'));
            }
        }

        if (multiTaskSlotLocality != null) {
            // Fall back to the resolved root slot and give back the pool slot if one was allocated.
            if (optionalPoolSlotAndLocality.isPresent()) {
                this.slotPool.releaseSlot(allocatedSlotRequestId, new FlinkException("Locality constraint is not better fulfilled by allocated slot."));
            }
            return multiTaskSlotLocality;
        }

        if (allowQueuedScheduling) {
            // Reuse an unresolved root slot of the group if the manager has one.
            SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.getUnresolvedRootSlot(groupId);
            if (multiTaskSlot == null) {
                final CompletableFuture<PhysicalSlot> slotAllocationFuture = this.slotPool.requestNewAllocatedSlot(allocatedSlotRequestId, slotProfile.getResourceProfile(), allocationTimeout);
                multiTaskSlot = slotSharingManager.createRootSlot(multiTaskSlotRequestId, slotAllocationFuture, allocatedSlotRequestId);

                slotAllocationFuture.whenComplete((allocatedSlot, throwable) -> {
                    final SlotSharingManager.TaskSlot taskSlot = slotSharingManager.getTaskSlot(multiTaskSlotRequestId);
                    if (taskSlot != null) {
                        if (!(taskSlot instanceof SlotSharingManager.MultiTaskSlot) || throwable != null) {
                            taskSlot.release(throwable);
                        } else if (!allocatedSlot.tryAssignPayload((SlotSharingManager.MultiTaskSlot)taskSlot)) {
                            taskSlot.release(new FlinkException("Could not assign payload to allocated slot " + allocatedSlot.getAllocationId() + '.'));
                        }
                    } else {
                        // The root slot is no longer registered; hand the pool slot request back.
                        this.slotPool.releaseSlot(allocatedSlotRequestId, new FlinkException("Could not find task slot with " + multiTaskSlotRequestId + '.'));
                    }
                });
            }
            return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNKNOWN);
        }

        throw new NoResourceAvailableException("Could not allocate a shared slot for " + groupId + '.');
    }

    /**
     * Releases the task slot registered under the given request id in the given slot sharing
     * group. Unknown groups and unknown slots are logged at debug level and otherwise ignored.
     */
    private void releaseSharedSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull SlotSharingGroupId slotSharingGroupId, Throwable cause) {
        final SlotSharingManager multiTaskSlotManager = this.slotSharingManagers.get(slotSharingGroupId);
        if (multiTaskSlotManager == null) {
            log.debug("Could not find slot sharing group {}. Ignoring release slot request.", slotSharingGroupId);
            return;
        }

        final SlotSharingManager.TaskSlot taskSlot = multiTaskSlotManager.getTaskSlot(slotRequestId);
        if (taskSlot != null) {
            taskSlot.release(cause);
        } else {
            log.debug("Could not find slot [{}] in slot sharing group {}. Ignoring release slot request.", slotRequestId, slotSharingGroupId);
        }
    }

    /**
     * @return {@code true} if the configured slot selection strategy is a
     *     {@link PreviousAllocationSlotSelectionStrategy}
     */
    @Override
    public boolean requiresPreviousExecutionGraphAllocations() {
        return this.slotSelectionStrategy instanceof PreviousAllocationSlotSelectionStrategy;
    }
}

