/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.active;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.blocklist.BlocklistHandler;
import org.apache.flink.runtime.blocklist.NoOpBlocklistHandler;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager;
import org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
import org.apache.flink.runtime.resourcemanager.active.TestingResourceManagerDriver;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceDeclaration;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingSlotManagerBuilder;
import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.RunnableWithException;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.ClassRule;
import org.junit.Test;

public class ActiveResourceManagerTest
extends TestLogger {
    /** RPC service resource shared by the whole test class; registered as a JUnit class rule. */
    @ClassRule
    public static final TestingRpcServiceResource RPC_SERVICE_RESOURCE =
            new TestingRpcServiceResource();

    /** Five-second timeout, expressed as a plain number of seconds. */
    private static final long TIMEOUT_SEC = 5L;

    /** The same five-second timeout, expressed as a {@link Time}. */
    private static final Time TIMEOUT_TIME = Time.seconds(TIMEOUT_SEC);

    /** 50 ms interval that tests install as the start-worker retry interval. */
    private static final Time TESTING_START_WORKER_INTERVAL = Time.milliseconds(50L);

    /**
     * 50 ms, in milliseconds. NOTE(review): presumably the value behind the 50 ms registration
     * timeouts configured by the tests in this class - confirm.
     */
    private static final long TESTING_START_WORKER_TIMEOUT_MS = 50L;

    /** Worker resource spec used for every worker request and resource declaration here. */
    private static final WorkerResourceSpec WORKER_RESOURCE_SPEC = WorkerResourceSpec.ZERO;

    /** Task executor memory configuration populated with small fixed values. */
    private static final TaskExecutorMemoryConfiguration TESTING_CONFIG =
            new TaskExecutorMemoryConfiguration(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 21L, 36L);

    /**
     * Requests one worker and checks that the driver receives the process spec derived from
     * {@code WORKER_RESOURCE_SPEC}, and that a task executor using the resource id returned by
     * the driver can then register successfully.
     */
    @Test
    public void testStartNewWorker() throws Exception {
        new Context() {
            {
                final ResourceID workerId = ResourceID.generate();
                final CompletableFuture<TaskExecutorProcessSpec> specSeenByDriver =
                        new CompletableFuture<>();

                // The driver records the spec it was asked for and answers with a fixed id.
                driverBuilder.setRequestResourceFunction(
                        spec -> {
                            specSeenByDriver.complete(spec);
                            return CompletableFuture.completedFuture(workerId);
                        });

                runTest(
                        () -> {
                            final CompletableFuture<Void> requestDone =
                                    runInMainThread(
                                            () ->
                                                    getResourceManager()
                                                            .requestNewWorker(WORKER_RESOURCE_SPEC));
                            final TaskExecutorProcessSpec actualSpec =
                                    specSeenByDriver.get(TIMEOUT_SEC, TimeUnit.SECONDS);
                            requestDone.get(TIMEOUT_SEC, TimeUnit.SECONDS);

                            // the driver must have been asked for the expected spec
                            Assert.assertThat(
                                    actualSpec,
                                    Matchers.is(
                                            TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(
                                                    flinkConfig, WORKER_RESOURCE_SPEC)));

                            // the started worker is accepted on registration
                            final CompletableFuture<RegistrationResponse> registration =
                                    registerTaskExecutor(workerId);
                            Assert.assertThat(
                                    registration.get(TIMEOUT_SEC, TimeUnit.SECONDS),
                                    Matchers.instanceOf(RegistrationResponse.Success.class));
                        });
            }
        };
    }

    /**
     * Requests two workers directly and then declares a need of three; expects the driver to
     * have been asked for exactly one additional resource.
     */
    @Test
    public void testLessThanDeclareResource() throws Exception {
        new Context() {
            {
                final AtomicInteger driverRequests = new AtomicInteger(0);

                // First request resolves immediately; the other two stay pending.
                final List<CompletableFuture<ResourceID>> idFutures = new ArrayList<>();
                idFutures.add(CompletableFuture.completedFuture(ResourceID.generate()));
                idFutures.add(new CompletableFuture<>());
                idFutures.add(new CompletableFuture<>());

                driverBuilder.setRequestResourceFunction(
                        spec -> idFutures.get(driverRequests.getAndIncrement()));

                runTest(
                        () -> {
                            // two explicit worker requests, each awaited before the next
                            for (int i = 0; i < 2; i++) {
                                runInMainThread(
                                                () ->
                                                        getResourceManager()
                                                                .requestNewWorker(
                                                                        WORKER_RESOURCE_SPEC))
                                        .get(TIMEOUT_SEC, TimeUnit.SECONDS);
                            }
                            Assert.assertThat(driverRequests.get(), Matchers.is(2));

                            // declaring three needed workers triggers one more driver request
                            final CompletableFuture<Void> declared =
                                    runInMainThread(
                                            () ->
                                                    getResourceManager()
                                                            .declareResourceNeeded(
                                                                    Collections.singleton(
                                                                            new ResourceDeclaration(
                                                                                    WORKER_RESOURCE_SPEC,
                                                                                    3,
                                                                                    Collections
                                                                                            .emptySet()))));
                            declared.get(TIMEOUT_SEC, TimeUnit.SECONDS);
                            Assert.assertThat(driverRequests.get(), Matchers.is(3));
                        });
            }
        };
    }

    /**
     * Starts four workers (two registered, one allocated but unregistered, one still pending at
     * the driver) and then lowers the declared need step by step from 3 to 0, checking after
     * each step how many resources the driver was asked to release and which one.
     */
    @Test
    public void testMoreThanDeclaredResource() throws Exception {
        new Context() {
            {
                final AtomicInteger driverRequests = new AtomicInteger(0);
                // Three requests resolve immediately, the fourth never completes on its own.
                final List<CompletableFuture<ResourceID>> requestedIds =
                        Arrays.asList(
                                CompletableFuture.completedFuture(ResourceID.generate()),
                                CompletableFuture.completedFuture(ResourceID.generate()),
                                CompletableFuture.completedFuture(ResourceID.generate()),
                                new CompletableFuture<ResourceID>());

                final AtomicInteger driverReleases = new AtomicInteger(0);
                // The n-th release call completes the n-th future with the released id.
                final List<CompletableFuture<ResourceID>> releasedIds =
                        Arrays.asList(
                                new CompletableFuture<ResourceID>(),
                                new CompletableFuture<ResourceID>(),
                                new CompletableFuture<ResourceID>());

                driverBuilder
                        .setRequestResourceFunction(
                                spec -> requestedIds.get(driverRequests.getAndIncrement()))
                        .setReleaseResourceConsumer(
                                released ->
                                        releasedIds
                                                .get(driverReleases.getAndIncrement())
                                                .complete(released));

                runTest(
                        () -> {
                            runInMainThread(
                                    () -> {
                                        for (int n = 0; n < 4; n++) {
                                            getResourceManager()
                                                    .requestNewWorker(WORKER_RESOURCE_SPEC);
                                        }
                                    });

                            final ResourceID unwantedId = requestedIds.get(0).get();
                            final ResourceID registeredId = requestedIds.get(1).get();
                            final ResourceID unregisteredId = requestedIds.get(2).get();
                            final CompletableFuture<ResourceID> stillPending = requestedIds.get(3);

                            registerTaskExecutorAndSendSlotReport(unwantedId, 1)
                                    .get(TIMEOUT_SEC, TimeUnit.SECONDS);
                            registerTaskExecutorAndSendSlotReport(registeredId, 1)
                                    .get(TIMEOUT_SEC, TimeUnit.SECONDS);

                            Assert.assertThat(driverRequests.get(), Matchers.is(4));
                            Assert.assertThat(driverReleases.get(), Matchers.is(0));

                            final Set<InstanceID> unwantedWorkers =
                                    Collections.singleton(
                                            getResourceManager()
                                                    .getInstanceIdByResourceId(unwantedId)
                                                    .get());

                            // need 3, one worker marked unwanted: that worker is released
                            runInMainThread(
                                            () ->
                                                    getResourceManager()
                                                            .declareResourceNeeded(
                                                                    Collections.singleton(
                                                                            new ResourceDeclaration(
                                                                                    WORKER_RESOURCE_SPEC,
                                                                                    3,
                                                                                    unwantedWorkers))))
                                    .get(TIMEOUT_SEC, TimeUnit.SECONDS);
                            Assert.assertThat(driverReleases.get(), Matchers.is(1));
                            Assert.assertThat(releasedIds.get(0).get(), Matchers.is(unwantedId));

                            // need 2: nothing new released, the pending request is cancelled
                            runInMainThread(
                                            () ->
                                                    getResourceManager()
                                                            .declareResourceNeeded(
                                                                    Collections.singleton(
                                                                            new ResourceDeclaration(
                                                                                    WORKER_RESOURCE_SPEC,
                                                                                    2,
                                                                                    Collections
                                                                                            .emptySet()))))
                                    .get(TIMEOUT_SEC, TimeUnit.SECONDS);
                            Assert.assertThat(driverReleases.get(), Matchers.is(1));
                            Assert.assertThat(stillPending.isCancelled(), Matchers.is(true));

                            // need 1: the allocated-but-unregistered worker is released
                            runInMainThread(
                                            () ->
                                                    getResourceManager()
                                                            .declareResourceNeeded(
                                                                    Collections.singleton(
                                                                            new ResourceDeclaration(
                                                                                    WORKER_RESOURCE_SPEC,
                                                                                    1,
                                                                                    Collections
                                                                                            .emptySet()))))
                                    .get(TIMEOUT_SEC, TimeUnit.SECONDS);
                            Assert.assertThat(driverReleases.get(), Matchers.is(2));
                            Assert.assertThat(
                                    releasedIds.get(1).get(), Matchers.is(unregisteredId));

                            // need 0: the remaining registered worker is released
                            runInMainThread(
                                            () ->
                                                    getResourceManager()
                                                            .declareResourceNeeded(
                                                                    Collections.singleton(
                                                                            new ResourceDeclaration(
                                                                                    WORKER_RESOURCE_SPEC,
                                                                                    0,
                                                                                    Collections
                                                                                            .emptySet()))))
                                    .get(TIMEOUT_SEC, TimeUnit.SECONDS);
                            Assert.assertThat(driverReleases.get(), Matchers.is(3));
                            Assert.assertThat(releasedIds.get(2).get(), Matchers.is(registeredId));
                        });
            }
        };
    }

    /**
     * Declares one needed worker, fails the first driver request, and checks that the driver is
     * asked a second time with an equal spec. The second request is then completed and the
     * resulting worker is expected to register successfully.
     */
    @Test
    public void testStartNewWorkerFailedRequesting() throws Exception {
        new Context() {
            {
                final ResourceID workerId = ResourceID.generate();
                final AtomicInteger driverRequests = new AtomicInteger(0);

                // Results handed back to the resource manager; completed manually by the test.
                final List<CompletableFuture<ResourceID>> idFutures = new ArrayList<>();
                idFutures.add(new CompletableFuture<>());
                idFutures.add(new CompletableFuture<>());

                // Completed with the spec of the n-th request once the driver sees it.
                final List<CompletableFuture<TaskExecutorProcessSpec>> specsSeenByDriver =
                        new ArrayList<>();
                specsSeenByDriver.add(new CompletableFuture<>());
                specsSeenByDriver.add(new CompletableFuture<>());

                driverBuilder.setRequestResourceFunction(
                        spec -> {
                            final int attempt = driverRequests.getAndIncrement();
                            Assert.assertThat(attempt, Matchers.lessThan(2));
                            specsSeenByDriver.get(attempt).complete(spec);
                            return idFutures.get(attempt);
                        });

                runTest(
                        () -> {
                            // first request
                            final CompletableFuture<Void> declared =
                                    runInMainThread(
                                            () ->
                                                    getResourceManager()
                                                            .declareResourceNeeded(
                                                                    Collections.singleton(
                                                                            new ResourceDeclaration(
                                                                                    WORKER_RESOURCE_SPEC,
                                                                                    1,
                                                                                    Collections
                                                                                            .emptySet()))));
                            final TaskExecutorProcessSpec firstSpec =
                                    specsSeenByDriver.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
                            declared.get(TIMEOUT_SEC, TimeUnit.SECONDS);
                            Assert.assertThat(
                                    firstSpec,
                                    Matchers.is(
                                            TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(
                                                    flinkConfig, WORKER_RESOURCE_SPEC)));

                            // fail the first request; a second one with the same spec must follow
                            runInMainThread(
                                    () ->
                                            idFutures
                                                    .get(0)
                                                    .completeExceptionally(
                                                            new Throwable("testing error")));
                            final TaskExecutorProcessSpec secondSpec =
                                    specsSeenByDriver.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
                            Assert.assertThat(secondSpec, Matchers.is(firstSpec));

                            // second request succeeds; the worker is accepted on registration
                            runInMainThread(() -> idFutures.get(1).complete(workerId));
                            final CompletableFuture<RegistrationResponse> registration =
                                    registerTaskExecutor(workerId);
                            Assert.assertThat(
                                    registration.get(TIMEOUT_SEC, TimeUnit.SECONDS),
                                    Matchers.instanceOf(RegistrationResponse.Success.class));
                        });
            }
        };
    }

    /**
     * Declares one needed worker and reports it terminated before it registers; expects a second
     * driver request with an equal spec, and the second worker to register successfully.
     */
    @Test
    public void testWorkerTerminatedBeforeRegister() throws Exception {
        new Context() {
            {
                final AtomicInteger driverRequests = new AtomicInteger(0);
                final List<ResourceID> workerIds =
                        Arrays.asList(ResourceID.generate(), ResourceID.generate());

                // Completed with the spec of the n-th request once the driver sees it.
                final List<CompletableFuture<TaskExecutorProcessSpec>> specsSeenByDriver =
                        new ArrayList<>();
                specsSeenByDriver.add(new CompletableFuture<>());
                specsSeenByDriver.add(new CompletableFuture<>());

                driverBuilder.setRequestResourceFunction(
                        spec -> {
                            final int attempt = driverRequests.getAndIncrement();
                            Assert.assertThat(attempt, Matchers.lessThan(2));
                            specsSeenByDriver.get(attempt).complete(spec);
                            return CompletableFuture.completedFuture(workerIds.get(attempt));
                        });

                runTest(
                        () -> {
                            // first request
                            final CompletableFuture<Void> declared =
                                    runInMainThread(
                                            () ->
                                                    getResourceManager()
                                                            .declareResourceNeeded(
                                                                    Collections.singleton(
                                                                            new ResourceDeclaration(
                                                                                    WORKER_RESOURCE_SPEC,
                                                                                    1,
                                                                                    Collections
                                                                                            .emptySet()))));
                            final TaskExecutorProcessSpec firstSpec =
                                    specsSeenByDriver.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
                            declared.get(TIMEOUT_SEC, TimeUnit.SECONDS);
                            Assert.assertThat(
                                    firstSpec,
                                    Matchers.is(
                                            TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(
                                                    flinkConfig, WORKER_RESOURCE_SPEC)));

                            // first worker dies unregistered; a replacement must be requested
                            runInMainThread(
                                    () ->
                                            getResourceManager()
                                                    .onWorkerTerminated(
                                                            workerIds.get(0),
                                                            "terminate for testing"));
                            final TaskExecutorProcessSpec secondSpec =
                                    specsSeenByDriver.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
                            Assert.assertThat(secondSpec, Matchers.is(firstSpec));

                            // the replacement worker is accepted on registration
                            final CompletableFuture<RegistrationResponse> registration =
                                    registerTaskExecutor(workerIds.get(1));
                            Assert.assertThat(
                                    registration.get(TIMEOUT_SEC, TimeUnit.SECONDS),
                                    Matchers.instanceOf(RegistrationResponse.Success.class));
                        });
            }
        };
    }

    /**
     * Declares one needed worker, lets it register, then reports it terminated; expects a second
     * driver request with an equal spec, and the second worker to register successfully.
     */
    @Test
    public void testWorkerTerminatedAfterRegister() throws Exception {
        new Context() {
            {
                final AtomicInteger driverRequests = new AtomicInteger(0);
                final List<ResourceID> workerIds =
                        Arrays.asList(ResourceID.generate(), ResourceID.generate());

                // Completed with the spec of the n-th request once the driver sees it.
                final List<CompletableFuture<TaskExecutorProcessSpec>> specsSeenByDriver =
                        new ArrayList<>();
                specsSeenByDriver.add(new CompletableFuture<>());
                specsSeenByDriver.add(new CompletableFuture<>());

                driverBuilder.setRequestResourceFunction(
                        spec -> {
                            final int attempt = driverRequests.getAndIncrement();
                            Assert.assertThat(attempt, Matchers.lessThan(2));
                            specsSeenByDriver.get(attempt).complete(spec);
                            return CompletableFuture.completedFuture(workerIds.get(attempt));
                        });

                runTest(
                        () -> {
                            // first request
                            final CompletableFuture<Void> declared =
                                    runInMainThread(
                                            () ->
                                                    getResourceManager()
                                                            .declareResourceNeeded(
                                                                    Collections.singleton(
                                                                            new ResourceDeclaration(
                                                                                    WORKER_RESOURCE_SPEC,
                                                                                    1,
                                                                                    Collections
                                                                                            .emptySet()))));
                            final TaskExecutorProcessSpec firstSpec =
                                    specsSeenByDriver.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
                            declared.get(TIMEOUT_SEC, TimeUnit.SECONDS);
                            Assert.assertThat(
                                    firstSpec,
                                    Matchers.is(
                                            TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(
                                                    flinkConfig, WORKER_RESOURCE_SPEC)));

                            // first worker registers successfully
                            final CompletableFuture<RegistrationResponse> firstRegistration =
                                    registerTaskExecutor(workerIds.get(0));
                            Assert.assertThat(
                                    firstRegistration.get(TIMEOUT_SEC, TimeUnit.SECONDS),
                                    Matchers.instanceOf(RegistrationResponse.Success.class));

                            // first worker dies; a replacement must be requested
                            runInMainThread(
                                    () ->
                                            getResourceManager()
                                                    .onWorkerTerminated(
                                                            workerIds.get(0),
                                                            "terminate for testing"));
                            final TaskExecutorProcessSpec secondSpec =
                                    specsSeenByDriver.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
                            Assert.assertThat(secondSpec, Matchers.is(firstSpec));

                            // the replacement worker is accepted on registration
                            final CompletableFuture<RegistrationResponse> secondRegistration =
                                    registerTaskExecutor(workerIds.get(1));
                            Assert.assertThat(
                                    secondRegistration.get(TIMEOUT_SEC, TimeUnit.SECONDS),
                                    Matchers.instanceOf(RegistrationResponse.Success.class));
                        });
            }
        };
    }

    /**
     * Requests a worker via {@code requestNewWorker} (no resource declaration), lets it register
     * and then reports it terminated; expects that the driver is not asked for a second worker.
     */
    @Test
    public void testWorkerTerminatedNoLongerRequired() throws Exception {
        new Context() {
            {
                final ResourceID workerId = ResourceID.generate();
                final AtomicInteger driverRequests = new AtomicInteger(0);

                // Completed with the spec of the n-th request once the driver sees it.
                final List<CompletableFuture<TaskExecutorProcessSpec>> specsSeenByDriver =
                        new ArrayList<>();
                specsSeenByDriver.add(new CompletableFuture<>());
                specsSeenByDriver.add(new CompletableFuture<>());

                driverBuilder.setRequestResourceFunction(
                        spec -> {
                            final int attempt = driverRequests.getAndIncrement();
                            Assert.assertThat(attempt, Matchers.lessThan(2));
                            specsSeenByDriver.get(attempt).complete(spec);
                            return CompletableFuture.completedFuture(workerId);
                        });

                runTest(
                        () -> {
                            final CompletableFuture<Void> requestDone =
                                    runInMainThread(
                                            () ->
                                                    getResourceManager()
                                                            .requestNewWorker(WORKER_RESOURCE_SPEC));
                            final TaskExecutorProcessSpec actualSpec =
                                    specsSeenByDriver.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
                            requestDone.get(TIMEOUT_SEC, TimeUnit.SECONDS);
                            Assert.assertThat(
                                    actualSpec,
                                    Matchers.is(
                                            TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(
                                                    flinkConfig, WORKER_RESOURCE_SPEC)));

                            // worker registers successfully
                            final CompletableFuture<RegistrationResponse> registration =
                                    registerTaskExecutor(workerId);
                            Assert.assertThat(
                                    registration.get(TIMEOUT_SEC, TimeUnit.SECONDS),
                                    Matchers.instanceOf(RegistrationResponse.Success.class));

                            // The lambda returns a value so the resulting future can be awaited,
                            // i.e. the termination has been processed before the check below.
                            runInMainThread(
                                            () -> {
                                                getResourceManager()
                                                        .onWorkerTerminated(
                                                                workerId, "terminate for testing");
                                                return null;
                                            })
                                    .get(TIMEOUT_SEC, TimeUnit.SECONDS);

                            // no second request reached the driver
                            Assert.assertFalse(specsSeenByDriver.get(1).isDone());
                        });
            }
        };
    }

    /**
     * Registers a task executor whose gateway records calls to its disconnect-resource-manager
     * hook, then reports the worker terminated and waits for that hook to fire.
     */
    @Test
    public void testCloseTaskManagerConnectionOnWorkerTerminated() throws Exception {
        new Context() {
            {
                final ResourceID workerId = ResourceID.generate();
                final CompletableFuture<TaskExecutorProcessSpec> specSeenByDriver =
                        new CompletableFuture<>();
                final CompletableFuture<Void> disconnected = new CompletableFuture<>();

                // Gateway that completes the future when asked to disconnect.
                final TestingTaskExecutorGateway gateway =
                        new TestingTaskExecutorGatewayBuilder()
                                .setDisconnectResourceManagerConsumer(
                                        ignored -> disconnected.complete(null))
                                .createTestingTaskExecutorGateway();

                driverBuilder.setRequestResourceFunction(
                        spec -> {
                            specSeenByDriver.complete(spec);
                            return CompletableFuture.completedFuture(workerId);
                        });

                runTest(
                        () -> {
                            // request the worker, register it, then terminate it - in sequence
                            runInMainThread(
                                            () ->
                                                    getResourceManager()
                                                            .requestNewWorker(WORKER_RESOURCE_SPEC))
                                    .thenCompose(ignored -> registerTaskExecutor(workerId, gateway))
                                    .thenRun(
                                            () ->
                                                    runInMainThread(
                                                            () ->
                                                                    getResourceManager()
                                                                            .onWorkerTerminated(
                                                                                    workerId,
                                                                                    "terminate for testing")));

                            // the gateway must have been told to disconnect
                            disconnected.get(TIMEOUT_SEC, TimeUnit.SECONDS);
                        });
            }
        };
    }

    /**
     * With a start-worker max failure rate of 1 and a retry interval of
     * {@code TESTING_START_WORKER_INTERVAL}, terminates the first worker and checks that the
     * second driver request arrives no sooner than one retry interval after the first.
     */
    @Test
    public void testStartWorkerIntervalOnWorkerTerminationExceedFailureRate() throws Exception {
        new Context() {
            {
                flinkConfig.setDouble(ResourceManagerOptions.START_WORKER_MAX_FAILURE_RATE, 1.0);
                flinkConfig.set(
                        ResourceManagerOptions.START_WORKER_RETRY_INTERVAL,
                        Duration.ofMillis(TESTING_START_WORKER_INTERVAL.toMilliseconds()));

                final AtomicInteger driverRequests = new AtomicInteger(0);
                final List<ResourceID> workerIds =
                        Arrays.asList(ResourceID.generate(), ResourceID.generate());

                // Completed with the wall-clock time at which the n-th request hit the driver.
                final List<CompletableFuture<Long>> requestTimestamps = new ArrayList<>();
                requestTimestamps.add(new CompletableFuture<>());
                requestTimestamps.add(new CompletableFuture<>());

                driverBuilder.setRequestResourceFunction(
                        spec -> {
                            final int attempt = driverRequests.getAndIncrement();
                            Assert.assertThat(attempt, Matchers.lessThan(2));
                            requestTimestamps.get(attempt).complete(System.currentTimeMillis());
                            return CompletableFuture.completedFuture(workerIds.get(attempt));
                        });

                runTest(
                        () -> {
                            // first request
                            final CompletableFuture<Void> declared =
                                    runInMainThread(
                                            () ->
                                                    getResourceManager()
                                                            .declareResourceNeeded(
                                                                    Collections.singleton(
                                                                            new ResourceDeclaration(
                                                                                    WORKER_RESOURCE_SPEC,
                                                                                    1,
                                                                                    Collections
                                                                                            .emptySet()))));
                            final long firstRequestAt =
                                    requestTimestamps.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
                            declared.get(TIMEOUT_SEC, TimeUnit.SECONDS);

                            // first worker fails
                            runInMainThread(
                                    () ->
                                            getResourceManager()
                                                    .onWorkerTerminated(
                                                            workerIds.get(0),
                                                            "terminate for testing"));
                            final long secondRequestAt =
                                    requestTimestamps.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);

                            // the retry must have waited at least the configured interval
                            Assert.assertThat(
                                    secondRequestAt - firstRequestAt,
                                    Matchers.greaterThanOrEqualTo(
                                            TESTING_START_WORKER_INTERVAL.toMilliseconds()));

                            // the second worker is accepted on registration
                            final CompletableFuture<RegistrationResponse> registration =
                                    registerTaskExecutor(workerIds.get(1));
                            Assert.assertThat(
                                    registration.get(TIMEOUT_SEC, TimeUnit.SECONDS),
                                    Matchers.instanceOf(RegistrationResponse.Success.class));
                        });
            }
        };
    }

    /**
     * With a start-worker max failure rate of 1 and a retry interval of
     * {@code TESTING_START_WORKER_INTERVAL}, fails the first driver request and checks that the
     * second driver request arrives no sooner than one retry interval after the first. The
     * second request is then completed and the worker is expected to register successfully.
     */
    @Test
    public void testStartWorkerIntervalOnRequestWorkerFailure() throws Exception {
        new Context() {
            {
                flinkConfig.setDouble(ResourceManagerOptions.START_WORKER_MAX_FAILURE_RATE, 1.0);
                flinkConfig.set(
                        ResourceManagerOptions.START_WORKER_RETRY_INTERVAL,
                        Duration.ofMillis(TESTING_START_WORKER_INTERVAL.toMilliseconds()));

                final AtomicInteger requestCount = new AtomicInteger(0);
                final ResourceID tmResourceId = ResourceID.generate();

                // Results handed back to the resource manager; completed manually by the test.
                final List<CompletableFuture<ResourceID>> resourceIdFutures = new ArrayList<>();
                resourceIdFutures.add(new CompletableFuture<>());
                resourceIdFutures.add(new CompletableFuture<>());

                // Completed with the wall-clock time at which the n-th request hit the driver.
                final List<CompletableFuture<Long>> requestWorkerFromDriverFutures =
                        new ArrayList<>();
                requestWorkerFromDriverFutures.add(new CompletableFuture<>());
                requestWorkerFromDriverFutures.add(new CompletableFuture<>());

                driverBuilder.setRequestResourceFunction(
                        taskExecutorProcessSpec -> {
                            final int idx = requestCount.getAndIncrement();
                            Assert.assertThat(idx, Matchers.lessThan(2));
                            requestWorkerFromDriverFutures
                                    .get(idx)
                                    .complete(System.currentTimeMillis());
                            return resourceIdFutures.get(idx);
                        });

                runTest(
                        () -> {
                            // first request
                            final CompletableFuture<Void> startNewWorkerFuture =
                                    runInMainThread(
                                            () ->
                                                    getResourceManager()
                                                            .declareResourceNeeded(
                                                                    Collections.singleton(
                                                                            new ResourceDeclaration(
                                                                                    WORKER_RESOURCE_SPEC,
                                                                                    1,
                                                                                    Collections
                                                                                            .emptySet()))));
                            startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
                            final long t1 =
                                    requestWorkerFromDriverFutures
                                            .get(0)
                                            .get(TIMEOUT_SEC, TimeUnit.SECONDS);

                            // first request fails
                            runInMainThread(
                                    () ->
                                            resourceIdFutures
                                                    .get(0)
                                                    .completeExceptionally(
                                                            new Throwable("testing error")));
                            final long t2 =
                                    requestWorkerFromDriverFutures
                                            .get(1)
                                            .get(TIMEOUT_SEC, TimeUnit.SECONDS);

                            // the retry must have waited at least the configured interval
                            Assert.assertThat(
                                    t2 - t1,
                                    Matchers.greaterThanOrEqualTo(
                                            TESTING_START_WORKER_INTERVAL.toMilliseconds()));

                            // Complete the second request from the main thread, like every other
                            // driver-future completion in this class. Dependent actions of a
                            // CompletableFuture run on the completing thread, so completing it
                            // from the test thread could run resource manager callbacks off the
                            // main thread. NOTE(review): assumes those callbacks are expected to
                            // run in the main thread - confirm against ActiveResourceManager.
                            runInMainThread(() -> resourceIdFutures.get(1).complete(tmResourceId));

                            // the second worker is accepted on registration
                            final CompletableFuture<RegistrationResponse>
                                    registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
                            Assert.assertThat(
                                    registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
                                    Matchers.instanceOf(RegistrationResponse.Success.class));
                        });
            }
        };
    }

    /**
     * Reports a worker as recovered from a previous attempt and expects a task executor with
     * that resource id to register successfully.
     */
    @Test
    public void testRecoverWorkerFromPreviousAttempt() throws Exception {
        new Context() {
            {
                final ResourceID recoveredId = ResourceID.generate();

                runTest(
                        () -> {
                            runInMainThread(
                                    () ->
                                            getResourceManager()
                                                    .onPreviousAttemptWorkersRecovered(
                                                            Collections.singleton(recoveredId)));

                            final CompletableFuture<RegistrationResponse> registration =
                                    registerTaskExecutor(recoveredId);
                            Assert.assertThat(
                                    registration.get(TIMEOUT_SEC, TimeUnit.SECONDS),
                                    Matchers.instanceOf(RegistrationResponse.Success.class));
                        });
            }
        };
    }

    /**
     * Registers a task executor under a freshly generated resource id that was never requested
     * or recovered, and expects the registration to be rejected.
     */
    @Test
    public void testRegisterUnknownWorker() throws Exception {
        new Context() {
            {
                runTest(
                        () -> {
                            final CompletableFuture<RegistrationResponse> registration =
                                    registerTaskExecutor(ResourceID.generate());
                            Assert.assertThat(
                                    registration.get(TIMEOUT_SEC, TimeUnit.SECONDS),
                                    Matchers.instanceOf(RegistrationResponse.Rejection.class));
                        });
            }
        };
    }

    /**
     * Passes an error to the resource manager's {@code onError} and expects the very same
     * throwable to show up in the fatal error handler's error future.
     */
    @Test
    public void testOnError() throws Exception {
        new Context() {
            {
                final Throwable expectedError = new Throwable("Testing fatal error");

                runTest(
                        () -> {
                            runInMainThread(() -> getResourceManager().onError(expectedError));

                            final Throwable actualError =
                                    getFatalErrorHandler()
                                            .getErrorFuture()
                                            .get(TIMEOUT_SEC, TimeUnit.SECONDS);
                            Assert.assertThat(actualError, Matchers.is(expectedError));
                        });
            }
        };
    }

    /**
     * Configures a 50 ms task manager registration timeout, requests a worker that never
     * registers, and expects the driver to be asked to release that worker.
     */
    @Test
    public void testWorkerRegistrationTimeout() throws Exception {
        new Context() {
            {
                final ResourceID workerId = ResourceID.generate();
                final CompletableFuture<ResourceID> releasedWorker = new CompletableFuture<>();

                flinkConfig.set(
                        ResourceManagerOptions.TASK_MANAGER_REGISTRATION_TIMEOUT,
                        Duration.ofMillis(TESTING_START_WORKER_TIMEOUT_MS));

                // The driver allocates immediately and records which resource gets released.
                driverBuilder
                        .setRequestResourceFunction(
                                spec -> CompletableFuture.completedFuture(workerId))
                        .setReleaseResourceConsumer(releasedWorker::complete);

                runTest(
                        () -> {
                            // request a worker and never register it
                            runInMainThread(
                                    () ->
                                            getResourceManager()
                                                    .requestNewWorker(WORKER_RESOURCE_SPEC));

                            // the unregistered worker is released
                            Assert.assertThat(
                                    releasedWorker.get(TIMEOUT_SEC, TimeUnit.SECONDS),
                                    Matchers.is(workerId));
                        });
            }
        };
    }

    /**
     * Keeps the resource request pending for twice the configured registration timeout, then
     * completes it and registers the task executor. Expects the registration to succeed and the
     * release-resource consumer not to have been invoked.
     *
     * <p>The assertions are skipped (via {@code Assume}) when completing the request and
     * registering took at least as long as the registration timeout itself, since the outcome
     * would then say nothing about whether allocation time was counted.
     */
    @Test
    public void testWorkerRegistrationTimeoutNotCountingAllocationTime() throws Exception {
        new Context() {
            {
                // Single source of truth for the timeout; the sleep and the assumption derive from it.
                final long registrationTimeoutMs = 50L;

                final ResourceID tmResourceId = ResourceID.generate();
                final CompletableFuture<ResourceID> requestResourceFuture = new CompletableFuture<>();
                final CompletableFuture<ResourceID> releaseResourceFuture = new CompletableFuture<>();

                flinkConfig.set(
                        ResourceManagerOptions.TASK_MANAGER_REGISTRATION_TIMEOUT,
                        Duration.ofMillis(registrationTimeoutMs));
                driverBuilder
                        .setRequestResourceFunction(taskExecutorProcessSpec -> requestResourceFuture)
                        .setReleaseResourceConsumer(releaseResourceFuture::complete);

                runTest(
                        () -> {
                            runInMainThread(
                                    () -> getResourceManager().requestNewWorker(WORKER_RESOURCE_SPEC));

                            // Leave the allocation pending for longer than the registration timeout.
                            // An InterruptedException propagates out of the test body, so the
                            // interrupt itself is reported as the failure cause.
                            Thread.sleep(2 * registrationTimeoutMs);

                            final long startNanos = System.nanoTime();
                            runInMainThread(() -> requestResourceFuture.complete(tmResourceId));
                            final RegistrationResponse registrationResponse =
                                    registerTaskExecutor(tmResourceId).join();
                            final long registrationTimeMs =
                                    (System.nanoTime() - startNanos) / 1000000L;

                            Assume.assumeTrue(
                                    "The registration must not take longer than the start worker timeout. If it does, then this indicates a very slow machine.",
                                    registrationTimeMs < registrationTimeoutMs);
                            Assert.assertThat(
                                    registrationResponse,
                                    Matchers.instanceOf(RegistrationResponse.Success.class));
                            Assert.assertFalse(releaseResourceFuture.isDone());
                        });
            }
        };
    }

    /**
     * Reports a single worker as recovered from a previous attempt with the task manager
     * registration timeout set to 50 ms, never registers it, and expects the driver's
     * release-resource consumer to be invoked with that worker's id within 5 seconds.
     */
    @Test
    public void testWorkerRegistrationTimeoutRecoveredFromPreviousAttempt() throws Exception {
        new Context() {
            {
                final ResourceID tmResourceId = ResourceID.generate();
                // Parameterized (not raw) so the released id is type-checked at compile time.
                final CompletableFuture<ResourceID> releaseResourceFuture = new CompletableFuture<>();

                flinkConfig.set(
                        ResourceManagerOptions.TASK_MANAGER_REGISTRATION_TIMEOUT,
                        Duration.ofMillis(50L));
                driverBuilder.setReleaseResourceConsumer(releaseResourceFuture::complete);

                runTest(
                        () -> {
                            runInMainThread(
                                    () ->
                                            getResourceManager()
                                                    .onPreviousAttemptWorkersRecovered(
                                                            Collections.singleton(tmResourceId)));

                            // The recovered worker never registers; wait for the release call.
                            Assert.assertThat(
                                    releaseResourceFuture.get(5L, TimeUnit.SECONDS),
                                    Matchers.is(tmResourceId));
                        });
            }
        };
    }

    /**
     * Reports two workers as recovered from a previous attempt, marks both as registered, and
     * then checks from the main thread that the ready-to-serve future is already done.
     */
    @Test
    public void testResourceManagerRecoveredAfterAllTMRegistered() throws Exception {
        new Context() {
            {
                final ResourceID tmResourceId1 = ResourceID.generate();
                final ResourceID tmResourceId2 = ResourceID.generate();

                runTest(
                        () -> {
                            // Arguments keep their ResourceID element/worker type instead of being
                            // widened through raw Collection / ResourceIDRetrievable casts.
                            runInMainThread(
                                    () ->
                                            getResourceManager()
                                                    .onPreviousAttemptWorkersRecovered(
                                                            ImmutableSet.of(
                                                                    tmResourceId1, tmResourceId2)));
                            runInMainThread(
                                    () ->
                                            getResourceManager()
                                                    .onWorkerRegistered(
                                                            tmResourceId1, WorkerResourceSpec.ZERO));
                            runInMainThread(
                                    () ->
                                            getResourceManager()
                                                    .onWorkerRegistered(
                                                            tmResourceId2, WorkerResourceSpec.ZERO));

                            // The assertion runs in the main thread; get() surfaces its failure here.
                            runInMainThread(
                                            () ->
                                                    Assert.assertTrue(
                                                            getResourceManager()
                                                                    .getReadyToServeFuture()
                                                                    .isDone()))
                                    .get(5L, TimeUnit.SECONDS);
                        });
            }
        };
    }

    /**
     * Reports two workers as recovered from a previous attempt but registers only one of them,
     * with the previous-worker recovery timeout set to 50 ms. Expects the ready-to-serve future
     * to complete within 5 seconds nonetheless.
     */
    @Test
    public void testResourceManagerRecoveredAfterReconcileTimeout() throws Exception {
        new Context() {
            {
                final ResourceID tmResourceId1 = ResourceID.generate();
                final ResourceID tmResourceId2 = ResourceID.generate();

                flinkConfig.set(
                        ResourceManagerOptions.RESOURCE_MANAGER_PREVIOUS_WORKER_RECOVERY_TIMEOUT,
                        Duration.ofMillis(50L));

                runTest(
                        () -> {
                            // Arguments keep their ResourceID element/worker type instead of being
                            // widened through raw Collection / ResourceIDRetrievable casts.
                            runInMainThread(
                                    () ->
                                            getResourceManager()
                                                    .onPreviousAttemptWorkersRecovered(
                                                            ImmutableSet.of(
                                                                    tmResourceId1, tmResourceId2)));
                            // Only the first worker registers; the second one stays outstanding.
                            runInMainThread(
                                    () ->
                                            getResourceManager()
                                                    .onWorkerRegistered(
                                                            tmResourceId1, WorkerResourceSpec.ZERO));

                            getResourceManager().getReadyToServeFuture().get(5L, TimeUnit.SECONDS);
                        });
            }
        };
    }

    /**
     * Per-test fixture. A test customizes {@link #flinkConfig}, {@link #driverBuilder} and
     * {@link #slotManagerBuilder} and then calls {@link #runTest}, which creates and starts an
     * {@link ActiveResourceManager}, runs the test body, and closes the resource manager again.
     */
    private static class Context {
        final Configuration flinkConfig = new Configuration();
        final TestingResourceManagerDriver.Builder driverBuilder = new TestingResourceManagerDriver.Builder();
        final TestingSlotManagerBuilder slotManagerBuilder = new TestingSlotManagerBuilder();
        // Both are (re)assigned by runTest; they are null before runTest has been called.
        private ActiveResourceManager<ResourceID> resourceManager;
        private TestingFatalErrorHandler fatalErrorHandler;

        private Context() {
        }

        /** Returns the resource manager created by {@link #runTest}. */
        ActiveResourceManager<ResourceID> getResourceManager() {
            return this.resourceManager;
        }

        /** Returns the fatal error handler created by {@link #runTest}. */
        TestingFatalErrorHandler getFatalErrorHandler() {
            return this.fatalErrorHandler;
        }

        /**
         * Creates and starts the resource manager from the current fixture settings, runs
         * {@code testMethod}, and closes the resource manager afterwards.
         *
         * <p>If the test body fails, its failure is what propagates; a failure while closing is
         * attached to it as a suppressed exception instead of replacing it.
         *
         * @param testMethod the test body to execute against the started resource manager
         * @throws Exception if start-up, the test body, or closing after a successful body fails
         */
        void runTest(RunnableWithException testMethod) throws Exception {
            this.fatalErrorHandler = new TestingFatalErrorHandler();
            this.resourceManager = this.createAndStartResourceManager(this.flinkConfig, this.driverBuilder.build(), this.slotManagerBuilder.createSlotManager());
            try {
                testMethod.run();
            } catch (Throwable testFailure) {
                closeAfterFailure(this.resourceManager, testFailure);
                throw testFailure;
            }
            // Test body succeeded: a close failure must fail the test.
            this.resourceManager.close();
        }

        /**
         * Builds an {@link ActiveResourceManager} on the shared testing RPC service, starts it, and
         * waits for its started-future. If that wait fails, the already started resource manager
         * is closed before the failure is rethrown, so it is not leaked.
         */
        private ActiveResourceManager<ResourceID> createAndStartResourceManager(Configuration configuration, ResourceManagerDriver<ResourceID> driver, SlotManager slotManager) throws Exception {
            TestingRpcService rpcService = RPC_SERVICE_RESOURCE.getTestingRpcService();
            MockResourceManagerRuntimeServices rmServices = new MockResourceManagerRuntimeServices(rpcService, slotManager);
            Duration retryInterval = (Duration)configuration.get(ResourceManagerOptions.START_WORKER_RETRY_INTERVAL);
            Duration workerRegistrationTimeout = (Duration)configuration.get(ResourceManagerOptions.TASK_MANAGER_REGISTRATION_TIMEOUT);
            Duration previousWorkerRecoverTimeout = (Duration)configuration.get(ResourceManagerOptions.RESOURCE_MANAGER_PREVIOUS_WORKER_RECOVERY_TIMEOUT);
            ActiveResourceManager<ResourceID> activeResourceManager = new ActiveResourceManager<ResourceID>(driver, configuration, (RpcService)rpcService, UUID.randomUUID(), ResourceID.generate(), rmServices.heartbeatServices, rmServices.delegationTokenManager, rmServices.slotManager, NoOpResourceManagerPartitionTracker::get, (BlocklistHandler.Factory)new NoOpBlocklistHandler.Factory(), rmServices.jobLeaderIdService, new ClusterInformation("localhost", 1234), (FatalErrorHandler)this.fatalErrorHandler, UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(), ActiveResourceManagerFactory.createStartWorkerFailureRater(configuration), retryInterval, workerRegistrationTimeout, previousWorkerRecoverTimeout, (Executor)ForkJoinPool.commonPool());
            activeResourceManager.start();
            try {
                activeResourceManager.getStartedFuture().get(TIMEOUT_TIME.getSize(), TIMEOUT_TIME.getUnit());
            } catch (Throwable startFailure) {
                closeAfterFailure(activeResourceManager, startFailure);
                throw startFailure;
            }
            return activeResourceManager;
        }

        /**
         * Closes {@code resourceManager} after {@code failure} has already occurred. Anything
         * thrown by the close is recorded on {@code failure} as suppressed rather than thrown.
         */
        private static void closeAfterFailure(ActiveResourceManager<ResourceID> resourceManager, Throwable failure) {
            try {
                resourceManager.close();
            } catch (Throwable closeFailure) {
                // addSuppressed rejects self-suppression, so skip the identical instance.
                if (closeFailure != failure) {
                    failure.addSuppressed(closeFailure);
                }
            }
        }

        /** Runs {@code runnable} in the resource manager's main thread. */
        CompletableFuture<Void> runInMainThread(Runnable runnable) {
            return this.resourceManager.runInMainThread(() -> {
                runnable.run();
                return null;
            }, TIMEOUT_TIME);
        }

        /** Runs {@code callable} in the resource manager's main thread and exposes its result. */
        <T> CompletableFuture<T> runInMainThread(Callable<T> callable) {
            return this.resourceManager.runInMainThread(callable, TIMEOUT_TIME);
        }

        /**
         * Registers a task executor for {@code resourceID}, asserts that the registration
         * succeeded, and then sends a slot report containing {@code slotNumber} slots (indices
         * {@code 0..slotNumber-1}), each with {@link ResourceProfile#ANY}.
         *
         * @return the future of the slot report acknowledgement
         */
        CompletableFuture<Acknowledge> registerTaskExecutorAndSendSlotReport(ResourceID resourceID, int slotNumber) {
            return this.registerTaskExecutor(resourceID).thenCompose(response -> {
                Assert.assertThat(response, Matchers.instanceOf(RegistrationResponse.Success.class));
                // Assumes an instance id is present once registration has succeeded.
                InstanceID instanceID = (InstanceID)this.resourceManager.getInstanceIdByResourceId(resourceID).get();
                Set<SlotStatus> slots = new HashSet<>();
                for (int slotIndex = 0; slotIndex < slotNumber; ++slotIndex) {
                    slots.add(new SlotStatus(new SlotID(resourceID, slotIndex), ResourceProfile.ANY));
                }
                return this.selfGateway().sendSlotReport(resourceID, instanceID, new SlotReport(slots), TIMEOUT_TIME);
            });
        }

        /** Registers a task executor for {@code resourceID} using a default testing gateway. */
        CompletableFuture<RegistrationResponse> registerTaskExecutor(ResourceID resourceID) {
            TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
            return this.registerTaskExecutor(resourceID, taskExecutorGateway);
        }

        /**
         * Registers {@code taskExecutorGateway} with the testing RPC service under the string form
         * of {@code resourceID} and sends a task executor registration to the resource manager.
         *
         * @return the future of the resource manager's registration response
         */
        CompletableFuture<RegistrationResponse> registerTaskExecutor(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway) {
            RPC_SERVICE_RESOURCE.getTestingRpcService().registerGateway(resourceID.toString(), (RpcGateway)taskExecutorGateway);
            TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(resourceID.toString(), resourceID, 1234, 23456, new HardwareDescription(1, 2L, 3L, 4L), TESTING_CONFIG, ResourceProfile.ZERO, ResourceProfile.ZERO, resourceID.toString());
            return this.selfGateway().registerTaskExecutor(taskExecutorRegistration, TIMEOUT_TIME);
        }

        /** Returns the resource manager's own gateway, through which the test sends RPCs to it. */
        private ResourceManagerGateway selfGateway() {
            return (ResourceManagerGateway)this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        }
    }
}

