/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.dispatcher.AbstractDispatcherTest;
import org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerRegistry;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.JobManagerRunnerFactory;
import org.apache.flink.runtime.dispatcher.JobManagerRunnerRegistry;
import org.apache.flink.runtime.dispatcher.TestingDispatcher;
import org.apache.flink.runtime.dispatcher.TestingJobManagerRunnerFactory;
import org.apache.flink.runtime.dispatcher.TestingJobMasterServiceLeadershipRunnerFactory;
import org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource;
import org.apache.flink.runtime.dispatcher.cleanup.TestingResourceCleanerFactory;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.JobResultEntry;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.testutils.TestingJobResultStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.Reference;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.flink.util.function.ThrowingRunnable;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsInstanceOf;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;

public class DispatcherResourceCleanupTest
extends TestLogger {
    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    @Rule
    public ExpectedException expectedException = ExpectedException.none();
    @Rule
    public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource();
    private static final Time timeout = Time.seconds((long)10L);
    private static TestingRpcService rpcService;
    private JobID jobId;
    private JobGraph jobGraph;
    private TestingDispatcher dispatcher;
    private DispatcherGateway dispatcherGateway;
    private BlobServer blobServer;
    private CompletableFuture<JobID> localCleanupFuture;
    private CompletableFuture<JobID> globalCleanupFuture;

    @BeforeClass
    public static void setupClass() {
        rpcService = new TestingRpcService();
    }

    @Before
    public void setup() throws Exception {
        this.jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        this.jobId = this.jobGraph.getJobID();
        this.globalCleanupFuture = new CompletableFuture();
        this.localCleanupFuture = new CompletableFuture();
        this.blobServer = BlobUtils.createBlobServer((Configuration)new Configuration(), (Reference)Reference.owned((Object)temporaryFolder.newFolder()), (BlobStore)new TestingBlobStoreBuilder().createTestingBlobStore());
    }

    private TestingJobManagerRunnerFactory startDispatcherAndSubmitJob() throws Exception {
        return this.startDispatcherAndSubmitJob(0);
    }

    private TestingJobManagerRunnerFactory startDispatcherAndSubmitJob(int numBlockingJobManagerRunners) throws Exception {
        return this.startDispatcherAndSubmitJob(this.createTestingDispatcherBuilder(), numBlockingJobManagerRunners);
    }

    private TestingJobManagerRunnerFactory startDispatcherAndSubmitJob(TestingDispatcher.Builder dispatcherBuilder, int numBlockingJobManagerRunners) throws Exception {
        TestingJobMasterServiceLeadershipRunnerFactory testingJobManagerRunnerFactoryNG = new TestingJobMasterServiceLeadershipRunnerFactory(numBlockingJobManagerRunners);
        this.startDispatcher(dispatcherBuilder, testingJobManagerRunnerFactoryNG);
        this.submitJobAndWait();
        return testingJobManagerRunnerFactoryNG;
    }

    private void startDispatcher(JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
        this.startDispatcher(this.createTestingDispatcherBuilder(), jobManagerRunnerFactory);
    }

    private void startDispatcher(TestingDispatcher.Builder dispatcherBuilder, JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
        this.dispatcher = dispatcherBuilder.setJobManagerRunnerFactory(jobManagerRunnerFactory).build(rpcService);
        this.dispatcher.start();
        this.dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
    }

    private TestingDispatcher.Builder createTestingDispatcherBuilder() {
        DefaultJobManagerRunnerRegistry jobManagerRunnerRegistry = new DefaultJobManagerRunnerRegistry(2);
        return TestingDispatcher.builder().setBlobServer(this.blobServer).setJobManagerRunnerRegistry((JobManagerRunnerRegistry)jobManagerRunnerRegistry).setFatalErrorHandler(this.testingFatalErrorHandlerResource.getFatalErrorHandler()).setResourceCleanerFactory(TestingResourceCleanerFactory.builder().withLocallyCleanableResource((LocallyCleanableResource)jobManagerRunnerRegistry).withGloballyCleanableResource((jobId, ignoredExecutor) -> {
            this.globalCleanupFuture.complete(jobId);
            return FutureUtils.completedVoidFuture();
        }).withLocallyCleanableResource((jobId, ignoredExecutor) -> {
            this.localCleanupFuture.complete(jobId);
            return FutureUtils.completedVoidFuture();
        }).build());
    }

    @After
    public void teardown() throws Exception {
        if (this.dispatcher != null) {
            this.dispatcher.close();
        }
        if (this.blobServer != null) {
            this.blobServer.close();
        }
    }

    @AfterClass
    public static void teardownClass() throws ExecutionException, InterruptedException {
        if (rpcService != null) {
            rpcService.closeAsync().get();
        }
    }

    @Test
    public void testGlobalCleanupWhenJobFinished() throws Exception {
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = this.startDispatcherAndSubmitJob();
        this.finishJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
        this.assertGlobalCleanupTriggered(this.jobId);
    }

    @Test
    public void testGlobalCleanupWhenJobCanceled() throws Exception {
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = this.startDispatcherAndSubmitJob();
        this.cancelJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
        this.assertGlobalCleanupTriggered(this.jobId);
    }

    private CompletableFuture<Acknowledge> submitJob() {
        return this.dispatcherGateway.submitJob(this.jobGraph, timeout);
    }

    private void submitJobAndWait() {
        this.submitJob().join();
    }

    @Test
    public void testLocalCleanupWhenJobNotFinished() throws Exception {
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = this.startDispatcherAndSubmitJob();
        TestingJobManagerRunner testingJobManagerRunner = jobManagerRunnerFactory.takeCreatedJobManagerRunner();
        this.suspendJob(testingJobManagerRunner);
        this.assertLocalCleanupTriggered(this.jobId);
    }

    @Test
    public void testGlobalCleanupWhenJobSubmissionFails() throws Exception {
        this.startDispatcher(new FailingJobManagerRunnerFactory(new FlinkException("Test exception")));
        CompletableFuture<Acknowledge> submissionFuture = this.submitJob();
        try {
            submissionFuture.get();
            Assert.fail((String)"Job submission was expected to fail.");
        }
        catch (ExecutionException ee) {
            MatcherAssert.assertThat((Object)ee, (Matcher)FlinkMatchers.containsCause(JobSubmissionException.class));
        }
        this.assertGlobalCleanupTriggered(this.jobId);
    }

    @Test
    public void testLocalCleanupWhenClosingDispatcher() throws Exception {
        this.startDispatcherAndSubmitJob();
        this.dispatcher.closeAsync().get();
        this.assertLocalCleanupTriggered(this.jobId);
    }

    @Test
    public void testGlobalCleanupWhenJobFinishedWhileClosingDispatcher() throws Exception {
        TestingJobManagerRunner testingJobManagerRunner = TestingJobManagerRunner.newBuilder().setBlockingTermination(true).setJobId(this.jobId).build();
        ArrayDeque<TestingJobManagerRunner> jobManagerRunners = new ArrayDeque<TestingJobManagerRunner>(Arrays.asList(testingJobManagerRunner));
        this.startDispatcher(new QueueJobManagerRunnerFactory(jobManagerRunners));
        this.submitJobAndWait();
        CompletableFuture dispatcherTerminationFuture = this.dispatcher.closeAsync();
        testingJobManagerRunner.getCloseAsyncCalledLatch().await();
        testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().setJobID(this.jobId).setState(JobStatus.FINISHED).build()));
        testingJobManagerRunner.completeTerminationFuture();
        dispatcherTerminationFuture.get();
        this.assertGlobalCleanupTriggered(this.jobId);
    }

    @Test
    public void testJobBeingMarkedAsDirtyBeforeCleanup() throws Exception {
        OneShotLatch markAsDirtyLatch = new OneShotLatch();
        TestingDispatcher.Builder dispatcherBuilder = this.createTestingDispatcherBuilder().setJobResultStore(TestingJobResultStore.builder().withCreateDirtyResultConsumer((ThrowingConsumer<JobResultEntry, ? extends IOException>)((ThrowingConsumer)ignoredJobResultEntry -> {
            try {
                markAsDirtyLatch.await();
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        })).build());
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = this.startDispatcherAndSubmitJob(dispatcherBuilder, 0);
        this.finishJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
        this.assertThatNoCleanupWasTriggered();
        markAsDirtyLatch.trigger();
        this.assertGlobalCleanupTriggered(this.jobId);
    }

    @Test
    public void testJobBeingMarkedAsCleanAfterCleanup() throws Exception {
        CompletableFuture markAsCleanFuture = new CompletableFuture();
        TestingJobResultStore jobResultStore = TestingJobResultStore.builder().withMarkResultAsCleanConsumer((ThrowingConsumer<JobID, ? extends IOException>)((ThrowingConsumer)markAsCleanFuture::complete)).build();
        OneShotLatch localCleanupLatch = new OneShotLatch();
        OneShotLatch globalCleanupLatch = new OneShotLatch();
        TestingResourceCleanerFactory resourceCleanerFactory = TestingResourceCleanerFactory.builder().withLocallyCleanableResource((ignoredJobId, ignoredExecutor) -> {
            try {
                localCleanupLatch.await();
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return FutureUtils.completedVoidFuture();
        }).withGloballyCleanableResource((ignoredJobId, ignoredExecutor) -> {
            try {
                globalCleanupLatch.await();
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return FutureUtils.completedVoidFuture();
        }).build();
        TestingDispatcher.Builder dispatcherBuilder = this.createTestingDispatcherBuilder().setJobResultStore(jobResultStore).setResourceCleanerFactory(resourceCleanerFactory);
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = this.startDispatcherAndSubmitJob(dispatcherBuilder, 0);
        this.finishJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
        MatcherAssert.assertThat((Object)markAsCleanFuture.isDone(), (Matcher)Matchers.is((Object)false));
        localCleanupLatch.trigger();
        MatcherAssert.assertThat((Object)markAsCleanFuture.isDone(), (Matcher)Matchers.is((Object)false));
        globalCleanupLatch.trigger();
        MatcherAssert.assertThat(markAsCleanFuture.get(), (Matcher)Matchers.is((Object)this.jobId));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testJobSubmissionUnderSameJobId() throws Exception {
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = this.startDispatcherAndSubmitJob(1);
        TestingJobManagerRunner testingJobManagerRunner = jobManagerRunnerFactory.takeCreatedJobManagerRunner();
        this.suspendJob(testingJobManagerRunner);
        testingJobManagerRunner.getCloseAsyncCalledLatch().await();
        CompletableFuture submissionFuture = this.dispatcherGateway.submitJob(this.jobGraph, timeout);
        try {
            submissionFuture.get(10L, TimeUnit.MILLISECONDS);
            Assert.fail((String)"The job submission future should not complete until the previous JobManager termination future has been completed.");
        }
        catch (TimeoutException timeoutException) {
        }
        finally {
            testingJobManagerRunner.completeTerminationFuture();
        }
        MatcherAssert.assertThat(submissionFuture.get(), (Matcher)Matchers.equalTo((Object)Acknowledge.get()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDuplicateJobSubmissionDoesNotDeleteJobMetaData() throws Exception {
        TestingJobManagerRunnerFactory testingJobManagerRunnerFactoryNG = this.startDispatcherAndSubmitJob();
        CompletableFuture submissionFuture = this.dispatcherGateway.submitJob(this.jobGraph, timeout);
        try {
            try {
                submissionFuture.get();
                Assert.fail((String)"Expected a DuplicateJobSubmissionFailure.");
            }
            catch (ExecutionException ee) {
                MatcherAssert.assertThat((Object)ExceptionUtils.findThrowable((Throwable)ee, DuplicateJobSubmissionException.class).isPresent(), (Matcher)Matchers.is((Object)true));
            }
            this.assertThatNoCleanupWasTriggered();
        }
        finally {
            this.finishJob(testingJobManagerRunnerFactoryNG.takeCreatedJobManagerRunner());
        }
        this.assertGlobalCleanupTriggered(this.jobId);
    }

    private void finishJob(TestingJobManagerRunner takeCreatedJobManagerRunner) {
        this.terminateJobWithState(takeCreatedJobManagerRunner, JobStatus.FINISHED);
    }

    private void suspendJob(TestingJobManagerRunner takeCreatedJobManagerRunner) {
        this.terminateJobWithState(takeCreatedJobManagerRunner, JobStatus.SUSPENDED);
    }

    private void cancelJob(TestingJobManagerRunner takeCreatedJobManagerRunner) {
        this.terminateJobWithState(takeCreatedJobManagerRunner, JobStatus.CANCELED);
    }

    private void terminateJobWithState(TestingJobManagerRunner takeCreatedJobManagerRunner, JobStatus state) {
        takeCreatedJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().setJobID(this.jobId).setState(state).build()));
    }

    private void assertThatNoCleanupWasTriggered() {
        MatcherAssert.assertThat((Object)this.globalCleanupFuture.isDone(), (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)this.localCleanupFuture.isDone(), (Matcher)Matchers.is((Object)false));
    }

    @Test
    public void testDispatcherTerminationTerminatesRunningJobMasters() throws Exception {
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = this.startDispatcherAndSubmitJob();
        this.dispatcher.closeAsync().get();
        TestingJobManagerRunner jobManagerRunner = jobManagerRunnerFactory.takeCreatedJobManagerRunner();
        MatcherAssert.assertThat((Object)jobManagerRunner.getTerminationFuture().isDone(), (Matcher)Matchers.is((Object)true));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDispatcherTerminationWaitsForJobMasterTerminations() throws Exception {
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = this.startDispatcherAndSubmitJob(1);
        CompletableFuture dispatcherTerminationFuture = this.dispatcher.closeAsync();
        try {
            dispatcherTerminationFuture.get(10L, TimeUnit.MILLISECONDS);
            Assert.fail((String)"We should not terminate before all running JobMasters have terminated.");
        }
        catch (TimeoutException timeoutException) {
        }
        finally {
            jobManagerRunnerFactory.takeCreatedJobManagerRunner().completeTerminationFuture();
        }
        dispatcherTerminationFuture.get();
    }

    private void assertLocalCleanupTriggered(JobID jobId) throws ExecutionException, InterruptedException, TimeoutException {
        MatcherAssert.assertThat((Object)this.localCleanupFuture.get(), (Matcher)Matchers.equalTo((Object)jobId));
        MatcherAssert.assertThat((Object)this.globalCleanupFuture.isDone(), (Matcher)Matchers.is((Object)false));
    }

    private void assertGlobalCleanupTriggered(JobID jobId) throws ExecutionException, InterruptedException, TimeoutException {
        MatcherAssert.assertThat((Object)this.localCleanupFuture.isDone(), (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)this.globalCleanupFuture.get(), (Matcher)Matchers.equalTo((Object)jobId));
    }

    @Test
    public void testFatalErrorIfJobCannotBeMarkedDirtyInJobResultStore() throws Exception {
        TestingJobResultStore jobResultStore = TestingJobResultStore.builder().withCreateDirtyResultConsumer((ThrowingConsumer<JobResultEntry, ? extends IOException>)((ThrowingConsumer)jobResult -> {
            throw new IOException("Expected IOException.");
        })).build();
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = this.startDispatcherAndSubmitJob(this.createTestingDispatcherBuilder().setJobResultStore(jobResultStore), 0);
        ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder().setJobID(this.jobId).setState(JobStatus.FINISHED).build();
        TestingJobManagerRunner testingJobManagerRunner = jobManagerRunnerFactory.takeCreatedJobManagerRunner();
        testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(executionGraph));
        CompletableFuture<Throwable> errorFuture = this.testingFatalErrorHandlerResource.getFatalErrorHandler().getErrorFuture();
        MatcherAssert.assertThat((Object)errorFuture.get(100L, TimeUnit.MILLISECONDS), (Matcher)IsInstanceOf.instanceOf(FlinkException.class));
        this.testingFatalErrorHandlerResource.getFatalErrorHandler().clearError();
    }

    @Test
    public void testErrorHandlingIfJobCannotBeMarkedAsCleanInJobResultStore() throws Exception {
        CompletableFuture dirtyJobFuture = new CompletableFuture();
        TestingJobResultStore jobResultStore = TestingJobResultStore.builder().withCreateDirtyResultConsumer((ThrowingConsumer<JobResultEntry, ? extends IOException>)((ThrowingConsumer)dirtyJobFuture::complete)).withMarkResultAsCleanConsumer((ThrowingConsumer<JobID, ? extends IOException>)((ThrowingConsumer)jobId -> {
            throw new IOException("Expected IOException.");
        })).build();
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = this.startDispatcherAndSubmitJob(this.createTestingDispatcherBuilder().setJobResultStore(jobResultStore), 0);
        ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder().setJobID(this.jobId).setState(JobStatus.FINISHED).build();
        TestingJobManagerRunner testingJobManagerRunner = jobManagerRunnerFactory.takeCreatedJobManagerRunner();
        testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(executionGraph));
        CompletableFuture<Throwable> errorFuture = this.testingFatalErrorHandlerResource.getFatalErrorHandler().getErrorFuture();
        try {
            Throwable unexpectedError = errorFuture.get(100L, TimeUnit.MILLISECONDS);
            Assert.fail((String)("No error should have been reported but an " + unexpectedError.getClass() + " was handled."));
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
        MatcherAssert.assertThat((Object)((JobResultEntry)dirtyJobFuture.get()).getJobId(), (Matcher)Matchers.is((Object)this.jobId));
    }

    @Test
    public void testFailingJobManagerRunnerCleanup() throws Exception {
        FlinkException testException = new FlinkException("Test exception.");
        ArrayBlockingQueue queue = new ArrayBlockingQueue(2);
        BlockingJobManagerRunnerFactory blockingJobManagerRunnerFactory = new BlockingJobManagerRunnerFactory((ThrowingRunnable<Exception>)((ThrowingRunnable)() -> {
            Optional maybeException = (Optional)queue.take();
            if (maybeException.isPresent()) {
                throw (Exception)maybeException.get();
            }
        }));
        this.startDispatcher(blockingJobManagerRunnerFactory);
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        queue.offer(Optional.of(testException));
        try {
            dispatcherGateway.submitJob(this.jobGraph, Time.minutes((long)1L)).get();
            Assert.fail((String)"A FlinkException is expected");
        }
        catch (Throwable expectedException) {
            MatcherAssert.assertThat((Object)expectedException, (Matcher)FlinkMatchers.containsCause(FlinkException.class));
            MatcherAssert.assertThat((Object)expectedException, (Matcher)FlinkMatchers.containsMessage((String)testException.getMessage()));
            this.assertGlobalCleanupTriggered(this.jobId);
        }
        queue.offer(Optional.empty());
        dispatcherGateway.submitJob(this.jobGraph, Time.minutes((long)1L)).get();
        blockingJobManagerRunnerFactory.setJobStatus(JobStatus.RUNNING);
        AbstractDispatcherTest.awaitStatus(dispatcherGateway, this.jobId, JobStatus.RUNNING);
    }

    @Test
    public void testArchivingFinishedJobToHistoryServer() throws Exception {
        CompletableFuture<Acknowledge> archiveFuture = new CompletableFuture<Acknowledge>();
        TestingDispatcher.Builder testingDispatcherBuilder = this.createTestingDispatcherBuilder().setHistoryServerArchivist(executionGraphInfo -> archiveFuture);
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = this.startDispatcherAndSubmitJob(testingDispatcherBuilder, 0);
        this.finishJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
        this.assertThatNoCleanupWasTriggered();
        CompletableFuture<Void> jobTerminationFuture = this.dispatcher.getJobTerminationFuture(this.jobId, Time.hours((long)1L));
        Assert.assertFalse((boolean)jobTerminationFuture.isDone());
        archiveFuture.complete(Acknowledge.get());
        this.assertGlobalCleanupTriggered(this.jobId);
        jobTerminationFuture.join();
    }

    @Test
    public void testNotArchivingSuspendedJobToHistoryServer() throws Exception {
        AtomicBoolean isArchived = new AtomicBoolean(false);
        TestingDispatcher.Builder testingDispatcherBuilder = this.createTestingDispatcherBuilder().setHistoryServerArchivist(executionGraphInfo -> {
            isArchived.set(true);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = this.startDispatcherAndSubmitJob(testingDispatcherBuilder, 0);
        this.suspendJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
        this.assertLocalCleanupTriggered(this.jobId);
        this.dispatcher.getJobTerminationFuture(this.jobId, Time.hours((long)1L)).join();
        Assert.assertFalse((boolean)isArchived.get());
    }

    private class FailingJobManagerRunnerFactory
    implements JobManagerRunnerFactory {
        private final Exception testException;

        public FailingJobManagerRunnerFactory(FlinkException testException) {
            this.testException = testException;
        }

        public JobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, Collection<FailureEnricher> failureEnrichers, long initializationTimestamp) throws Exception {
            throw this.testException;
        }
    }

    private static final class QueueJobManagerRunnerFactory
    implements JobManagerRunnerFactory {
        private final Queue<? extends JobManagerRunner> jobManagerRunners;

        private QueueJobManagerRunnerFactory(Queue<? extends JobManagerRunner> jobManagerRunners) {
            this.jobManagerRunners = jobManagerRunners;
        }

        public JobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, Collection<FailureEnricher> failureEnrichers, long initializationTimestamp) {
            return Optional.ofNullable(this.jobManagerRunners.poll()).orElseThrow(() -> new IllegalStateException("Cannot create more JobManagerRunners."));
        }
    }

    private static final class BlockingJobManagerRunnerFactory
    extends TestingJobMasterServiceLeadershipRunnerFactory {
        private final ThrowingRunnable<Exception> jobManagerRunnerCreationLatch;
        private TestingJobManagerRunner testingRunner;

        BlockingJobManagerRunnerFactory(ThrowingRunnable<Exception> jobManagerRunnerCreationLatch) {
            this.jobManagerRunnerCreationLatch = jobManagerRunnerCreationLatch;
        }

        @Override
        public TestingJobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, Collection<FailureEnricher> failureEnrichers, long initializationTimestamp) throws Exception {
            this.jobManagerRunnerCreationLatch.run();
            this.testingRunner = super.createJobManagerRunner(jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler, (Collection)failureEnrichers, initializationTimestamp);
            TestingJobMasterGateway testingJobMasterGateway = new TestingJobMasterGatewayBuilder().setRequestJobSupplier(() -> CompletableFuture.completedFuture(new ExecutionGraphInfo(ArchivedExecutionGraph.createSparseArchivedExecutionGraph((JobID)jobGraph.getJobID(), (String)jobGraph.getName(), (JobStatus)JobStatus.RUNNING, null, null, (long)1337L)))).build();
            this.testingRunner.completeJobMasterGatewayFuture(testingJobMasterGateway);
            return this.testingRunner;
        }

        public void setJobStatus(JobStatus newStatus) {
            Preconditions.checkState((this.testingRunner != null ? 1 : 0) != 0, (Object)"JobManagerRunner must be created before this method is available");
            this.testingRunner.setJobStatus(newStatus);
        }
    }
}

