/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot;
import org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlotActions;
import org.apache.flink.runtime.jobmaster.slotpool.DualKeyMap;
import org.apache.flink.runtime.jobmaster.slotpool.SchedulingStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotAndLocality;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.clock.Clock;
import org.apache.flink.runtime.util.clock.SystemClock;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

public class SlotPool
extends RpcEndpoint
implements SlotPoolGateway,
AllocatedSlotActions {
    private static final int STATUS_LOG_INTERVAL_MS = 60000;
    private final JobID jobId;
    private final SchedulingStrategy schedulingStrategy;
    private final ProviderAndOwner providerAndOwner;
    private final HashSet<ResourceID> registeredTaskManagers;
    private final AllocatedSlots allocatedSlots;
    private final AvailableSlots availableSlots;
    private final DualKeyMap<SlotRequestId, AllocationID, PendingRequest> pendingRequests;
    private final HashMap<SlotRequestId, PendingRequest> waitingForResourceManager;
    private final Time rpcTimeout;
    private final Time idleSlotTimeout;
    private final Clock clock;
    protected final Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagers;
    private JobMasterId jobMasterId;
    private ResourceManagerGateway resourceManagerGateway;
    private String jobManagerAddress;

    @VisibleForTesting
    protected SlotPool(RpcService rpcService, JobID jobId, SchedulingStrategy schedulingStrategy) {
        this(rpcService, jobId, schedulingStrategy, SystemClock.getInstance(), AkkaUtils.getDefaultTimeout(), Time.milliseconds((long)((Long)JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue())));
    }

    public SlotPool(RpcService rpcService, JobID jobId, SchedulingStrategy schedulingStrategy, Clock clock, Time rpcTimeout, Time idleSlotTimeout) {
        super(rpcService);
        this.jobId = (JobID)Preconditions.checkNotNull((Object)jobId);
        this.schedulingStrategy = (SchedulingStrategy)Preconditions.checkNotNull((Object)schedulingStrategy);
        this.clock = (Clock)Preconditions.checkNotNull((Object)clock);
        this.rpcTimeout = (Time)Preconditions.checkNotNull((Object)rpcTimeout);
        this.idleSlotTimeout = (Time)Preconditions.checkNotNull((Object)idleSlotTimeout);
        this.registeredTaskManagers = new HashSet(16);
        this.allocatedSlots = new AllocatedSlots();
        this.availableSlots = new AvailableSlots();
        this.pendingRequests = new DualKeyMap(16);
        this.waitingForResourceManager = new HashMap(16);
        this.providerAndOwner = new ProviderAndOwner(this.getSelfGateway(SlotPoolGateway.class));
        this.slotSharingManagers = new HashMap<SlotSharingGroupId, SlotSharingManager>(4);
        this.jobMasterId = null;
        this.resourceManagerGateway = null;
        this.jobManagerAddress = null;
    }

    @Override
    public void start() {
        throw new UnsupportedOperationException("Should never call start() without leader ID");
    }

    public void start(JobMasterId jobMasterId, String newJobManagerAddress) throws Exception {
        this.jobMasterId = (JobMasterId)((Object)Preconditions.checkNotNull((Object)((Object)jobMasterId)));
        this.jobManagerAddress = (String)Preconditions.checkNotNull((Object)newJobManagerAddress);
        try {
            super.start();
        }
        catch (Exception e) {
            throw new RuntimeException("This should never happen", e);
        }
        this.scheduleRunAsync(this::checkIdleSlot, this.idleSlotTimeout);
        if (this.log.isDebugEnabled()) {
            this.scheduleRunAsync(this::scheduledLogStatus, 60000L, TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public CompletableFuture<Void> postStop() {
        this.log.info("Stopping SlotPool.");
        Set<AllocationID> allocationIds = this.pendingRequests.keySetB();
        for (AllocationID allocationId : allocationIds) {
            this.resourceManagerGateway.cancelSlotRequest(allocationId);
        }
        for (ResourceID taskManagerResourceId : this.registeredTaskManagers) {
            FlinkException cause = new FlinkException("Releasing TaskManager " + taskManagerResourceId + ", because of stopping of SlotPool");
            this.releaseTaskManagerInternal(taskManagerResourceId, (Exception)((Object)cause));
        }
        this.clear();
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public void suspend() {
        this.log.info("Suspending SlotPool.");
        this.validateRunsInMainThread();
        Set<AllocationID> allocationIds = this.pendingRequests.keySetB();
        for (AllocationID allocationId : allocationIds) {
            this.resourceManagerGateway.cancelSlotRequest(allocationId);
        }
        this.stop();
        this.jobMasterId = null;
        this.resourceManagerGateway = null;
        this.clear();
    }

    public SlotOwner getSlotOwner() {
        return this.providerAndOwner;
    }

    public SlotProvider getSlotProvider() {
        return this.providerAndOwner;
    }

    @Override
    public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {
        this.resourceManagerGateway = (ResourceManagerGateway)Preconditions.checkNotNull((Object)resourceManagerGateway);
        for (PendingRequest pendingRequest : this.waitingForResourceManager.values()) {
            this.requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
        }
        this.waitingForResourceManager.clear();
    }

    @Override
    public void disconnectResourceManager() {
        this.resourceManagerGateway = null;
    }

    @Override
    public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit task, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) {
        this.log.debug("Received slot request [{}] for task: {}", (Object)slotRequestId, (Object)task.getTaskToExecute());
        SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
        if (slotSharingGroupId != null) {
            SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality;
            SlotSharingManager multiTaskSlotManager = this.slotSharingManagers.computeIfAbsent(slotSharingGroupId, id -> new SlotSharingManager((SlotSharingGroupId)((Object)id), this, this.providerAndOwner));
            try {
                multiTaskSlotLocality = task.getCoLocationConstraint() != null ? this.allocateCoLocatedMultiTaskSlot(task.getCoLocationConstraint(), multiTaskSlotManager, slotProfile, allowQueuedScheduling, allocationTimeout) : this.allocateMultiTaskSlot(task.getJobVertexId(), multiTaskSlotManager, slotProfile, allowQueuedScheduling, allocationTimeout);
            }
            catch (NoResourceAvailableException noResourceException) {
                return FutureUtils.completedExceptionally((Throwable)((Object)noResourceException));
            }
            Preconditions.checkState((!multiTaskSlotLocality.getMultiTaskSlot().contains(task.getJobVertexId()) ? 1 : 0) != 0);
            SlotSharingManager.SingleTaskSlot leaf = multiTaskSlotLocality.getMultiTaskSlot().allocateSingleTaskSlot(slotRequestId, task.getJobVertexId(), multiTaskSlotLocality.getLocality());
            return leaf.getLogicalSlotFuture();
        }
        CompletableFuture<SlotAndLocality> slotAndLocalityFuture = this.requestAllocatedSlot(slotRequestId, slotProfile, allowQueuedScheduling, allocationTimeout);
        return slotAndLocalityFuture.thenApply(slotAndLocality -> {
            SingleLogicalSlot singleTaskSlot;
            AllocatedSlot allocatedSlot = slotAndLocality.getSlot();
            if (allocatedSlot.tryAssignPayload(singleTaskSlot = new SingleLogicalSlot(slotRequestId, allocatedSlot, null, slotAndLocality.getLocality(), this.providerAndOwner))) {
                return singleTaskSlot;
            }
            FlinkException flinkException = new FlinkException("Could not assign payload to allocated slot " + (Object)((Object)allocatedSlot.getAllocationId()) + '.');
            this.releaseSlot(slotRequestId, null, flinkException);
            throw new CompletionException(flinkException);
        });
    }

    private SlotSharingManager.MultiTaskSlotLocality allocateCoLocatedMultiTaskSlot(CoLocationConstraint coLocationConstraint, SlotSharingManager multiTaskSlotManager, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) throws NoResourceAvailableException {
        SlotRequestId coLocationSlotRequestId = coLocationConstraint.getSlotRequestId();
        if (coLocationSlotRequestId != null) {
            SlotSharingManager.TaskSlot taskSlot = multiTaskSlotManager.getTaskSlot(coLocationSlotRequestId);
            if (taskSlot != null) {
                Preconditions.checkState((boolean)(taskSlot instanceof SlotSharingManager.MultiTaskSlot));
                return SlotSharingManager.MultiTaskSlotLocality.of((SlotSharingManager.MultiTaskSlot)taskSlot, Locality.LOCAL);
            }
            coLocationConstraint.setSlotRequestId(null);
        }
        if (coLocationConstraint.isAssigned()) {
            slotProfile = new SlotProfile(slotProfile.getResourceProfile(), Collections.singleton(coLocationConstraint.getLocation()), slotProfile.getPriorAllocations());
        }
        SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = this.allocateMultiTaskSlot(coLocationConstraint.getGroupId(), multiTaskSlotManager, slotProfile, allowQueuedScheduling, allocationTimeout);
        if (coLocationConstraint.isAssigned() && multiTaskSlotLocality.getLocality() != Locality.LOCAL) {
            multiTaskSlotLocality.getMultiTaskSlot().release(new FlinkException("Multi task slot is not local and, thus, does not fulfill the co-location constraint."));
            throw new NoResourceAvailableException("Could not allocate a local multi task slot for the co location constraint " + coLocationConstraint + '.');
        }
        SlotRequestId slotRequestId = new SlotRequestId();
        SlotSharingManager.MultiTaskSlot coLocationSlot = multiTaskSlotLocality.getMultiTaskSlot().allocateMultiTaskSlot(slotRequestId, coLocationConstraint.getGroupId());
        coLocationConstraint.setSlotRequestId(slotRequestId);
        coLocationSlot.getSlotContextFuture().whenComplete((slotContext, throwable) -> {
            if (throwable == null) {
                if (Objects.equals((Object)coLocationConstraint.getSlotRequestId(), (Object)slotRequestId)) {
                    coLocationConstraint.lockLocation(slotContext.getTaskManagerLocation());
                } else {
                    this.log.debug("Failed to lock colocation constraint {} because assigned slot request {} differs from fulfilled slot request {}.", new Object[]{coLocationConstraint.getGroupId(), coLocationConstraint.getSlotRequestId(), slotRequestId});
                }
            } else {
                this.log.debug("Failed to lock colocation constraint {} because the slot allocation for slot request {} failed.", new Object[]{coLocationConstraint.getGroupId(), coLocationConstraint.getSlotRequestId(), throwable});
            }
        });
        return SlotSharingManager.MultiTaskSlotLocality.of(coLocationSlot, multiTaskSlotLocality.getLocality());
    }

    private SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot(AbstractID groupId, SlotSharingManager slotSharingManager, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) throws NoResourceAvailableException {
        SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = slotSharingManager.getResolvedRootSlot(groupId, this.schedulingStrategy, slotProfile);
        if (multiTaskSlotLocality != null && multiTaskSlotLocality.getLocality() == Locality.LOCAL) {
            return multiTaskSlotLocality;
        }
        SlotRequestId allocatedSlotRequestId = new SlotRequestId();
        SlotRequestId multiTaskSlotRequestId = new SlotRequestId();
        SlotAndLocality polledSlotAndLocality = this.pollAndAllocateSlot(allocatedSlotRequestId, slotProfile);
        if (polledSlotAndLocality != null && (polledSlotAndLocality.getLocality() == Locality.LOCAL || multiTaskSlotLocality == null)) {
            SlotSharingManager.MultiTaskSlot multiTaskSlot;
            AllocatedSlot allocatedSlot2 = polledSlotAndLocality.getSlot();
            if (allocatedSlot2.tryAssignPayload(multiTaskSlot = slotSharingManager.createRootSlot(multiTaskSlotRequestId, CompletableFuture.completedFuture(polledSlotAndLocality.getSlot()), allocatedSlotRequestId))) {
                return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, polledSlotAndLocality.getLocality());
            }
            multiTaskSlot.release(new FlinkException("Could not assign payload to allocated slot " + (Object)((Object)allocatedSlot2.getAllocationId()) + '.'));
        }
        if (multiTaskSlotLocality != null) {
            if (polledSlotAndLocality != null) {
                this.releaseSlot(allocatedSlotRequestId, null, new FlinkException("Locality constraint is not better fulfilled by allocated slot."));
            }
            return multiTaskSlotLocality;
        }
        if (allowQueuedScheduling) {
            SlotSharingManager.MultiTaskSlot multiTaskSlotFuture = slotSharingManager.getUnresolvedRootSlot(groupId);
            if (multiTaskSlotFuture == null) {
                CompletableFuture<AllocatedSlot> futureSlot = this.requestNewAllocatedSlot(allocatedSlotRequestId, slotProfile.getResourceProfile(), allocationTimeout);
                multiTaskSlotFuture = slotSharingManager.createRootSlot(multiTaskSlotRequestId, futureSlot, allocatedSlotRequestId);
                futureSlot.whenComplete((allocatedSlot, throwable) -> {
                    SlotSharingManager.TaskSlot taskSlot = slotSharingManager.getTaskSlot(multiTaskSlotRequestId);
                    if (taskSlot != null) {
                        if (!(taskSlot instanceof SlotSharingManager.MultiTaskSlot) || throwable != null) {
                            taskSlot.release((Throwable)throwable);
                        } else if (!allocatedSlot.tryAssignPayload((SlotSharingManager.MultiTaskSlot)taskSlot)) {
                            taskSlot.release(new FlinkException("Could not assign payload to allocated slot " + (Object)((Object)allocatedSlot.getAllocationId()) + '.'));
                        }
                    } else {
                        this.releaseSlot(allocatedSlotRequestId, null, new FlinkException("Could not find task slot with " + (Object)((Object)multiTaskSlotRequestId) + '.'));
                    }
                });
            }
            return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlotFuture, Locality.UNKNOWN);
        }
        throw new NoResourceAvailableException("Could not allocate a shared slot for " + groupId + '.');
    }

    private CompletableFuture<SlotAndLocality> requestAllocatedSlot(SlotRequestId slotRequestId, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) {
        CompletionStage<SlotAndLocality> allocatedSlotLocalityFuture;
        SlotAndLocality slotFromPool = this.pollAndAllocateSlot(slotRequestId, slotProfile);
        if (slotFromPool != null) {
            allocatedSlotLocalityFuture = CompletableFuture.completedFuture(slotFromPool);
        } else if (allowQueuedScheduling) {
            CompletableFuture<AllocatedSlot> allocatedSlotFuture = this.requestNewAllocatedSlot(slotRequestId, slotProfile.getResourceProfile(), allocationTimeout);
            allocatedSlotLocalityFuture = allocatedSlotFuture.thenApply(allocatedSlot -> new SlotAndLocality((AllocatedSlot)allocatedSlot, Locality.UNKNOWN));
        } else {
            allocatedSlotLocalityFuture = FutureUtils.completedExceptionally((Throwable)((Object)new NoResourceAvailableException("Could not allocate a simple slot for " + (Object)((Object)slotRequestId) + '.')));
        }
        return allocatedSlotLocalityFuture;
    }

    private CompletableFuture<AllocatedSlot> requestNewAllocatedSlot(SlotRequestId slotRequestId, ResourceProfile resourceProfile, Time allocationTimeout) {
        PendingRequest pendingRequest = new PendingRequest(slotRequestId, resourceProfile);
        FutureUtils.orTimeout(pendingRequest.getAllocatedSlotFuture(), allocationTimeout.toMilliseconds(), TimeUnit.MILLISECONDS).whenCompleteAsync((ignored, throwable) -> {
            if (throwable instanceof TimeoutException) {
                this.timeoutPendingSlotRequest(slotRequestId);
            }
        }, (Executor)this.getMainThreadExecutor());
        if (this.resourceManagerGateway == null) {
            this.stashRequestWaitingForResourceManager(pendingRequest);
        } else {
            this.requestSlotFromResourceManager(this.resourceManagerGateway, pendingRequest);
        }
        return pendingRequest.getAllocatedSlotFuture();
    }

    private void requestSlotFromResourceManager(ResourceManagerGateway resourceManagerGateway, PendingRequest pendingRequest) {
        Preconditions.checkNotNull((Object)resourceManagerGateway);
        Preconditions.checkNotNull((Object)pendingRequest);
        this.log.info("Requesting new slot [{}] and profile {} from resource manager.", (Object)pendingRequest.getSlotRequestId(), (Object)pendingRequest.getResourceProfile());
        AllocationID allocationId = new AllocationID();
        this.pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId, pendingRequest);
        pendingRequest.getAllocatedSlotFuture().whenComplete((allocatedSlot, throwable) -> {
            if (throwable != null || !allocationId.equals((Object)allocatedSlot.getAllocationId())) {
                resourceManagerGateway.cancelSlotRequest(allocationId);
            }
        });
        CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(this.jobMasterId, new SlotRequest(this.jobId, allocationId, pendingRequest.getResourceProfile(), this.jobManagerAddress), this.rpcTimeout);
        rmResponse.whenCompleteAsync((ignored, failure) -> {
            if (failure != null) {
                this.slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), (Throwable)failure);
            }
        }, (Executor)this.getMainThreadExecutor());
    }

    private void slotRequestToResourceManagerFailed(SlotRequestId slotRequestID, Throwable failure) {
        PendingRequest request = this.pendingRequests.removeKeyA(slotRequestID);
        if (request != null) {
            request.getAllocatedSlotFuture().completeExceptionally((Throwable)((Object)new NoResourceAvailableException("No pooled slot available and request to ResourceManager for new slot failed", failure)));
        } else if (this.log.isDebugEnabled()) {
            this.log.debug("Unregistered slot request [{}] failed.", (Object)slotRequestID, (Object)failure);
        }
    }

    private void stashRequestWaitingForResourceManager(PendingRequest pendingRequest) {
        this.log.info("Cannot serve slot request, no ResourceManager connected. Adding as pending request [{}]", (Object)pendingRequest.getSlotRequestId());
        this.waitingForResourceManager.put(pendingRequest.getSlotRequestId(), pendingRequest);
    }

    @Override
    public CompletableFuture<Acknowledge> releaseSlot(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
        this.log.debug("Releasing slot [{}] because: {}", (Object)slotRequestId, (Object)(cause != null ? cause.getMessage() : "null"));
        if (slotSharingGroupId != null) {
            SlotSharingManager multiTaskSlotManager = this.slotSharingManagers.get((Object)slotSharingGroupId);
            if (multiTaskSlotManager != null) {
                SlotSharingManager.TaskSlot taskSlot = multiTaskSlotManager.getTaskSlot(slotRequestId);
                if (taskSlot != null) {
                    taskSlot.release(cause);
                } else {
                    this.log.debug("Could not find slot [{}] in slot sharing group {}. Ignoring release slot request.", (Object)slotRequestId, (Object)slotSharingGroupId);
                }
            } else {
                this.log.debug("Could not find slot sharing group {}. Ignoring release slot request.", (Object)slotSharingGroupId);
            }
        } else {
            PendingRequest pendingRequest = this.removePendingRequest(slotRequestId);
            if (pendingRequest != null) {
                this.failPendingRequest(pendingRequest, (Exception)((Object)new FlinkException("Pending slot request with " + (Object)((Object)slotRequestId) + " has been released.")));
            } else {
                AllocatedSlot allocatedSlot = this.allocatedSlots.remove(slotRequestId);
                if (allocatedSlot != null) {
                    allocatedSlot.releasePayload(cause);
                    this.tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
                } else {
                    this.log.debug("There is no allocated slot [{}]. Ignoring the release slot request.", (Object)slotRequestId);
                }
            }
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    @Nullable
    private PendingRequest removePendingRequest(SlotRequestId requestId) {
        PendingRequest result = this.waitingForResourceManager.remove((Object)requestId);
        if (result != null) {
            assert (!this.pendingRequests.containsKeyA(requestId)) : "A pending requests should only be part of either the pendingRequests or waitingForResourceManager but not both.";
            return result;
        }
        return this.pendingRequests.removeKeyA(requestId);
    }

    private void failPendingRequest(PendingRequest pendingRequest, Exception e) {
        Preconditions.checkNotNull((Object)pendingRequest);
        Preconditions.checkNotNull((Object)e);
        if (!pendingRequest.getAllocatedSlotFuture().isDone()) {
            this.log.info("Failing pending slot request [{}]: {}", (Object)pendingRequest.getSlotRequestId(), (Object)e.getMessage());
            pendingRequest.getAllocatedSlotFuture().completeExceptionally(e);
        }
    }

    @Nullable
    private SlotAndLocality pollAndAllocateSlot(SlotRequestId slotRequestId, SlotProfile slotProfile) {
        SlotAndLocality slotFromPool = this.availableSlots.poll(this.schedulingStrategy, slotProfile);
        if (slotFromPool != null) {
            this.allocatedSlots.add(slotRequestId, slotFromPool.getSlot());
        }
        return slotFromPool;
    }

    private void tryFulfillSlotRequestOrMakeAvailable(AllocatedSlot allocatedSlot) {
        Preconditions.checkState((!allocatedSlot.isUsed() ? 1 : 0) != 0, (Object)"Provided slot is still in use.");
        PendingRequest pendingRequest = this.pollMatchingPendingRequest(allocatedSlot);
        if (pendingRequest != null) {
            this.log.debug("Fulfilling pending slot request [{}] early with returned slot [{}]", (Object)pendingRequest.getSlotRequestId(), (Object)allocatedSlot.getAllocationId());
            this.allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
            pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
        } else {
            this.log.debug("Adding returned slot [{}] to available slots", (Object)allocatedSlot.getAllocationId());
            this.availableSlots.add(allocatedSlot, this.clock.relativeTimeMillis());
        }
    }

    private PendingRequest pollMatchingPendingRequest(AllocatedSlot slot) {
        ResourceProfile slotResources = slot.getResourceProfile();
        for (PendingRequest request : this.pendingRequests.values()) {
            if (!slotResources.isMatching(request.getResourceProfile())) continue;
            this.pendingRequests.removeKeyA(request.getSlotRequestId());
            return request;
        }
        for (PendingRequest request : this.waitingForResourceManager.values()) {
            if (!slotResources.isMatching(request.getResourceProfile())) continue;
            this.waitingForResourceManager.remove((Object)request.getSlotRequestId());
            return request;
        }
        return null;
    }

    @Override
    public CompletableFuture<Collection<SlotOffer>> offerSlots(TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, Collection<SlotOffer> offers) {
        this.validateRunsInMainThread();
        List acceptedSlotOffers = offers.stream().map(offer -> {
            CompletionStage acceptedSlotOffer = this.offerSlot(taskManagerLocation, taskManagerGateway, (SlotOffer)offer).thenApply(acceptedSlot -> acceptedSlot != false ? Optional.of(offer) : Optional.empty());
            return acceptedSlotOffer;
        }).collect(Collectors.toList());
        FutureUtils.ConjunctFuture optionalSlotOffers = FutureUtils.combineAll(acceptedSlotOffers);
        return optionalSlotOffers.thenApply(collection -> collection.stream().flatMap(opt -> opt.map(Stream::of).orElseGet(Stream::empty)).collect(Collectors.toList()));
    }

    @Override
    public CompletableFuture<Boolean> offerSlot(TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, SlotOffer slotOffer) {
        this.validateRunsInMainThread();
        ResourceID resourceID = taskManagerLocation.getResourceID();
        AllocationID allocationID = slotOffer.getAllocationId();
        if (!this.registeredTaskManagers.contains(resourceID)) {
            this.log.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}", (Object)slotOffer.getAllocationId(), (Object)taskManagerLocation);
            return CompletableFuture.completedFuture(false);
        }
        AllocatedSlot existingSlot = this.allocatedSlots.get(allocationID);
        if (existingSlot != null || (existingSlot = this.availableSlots.get(allocationID)) != null) {
            SlotID newSlotId;
            SlotID existingSlotId = existingSlot.getSlotId();
            if (existingSlotId.equals(newSlotId = new SlotID(taskManagerLocation.getResourceID(), slotOffer.getSlotIndex()))) {
                this.log.info("Received repeated offer for slot [{}]. Ignoring.", (Object)allocationID);
                return CompletableFuture.completedFuture(true);
            }
            return CompletableFuture.completedFuture(false);
        }
        AllocatedSlot allocatedSlot = new AllocatedSlot(allocationID, taskManagerLocation, slotOffer.getSlotIndex(), slotOffer.getResourceProfile(), taskManagerGateway);
        PendingRequest pendingRequest = this.pendingRequests.removeKeyB(allocationID);
        if (pendingRequest != null) {
            this.allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
            if (!pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot)) {
                this.allocatedSlots.remove(pendingRequest.getSlotRequestId());
                this.tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
            } else {
                this.log.debug("Fulfilled slot request [{}] with allocated slot [{}].", (Object)pendingRequest.getSlotRequestId(), (Object)allocationID);
            }
        } else {
            this.tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
        }
        return CompletableFuture.completedFuture(true);
    }

    @Override
    public void failAllocation(AllocationID allocationID, Exception cause) {
        PendingRequest pendingRequest = this.pendingRequests.removeKeyB(allocationID);
        if (pendingRequest != null) {
            this.failPendingRequest(pendingRequest, cause);
        } else if (this.availableSlots.tryRemove(allocationID)) {
            this.log.debug("Failed available slot [{}].", (Object)allocationID, (Object)cause);
        } else {
            AllocatedSlot allocatedSlot = this.allocatedSlots.remove(allocationID);
            if (allocatedSlot != null) {
                allocatedSlot.releasePayload(cause);
            } else {
                this.log.trace("Outdated request to fail slot [{}].", (Object)allocationID, (Object)cause);
            }
        }
    }

    @Override
    public CompletableFuture<Acknowledge> registerTaskManager(ResourceID resourceID) {
        this.log.debug("Register new TaskExecutor {}.", (Object)resourceID);
        this.registeredTaskManagers.add(resourceID);
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    @Override
    public CompletableFuture<Acknowledge> releaseTaskManager(ResourceID resourceId, Exception cause) {
        if (this.registeredTaskManagers.remove(resourceId)) {
            this.releaseTaskManagerInternal(resourceId, cause);
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    @VisibleForTesting
    protected void timeoutPendingSlotRequest(SlotRequestId slotRequestId) {
        this.log.info("Pending slot request [{}] timed out.", (Object)slotRequestId);
        this.removePendingRequest(slotRequestId);
    }

    private void releaseTaskManagerInternal(ResourceID resourceId, Exception cause) {
        HashSet<AllocatedSlot> removedSlots = new HashSet<AllocatedSlot>(this.allocatedSlots.removeSlotsForTaskManager(resourceId));
        for (AllocatedSlot allocatedSlot : removedSlots) {
            allocatedSlot.releasePayload(cause);
        }
        removedSlots.addAll(this.availableSlots.removeAllForTaskManager(resourceId));
        for (AllocatedSlot removedSlot : removedSlots) {
            TaskManagerGateway taskManagerGateway = removedSlot.getTaskManagerGateway();
            taskManagerGateway.freeSlot(removedSlot.getAllocationId(), cause, this.rpcTimeout);
        }
    }

    private void checkIdleSlot() {
        long currentRelativeTimeMillis = this.clock.relativeTimeMillis();
        ArrayList<AllocatedSlot> expiredSlots = new ArrayList<AllocatedSlot>(this.availableSlots.size());
        for (SlotAndTimestamp slotAndTimestamp : this.availableSlots.availableSlots.values()) {
            if (currentRelativeTimeMillis - slotAndTimestamp.timestamp <= this.idleSlotTimeout.toMilliseconds()) continue;
            expiredSlots.add(slotAndTimestamp.slot);
        }
        FlinkException cause = new FlinkException("Releasing idle slot.");
        for (AllocatedSlot expiredSlot : expiredSlots) {
            AllocationID allocationID = expiredSlot.getAllocationId();
            if (!this.availableSlots.tryRemove(allocationID)) continue;
            this.log.info("Releasing idle slot [{}].", (Object)allocationID);
            CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(allocationID, cause, this.rpcTimeout);
            freeSlotFuture.whenCompleteAsync((ignored, throwable) -> {
                if (throwable != null) {
                    if (this.registeredTaskManagers.contains(expiredSlot.getTaskManagerId())) {
                        this.log.debug("Releasing slot [{}] of registered TaskExecutor {} failed. Trying to fulfill a different slot request.", new Object[]{allocationID, expiredSlot.getTaskManagerId(), throwable});
                        this.tryFulfillSlotRequestOrMakeAvailable(expiredSlot);
                    } else {
                        this.log.debug("Releasing slot [{}] failed and owning TaskExecutor {} is no longer registered. Discarding slot.", (Object)allocationID, (Object)expiredSlot.getTaskManagerId());
                    }
                }
            }, (Executor)this.getMainThreadExecutor());
        }
        this.scheduleRunAsync(this::checkIdleSlot, this.idleSlotTimeout);
    }

    private void clear() {
        this.availableSlots.clear();
        this.allocatedSlots.clear();
        this.pendingRequests.clear();
        this.waitingForResourceManager.clear();
        this.registeredTaskManagers.clear();
        this.slotSharingManagers.clear();
    }

    private void scheduledLogStatus() {
        this.log.debug(this.printStatus());
        this.scheduleRunAsync(this::scheduledLogStatus, 60000L, TimeUnit.MILLISECONDS);
    }

    private String printStatus() {
        this.validateRunsInMainThread();
        StringBuilder builder = new StringBuilder(1024).append("Slot Pool Status:\n");
        builder.append("\tstatus: ");
        if (this.resourceManagerGateway != null) {
            builder.append("connected to ").append(this.resourceManagerGateway.getAddress()).append('\n');
        } else {
            builder.append("unconnected and waiting for ResourceManager ").append(this.waitingForResourceManager).append('\n');
        }
        builder.append("\tregistered TaskManagers: ").append(this.registeredTaskManagers).append('\n');
        builder.append("\tavailable slots: ").append(this.availableSlots.printAllSlots()).append('\n');
        builder.append("\tallocated slots: ").append(this.allocatedSlots.printAllSlots()).append('\n');
        builder.append("\tpending requests: ").append(this.pendingRequests.values()).append('\n');
        builder.append("\tsharing groups: {\n");
        for (Map.Entry<SlotSharingGroupId, SlotSharingManager> manager : this.slotSharingManagers.entrySet()) {
            builder.append("\t -------- ").append((Object)manager.getKey()).append(" --------\n");
            builder.append(manager.getValue());
        }
        builder.append("\t}\n");
        return builder.toString();
    }

    @VisibleForTesting
    protected AllocatedSlots getAllocatedSlots() {
        return this.allocatedSlots;
    }

    @VisibleForTesting
    protected AvailableSlots getAvailableSlots() {
        return this.availableSlots;
    }

    @VisibleForTesting
    DualKeyMap<SlotRequestId, AllocationID, PendingRequest> getPendingRequests() {
        return this.pendingRequests;
    }

    @VisibleForTesting
    Map<SlotRequestId, PendingRequest> getWaitingForResourceManager() {
        return this.waitingForResourceManager;
    }

    @VisibleForTesting
    void triggerCheckIdleSlot() {
        this.runAsync(this::checkIdleSlot);
    }

    private static class SlotAndTimestamp {
        private final AllocatedSlot slot;
        private final long timestamp;

        SlotAndTimestamp(AllocatedSlot slot, long timestamp) {
            this.slot = slot;
            this.timestamp = timestamp;
        }

        public AllocatedSlot slot() {
            return this.slot;
        }

        public long timestamp() {
            return this.timestamp;
        }

        public String toString() {
            return this.slot + " @ " + this.timestamp;
        }
    }

    private static class PendingRequest {
        private final SlotRequestId slotRequestId;
        private final ResourceProfile resourceProfile;
        private final CompletableFuture<AllocatedSlot> allocatedSlotFuture;

        PendingRequest(SlotRequestId slotRequestId, ResourceProfile resourceProfile) {
            this.slotRequestId = (SlotRequestId)((Object)Preconditions.checkNotNull((Object)((Object)slotRequestId)));
            this.resourceProfile = (ResourceProfile)Preconditions.checkNotNull((Object)resourceProfile);
            this.allocatedSlotFuture = new CompletableFuture();
        }

        public SlotRequestId getSlotRequestId() {
            return this.slotRequestId;
        }

        public CompletableFuture<AllocatedSlot> getAllocatedSlotFuture() {
            return this.allocatedSlotFuture;
        }

        public ResourceProfile getResourceProfile() {
            return this.resourceProfile;
        }

        public String toString() {
            return "PendingRequest{slotRequestId=" + (Object)((Object)this.slotRequestId) + ", resourceProfile=" + this.resourceProfile + ", allocatedSlotFuture=" + this.allocatedSlotFuture + '}';
        }
    }

    private static class ProviderAndOwner
    implements SlotOwner,
    SlotProvider {
        private final SlotPoolGateway gateway;

        ProviderAndOwner(SlotPoolGateway gateway) {
            this.gateway = gateway;
        }

        @Override
        public CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot slot) {
            return this.gateway.releaseSlot(slot.getSlotRequestId(), slot.getSlotSharingGroupId(), new FlinkException("Slot is being returned to the SlotPool.")).thenApply(acknowledge -> true);
        }

        @Override
        public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit task, boolean allowQueued, SlotProfile slotProfile, Time timeout) {
            CompletableFuture<LogicalSlot> slotFuture = this.gateway.allocateSlot(slotRequestId, task, slotProfile, allowQueued, timeout);
            slotFuture.whenComplete((slot, failure) -> {
                if (failure != null) {
                    this.gateway.releaseSlot(slotRequestId, task.getSlotSharingGroupId(), (Throwable)failure);
                }
            });
            return slotFuture;
        }

        @Override
        public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
            return this.gateway.releaseSlot(slotRequestId, slotSharingGroupId, cause);
        }
    }

    protected static class AvailableSlots {
        private final HashMap<ResourceID, Set<AllocatedSlot>> availableSlotsByTaskManager = new HashMap();
        private final HashMap<String, Set<AllocatedSlot>> availableSlotsByHost = new HashMap();
        private final HashMap<AllocationID, SlotAndTimestamp> availableSlots = new HashMap();

        AvailableSlots() {
        }

        void add(AllocatedSlot slot, long timestamp) {
            Set<AllocatedSlot> slotsForHost;
            Preconditions.checkNotNull((Object)slot);
            SlotAndTimestamp previous = this.availableSlots.put(slot.getAllocationId(), new SlotAndTimestamp(slot, timestamp));
            if (previous == null) {
                ResourceID resourceID = slot.getTaskManagerLocation().getResourceID();
                String host = slot.getTaskManagerLocation().getFQDNHostname();
                Set<AllocatedSlot> slotsForTaskManager = this.availableSlotsByTaskManager.get(resourceID);
                if (slotsForTaskManager == null) {
                    slotsForTaskManager = new HashSet<AllocatedSlot>();
                    this.availableSlotsByTaskManager.put(resourceID, slotsForTaskManager);
                }
                slotsForTaskManager.add(slot);
                slotsForHost = this.availableSlotsByHost.get(host);
                if (slotsForHost == null) {
                    slotsForHost = new HashSet<AllocatedSlot>();
                    this.availableSlotsByHost.put(host, slotsForHost);
                }
            } else {
                throw new IllegalStateException("slot already contained");
            }
            slotsForHost.add(slot);
        }

        boolean contains(AllocationID slotId) {
            return this.availableSlots.containsKey((Object)slotId);
        }

        AllocatedSlot get(AllocationID allocationID) {
            SlotAndTimestamp slotAndTimestamp = this.availableSlots.get((Object)allocationID);
            if (slotAndTimestamp != null) {
                return slotAndTimestamp.slot();
            }
            return null;
        }

        SlotAndLocality poll(SchedulingStrategy schedulingStrategy, SlotProfile slotProfile) {
            if (this.availableSlots.isEmpty()) {
                return null;
            }
            Collection<SlotAndTimestamp> slotAndTimestamps = this.availableSlots.values();
            SlotAndLocality matchingSlotAndLocality = schedulingStrategy.findMatchWithLocality(slotProfile, slotAndTimestamps.stream(), SlotAndTimestamp::slot, slot -> slot.slot().getResourceProfile().isMatching(slotProfile.getResourceProfile()), (slotAndTimestamp, locality) -> {
                AllocatedSlot slot = slotAndTimestamp.slot();
                return new SlotAndLocality(slot, (Locality)((Object)locality));
            });
            if (matchingSlotAndLocality != null) {
                AllocatedSlot slot2 = matchingSlotAndLocality.getSlot();
                this.remove(slot2.getAllocationId());
            }
            return matchingSlotAndLocality;
        }

        Set<AllocatedSlot> removeAllForTaskManager(ResourceID taskManager) {
            Set<AllocatedSlot> slotsForTm = this.availableSlotsByTaskManager.remove(taskManager);
            if (slotsForTm != null && slotsForTm.size() > 0) {
                String host = slotsForTm.iterator().next().getTaskManagerLocation().getFQDNHostname();
                Set<AllocatedSlot> slotsForHost = this.availableSlotsByHost.get(host);
                for (AllocatedSlot slot : slotsForTm) {
                    this.availableSlots.remove((Object)slot.getAllocationId());
                    slotsForHost.remove(slot);
                }
                if (slotsForHost.isEmpty()) {
                    this.availableSlotsByHost.remove(host);
                }
                return slotsForTm;
            }
            return Collections.emptySet();
        }

        boolean tryRemove(AllocationID slotId) {
            SlotAndTimestamp sat = this.availableSlots.remove((Object)slotId);
            if (sat != null) {
                AllocatedSlot slot = sat.slot();
                ResourceID resourceID = slot.getTaskManagerLocation().getResourceID();
                String host = slot.getTaskManagerLocation().getFQDNHostname();
                Set<AllocatedSlot> slotsForTm = this.availableSlotsByTaskManager.get(resourceID);
                Set<AllocatedSlot> slotsForHost = this.availableSlotsByHost.get(host);
                slotsForTm.remove(slot);
                slotsForHost.remove(slot);
                if (slotsForTm.isEmpty()) {
                    this.availableSlotsByTaskManager.remove(resourceID);
                }
                if (slotsForHost.isEmpty()) {
                    this.availableSlotsByHost.remove(host);
                }
                return true;
            }
            return false;
        }

        private void remove(AllocationID slotId) throws IllegalStateException {
            if (!this.tryRemove(slotId)) {
                throw new IllegalStateException("slot not contained");
            }
        }

        String printAllSlots() {
            return this.availableSlots.values().toString();
        }

        @VisibleForTesting
        boolean containsTaskManager(ResourceID resourceID) {
            return this.availableSlotsByTaskManager.containsKey(resourceID);
        }

        @VisibleForTesting
        public int size() {
            return this.availableSlots.size();
        }

        @VisibleForTesting
        void clear() {
            this.availableSlots.clear();
            this.availableSlotsByTaskManager.clear();
            this.availableSlotsByHost.clear();
        }
    }

    static class AllocatedSlots {
        private final Map<ResourceID, Set<AllocatedSlot>> allocatedSlotsByTaskManager = new HashMap<ResourceID, Set<AllocatedSlot>>(16);
        private final DualKeyMap<AllocationID, SlotRequestId, AllocatedSlot> allocatedSlotsById = new DualKeyMap(16);

        AllocatedSlots() {
        }

        void add(SlotRequestId slotRequestId, AllocatedSlot allocatedSlot) {
            this.allocatedSlotsById.put(allocatedSlot.getAllocationId(), slotRequestId, allocatedSlot);
            ResourceID resourceID = allocatedSlot.getTaskManagerLocation().getResourceID();
            Set slotsForTaskManager = this.allocatedSlotsByTaskManager.computeIfAbsent(resourceID, resourceId -> new HashSet(4));
            slotsForTaskManager.add(allocatedSlot);
        }

        AllocatedSlot get(AllocationID allocationID) {
            return this.allocatedSlotsById.getKeyA(allocationID);
        }

        AllocatedSlot get(SlotRequestId slotRequestId) {
            return this.allocatedSlotsById.getKeyB(slotRequestId);
        }

        boolean contains(AllocationID slotAllocationId) {
            return this.allocatedSlotsById.containsKeyA(slotAllocationId);
        }

        @Nullable
        AllocatedSlot remove(AllocationID allocationID) {
            AllocatedSlot allocatedSlot = this.allocatedSlotsById.removeKeyA(allocationID);
            if (allocatedSlot != null) {
                this.removeAllocatedSlot(allocatedSlot);
            }
            return allocatedSlot;
        }

        @Nullable
        AllocatedSlot remove(SlotRequestId slotRequestId) {
            AllocatedSlot allocatedSlot = this.allocatedSlotsById.removeKeyB(slotRequestId);
            if (allocatedSlot != null) {
                this.removeAllocatedSlot(allocatedSlot);
            }
            return allocatedSlot;
        }

        private void removeAllocatedSlot(AllocatedSlot allocatedSlot) {
            Preconditions.checkNotNull((Object)allocatedSlot);
            ResourceID taskManagerId = allocatedSlot.getTaskManagerLocation().getResourceID();
            Set<AllocatedSlot> slotsForTM = this.allocatedSlotsByTaskManager.get(taskManagerId);
            slotsForTM.remove(allocatedSlot);
            if (slotsForTM.isEmpty()) {
                this.allocatedSlotsByTaskManager.remove(taskManagerId);
            }
        }

        Set<AllocatedSlot> removeSlotsForTaskManager(ResourceID resourceID) {
            Set<AllocatedSlot> slotsForTaskManager = this.allocatedSlotsByTaskManager.remove(resourceID);
            if (slotsForTaskManager != null) {
                for (AllocatedSlot allocatedSlot : slotsForTaskManager) {
                    this.allocatedSlotsById.removeKeyA(allocatedSlot.getAllocationId());
                }
                return slotsForTaskManager;
            }
            return Collections.emptySet();
        }

        void clear() {
            this.allocatedSlotsById.clear();
            this.allocatedSlotsByTaskManager.clear();
        }

        String printAllSlots() {
            return this.allocatedSlotsByTaskManager.values().toString();
        }

        @VisibleForTesting
        boolean containResource(ResourceID resourceID) {
            return this.allocatedSlotsByTaskManager.containsKey(resourceID);
        }

        @VisibleForTesting
        int size() {
            return this.allocatedSlotsById.size();
        }

        @VisibleForTesting
        Set<AllocatedSlot> getSlotsForTaskManager(ResourceID resourceId) {
            if (this.allocatedSlotsByTaskManager.containsKey(resourceId)) {
                return this.allocatedSlotsByTaskManager.get(resourceId);
            }
            return Collections.emptySet();
        }
    }
}

