/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.minicluster;

import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigMergeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.client.ClientUtils;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.MetricUtils;
import org.apache.flink.runtime.minicluster.JobExecutorService;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MiniCluster
implements JobExecutorService,
AutoCloseableAsync {
    /** Logger shared by the mini cluster and its nested fatal error handlers. */
    private static final Logger LOG = LoggerFactory.getLogger(MiniCluster.class);
    /** Monitor guarding start-up, shutdown and every field annotated with {@code @GuardedBy("lock")}. */
    private final Object lock = new Object();
    /** Immutable description of the cluster to bring up (task manager count, RPC sharing, bind addresses, ...). */
    private final MiniClusterConfiguration miniClusterConfiguration;
    /**
     * Timeout used for calls on the dispatcher gateway. Fixed to 10 seconds by the constructor; note that
     * {@link #start()} reads a separate timeout from the configuration for the RPC services it creates.
     */
    private final Time rpcTimeout;
    /**
     * Future returned by {@link #closeAsync()}. Starts out already completed, is replaced by a fresh future
     * at the end of {@link #start()} and is completed once the shutdown chain has finished.
     */
    private CompletableFuture<Void> terminationFuture;
    /** Metric registry created in {@link #start()} and shut down during {@link #closeAsync()}. */
    @GuardedBy(value="lock")
    private MetricRegistryImpl metricRegistry;
    /** RPC service that always exists while running; in SHARED mode every component uses it. */
    @GuardedBy(value="lock")
    private RpcService commonRpcService;
    /** Dedicated job manager RPC service; {@code null} in SHARED mode. */
    @GuardedBy(value="lock")
    private RpcService jobManagerRpcService;
    /** Dedicated task manager RPC services, one per task manager; {@code null} in SHARED mode. */
    @GuardedBy(value="lock")
    private RpcService[] taskManagerRpcServices;
    /** Dedicated resource manager RPC service; {@code null} in SHARED mode. */
    @GuardedBy(value="lock")
    private RpcService resourceManagerRpcService;
    /** High-availability services backing leader election/retrieval and the blob store. */
    @GuardedBy(value="lock")
    private HighAvailabilityServices haServices;
    /** Blob server started in {@link #start()}; its port is advertised via the cluster information. */
    @GuardedBy(value="lock")
    private BlobServer blobServer;
    /** Heartbeat services built from the configuration and handed to all components. */
    @GuardedBy(value="lock")
    private HeartbeatServices heartbeatServices;
    /** Blob cache handed to the task managers; connects to the local blob server. */
    @GuardedBy(value="lock")
    private BlobCacheService blobCacheService;
    /** Runner hosting the resource manager. */
    @GuardedBy(value="lock")
    private ResourceManagerRunner resourceManagerRunner;
    /**
     * Running task executors. Declared {@code volatile} because {@code TerminatingFatalErrorHandler}
     * reads it without holding {@link #lock}.
     */
    private volatile TaskExecutor[] taskManagers;
    /** REST endpoint fronting the dispatcher. */
    @GuardedBy(value="lock")
    private DispatcherRestEndpoint dispatcherRestEndpoint;
    /** Base URI of the REST endpoint, captured after the endpoint has been started. */
    @GuardedBy(value="lock")
    private URI restAddressURI;
    /** Leader retrieval service feeding the resource manager gateway retriever. */
    @GuardedBy(value="lock")
    private LeaderRetrievalService resourceManagerLeaderRetriever;
    /** Leader retrieval service feeding {@link #dispatcherGatewayRetriever}. */
    @GuardedBy(value="lock")
    private LeaderRetrievalService dispatcherLeaderRetriever;
    /** Dispatcher that accepts job submissions. */
    @GuardedBy(value="lock")
    private StandaloneDispatcher dispatcher;
    /** Metric group handed to the resource manager and the dispatcher; closed during shutdown. */
    @GuardedBy(value="lock")
    private JobManagerMetricGroup jobManagerMetricGroup;
    /** Retriever used by the job-related methods to obtain the current dispatcher gateway. */
    @GuardedBy(value="lock")
    private RpcGatewayRetriever<DispatcherId, DispatcherGateway> dispatcherGatewayRetriever;
    /** Whether the cluster is started; {@code volatile} so {@link #isRunning()} can read it without the lock. */
    private volatile boolean running;

    /**
     * Creates a mini cluster that is not yet running; call {@link #start()} to bring it up.
     *
     * @param miniClusterConfiguration the cluster description; must not be {@code null}
     */
    public MiniCluster(MiniClusterConfiguration miniClusterConfiguration) {
        this.miniClusterConfiguration = (MiniClusterConfiguration)Preconditions.checkNotNull((Object)miniClusterConfiguration, (String)"config may not be null");
        this.rpcTimeout = Time.seconds((long)10L);
        // Already completed so that closeAsync() on a never-started cluster returns a finished future.
        this.terminationFuture = CompletableFuture.completedFuture(null);
        this.running = false;
    }

    /**
     * Returns the base URI of the dispatcher REST endpoint.
     *
     * @return the REST address recorded by {@link #start()}
     * @throws IllegalStateException if the cluster is not running (raised by {@code Preconditions.checkState})
     */
    public URI getRestAddress() {
        synchronized (lock) {
            Preconditions.checkState(running, "MiniCluster is not yet running.");
            return restAddressURI;
        }
    }

    /**
     * Returns the high-availability services used by this cluster.
     *
     * @return the services created by {@link #start()}
     * @throws IllegalStateException if the cluster is not running (raised by {@code Preconditions.checkState})
     */
    public HighAvailabilityServices getHighAvailabilityServices() {
        synchronized (lock) {
            Preconditions.checkState(running, "MiniCluster is not yet running.");
            return haServices;
        }
    }

    /**
     * Reports whether the cluster is currently started. Reads the volatile flag without taking the lock.
     *
     * @return {@code true} between a successful {@link #start()} and the next {@link #closeAsync()}
     */
    public boolean isRunning() {
        return running;
    }

    /**
     * Starts all components of the mini cluster while holding {@code lock}: metric registry, RPC service(s),
     * high-availability services, blob server, resource manager, task managers, dispatcher REST endpoint,
     * dispatcher and finally the leader retrieval services. Only after everything has started is a fresh
     * termination future installed and the cluster marked as running.
     *
     * @throws IllegalStateException if the cluster is already running
     * @throws Exception if any component fails to start; failures of the clean-up attempt are added as
     *     suppressed exceptions
     */
    public void start() throws Exception {
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState((!this.running ? 1 : 0) != 0, (Object)"FlinkMiniCluster is already running");
            LOG.info("Starting Flink Mini Cluster");
            LOG.debug("Using configuration {}", (Object)this.miniClusterConfiguration);
            UnmodifiableConfiguration configuration = this.miniClusterConfiguration.getConfiguration();
            // Local timeout from the configuration; shadows the field of the same name, which is fixed at 10 s.
            Time rpcTimeout = this.miniClusterConfiguration.getRpcTimeout();
            int numTaskManagers = this.miniClusterConfiguration.getNumTaskManagers();
            boolean useSingleRpcService = this.miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;
            try {
                RpcService resourceManagerRpcService;
                RpcService jobManagerRpcService;
                this.initializeIOFormatClasses((Configuration)configuration);
                LOG.info("Starting Metrics Registry");
                this.metricRegistry = this.createMetricRegistry((Configuration)configuration);
                this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(this.metricRegistry, "localhost");
                RpcService[] taskManagerRpcServices = new RpcService[numTaskManagers];
                LOG.info("Starting RPC Service(s)");
                // The common service is always created, without remoting and without a bind address.
                this.commonRpcService = this.createRpcService((Configuration)configuration, rpcTimeout, false, null);
                // The cast relies on createRpcService returning an AkkaRpcService, as the implementation in this class does.
                ActorSystem actorSystem = ((AkkaRpcService)this.commonRpcService).getActorSystem();
                this.metricRegistry.startQueryService(actorSystem, null);
                if (useSingleRpcService) {
                    // SHARED mode: every component talks through the common service.
                    for (int i = 0; i < numTaskManagers; ++i) {
                        taskManagerRpcServices[i] = this.commonRpcService;
                    }
                    jobManagerRpcService = this.commonRpcService;
                    resourceManagerRpcService = this.commonRpcService;
                    // The dedicated fields stay null; terminateRpcServices() skips them in SHARED mode.
                    this.resourceManagerRpcService = null;
                    this.jobManagerRpcService = null;
                    this.taskManagerRpcServices = null;
                } else {
                    // Dedicated mode: one remote-enabled RPC service per component, bound to the configured address.
                    String jobManagerBindAddress = this.miniClusterConfiguration.getJobManagerBindAddress();
                    String taskManagerBindAddress = this.miniClusterConfiguration.getTaskManagerBindAddress();
                    String resourceManagerBindAddress = this.miniClusterConfiguration.getResourceManagerBindAddress();
                    jobManagerRpcService = this.createRpcService((Configuration)configuration, rpcTimeout, true, jobManagerBindAddress);
                    resourceManagerRpcService = this.createRpcService((Configuration)configuration, rpcTimeout, true, resourceManagerBindAddress);
                    for (int i = 0; i < numTaskManagers; ++i) {
                        taskManagerRpcServices[i] = this.createRpcService((Configuration)configuration, rpcTimeout, true, taskManagerBindAddress);
                    }
                    this.jobManagerRpcService = jobManagerRpcService;
                    this.taskManagerRpcServices = taskManagerRpcServices;
                    this.resourceManagerRpcService = resourceManagerRpcService;
                }
                LOG.info("Starting high-availability services");
                this.haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices((Configuration)configuration, this.commonRpcService.getExecutor());
                this.blobServer = new BlobServer((Configuration)configuration, this.haServices.createBlobStore());
                this.blobServer.start();
                this.heartbeatServices = HeartbeatServices.fromConfiguration((Configuration)configuration);
                LOG.info("Starting ResourceManger");
                this.resourceManagerRunner = this.startResourceManager((Configuration)configuration, this.haServices, this.heartbeatServices, this.metricRegistry, resourceManagerRpcService, new ClusterInformation("localhost", this.blobServer.getPort()), this.jobManagerMetricGroup);
                // The blob cache connects back to the blob server on the local host.
                this.blobCacheService = new BlobCacheService((Configuration)configuration, this.haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), this.blobServer.getPort()));
                LOG.info("Starting {} TaskManger(s)", (Object)numTaskManagers);
                this.taskManagers = this.startTaskManagers((Configuration)configuration, this.haServices, this.heartbeatServices, this.metricRegistry, this.blobCacheService, numTaskManagers, taskManagerRpcServices);
                LOG.info("Starting dispatcher rest endpoint.");
                // NOTE(review): the trailing "20, 20 ms" arguments are presumably retry count and retry delay - confirm against RpcGatewayRetriever.
                this.dispatcherGatewayRetriever = new RpcGatewayRetriever<DispatcherId, DispatcherGateway>(jobManagerRpcService, DispatcherGateway.class, DispatcherId::fromUuid, 20, Time.milliseconds((long)20L));
                RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway>(jobManagerRpcService, ResourceManagerGateway.class, ResourceManagerId::fromUuid, 20, Time.milliseconds((long)20L));
                this.dispatcherRestEndpoint = new DispatcherRestEndpoint(RestServerEndpointConfiguration.fromConfiguration((Configuration)configuration), (GatewayRetriever<DispatcherGateway>)this.dispatcherGatewayRetriever, (Configuration)configuration, RestHandlerConfiguration.fromConfiguration((Configuration)configuration), (GatewayRetriever<ResourceManagerGateway>)resourceManagerGatewayRetriever, this.blobServer.getTransientBlobService(), WebMonitorEndpoint.createExecutorService(configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1), configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY), "DispatcherRestEndpoint"), (MetricQueryServiceRetriever)new AkkaQueryServiceRetriever(actorSystem, Time.milliseconds((long)configuration.getLong(WebOptions.TIMEOUT))), this.haServices.getWebMonitorLeaderElectionService(), (FatalErrorHandler)new ShutDownFatalErrorHandler());
                this.dispatcherRestEndpoint.start();
                this.restAddressURI = new URI(this.dispatcherRestEndpoint.getRestBaseUrl());
                LOG.info("Starting job dispatcher(s) for JobManger");
                // NOTE(review): this replaces the metric group created above and already handed to the resource
                // manager; this class keeps no reference to the first instance, so closeAsync() only closes
                // this second one - confirm whether the duplicate instantiation is intended.
                this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(this.metricRegistry, "localhost");
                HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist((Configuration)configuration, this.dispatcherRestEndpoint);
                this.dispatcher = new StandaloneDispatcher(jobManagerRpcService, "dispatcher" + UUID.randomUUID(), (Configuration)configuration, this.haServices, this.resourceManagerRunner.getResourceManageGateway(), this.blobServer, this.heartbeatServices, this.jobManagerMetricGroup, this.metricRegistry.getMetricQueryServicePath(), new MemoryArchivedExecutionGraphStore(), Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE, new ShutDownFatalErrorHandler(), this.dispatcherRestEndpoint.getRestBaseUrl(), historyServerArchivist);
                this.dispatcher.start();
                // Leader retrieval is started last so the retrievers only resolve once all endpoints exist.
                this.resourceManagerLeaderRetriever = this.haServices.getResourceManagerLeaderRetriever();
                this.dispatcherLeaderRetriever = this.haServices.getDispatcherLeaderRetriever();
                this.resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
                this.dispatcherLeaderRetriever.start(this.dispatcherGatewayRetriever);
            }
            catch (Exception e) {
                try {
                    // NOTE(review): close() is not defined in this class; presumably the inherited default
                    // delegates to closeAsync(), which does nothing while 'running' is still false. If so,
                    // components started before the failure are not released here - confirm.
                    this.close();
                }
                catch (Exception ee) {
                    e.addSuppressed(ee);
                }
                throw e;
            }
            // A fresh, incomplete future: it is completed by the shutdown chain set up in closeAsync().
            this.terminationFuture = new CompletableFuture();
            this.running = true;
            LOG.info("Flink Mini Cluster started successfully");
        }
    }

    /**
     * Initiates an asynchronous shutdown of the mini cluster and returns the termination future.
     *
     * <p>If the cluster is running, the shutdown is chained in this order: task managers, dispatcher
     * (including REST endpoint and leader retrievers) and resource manager; then the metric group and
     * registry; then the RPC services; finally the blob cache, blob server and high-availability services.
     * The outcome of the last stage completes the termination future. If the cluster is not running, the
     * current termination future is returned unchanged.
     *
     * @return the future that completes once the shutdown has finished (exceptionally if a stage failed)
     */
    public CompletableFuture<Void> closeAsync() {
        Object object = this.lock;
        synchronized (object) {
            if (this.running) {
                LOG.info("Shutting down Flink Mini Cluster");
                try {
                    // Capacity hint: dispatcher + resource manager + one entry per task manager.
                    int numComponents = 2 + this.miniClusterConfiguration.getNumTaskManagers();
                    ArrayList<CompletableFuture<Void>> componentTerminationFutures = new ArrayList<CompletableFuture<Void>>(numComponents);
                    if (this.taskManagers != null) {
                        for (TaskExecutor tm : this.taskManagers) {
                            if (tm == null) continue;
                            tm.shutDown();
                            componentTerminationFutures.add(tm.getTerminationFuture());
                        }
                        this.taskManagers = null;
                    }
                    componentTerminationFutures.add(this.shutDownDispatcher());
                    if (this.resourceManagerRunner != null) {
                        componentTerminationFutures.add(this.resourceManagerRunner.closeAsync());
                        this.resourceManagerRunner = null;
                    }
                    FutureUtils.ConjunctFuture<Void> componentsTerminationFuture = FutureUtils.completeAll(componentTerminationFutures);
                    // Metrics are torn down only after the components, which may still be reporting, have terminated.
                    // (Presumably runAfterwards runs the action once the given future completes - confirm in FutureUtils.)
                    CompletableFuture<Void> metricRegistryTerminationFuture = FutureUtils.runAfterwards(componentsTerminationFuture, () -> {
                        Object object = this.lock;
                        synchronized (object) {
                            if (this.jobManagerMetricGroup != null) {
                                this.jobManagerMetricGroup.close();
                                this.jobManagerMetricGroup = null;
                            }
                            if (this.metricRegistry != null) {
                                this.metricRegistry.shutdown();
                                this.metricRegistry = null;
                            }
                        }
                    });
                    // RPC services go next, followed by the blob/HA services.
                    CompletionStage rpcServicesTerminationFuture = metricRegistryTerminationFuture.thenCompose(ignored -> this.terminateRpcServices());
                    CompletableFuture<Void> remainingServicesTerminationFuture = FutureUtils.runAfterwards(rpcServicesTerminationFuture, this::terminateMiniClusterServices);
                    // Propagate the final outcome to the future handed out to callers.
                    remainingServicesTerminationFuture.whenComplete((ignored, throwable) -> {
                        if (throwable != null) {
                            this.terminationFuture.completeExceptionally(ExceptionUtils.stripCompletionException((Throwable)throwable));
                        } else {
                            this.terminationFuture.complete(null);
                        }
                    });
                }
                finally {
                    // Cleared even if setting up the shutdown chain threw, so a second call does not re-run it.
                    this.running = false;
                }
            }
            return this.terminationFuture;
        }
    }

    /**
     * Lists all jobs known to the dispatcher.
     *
     * @return a future with one {@link JobStatusMessage} per job; completed exceptionally with a
     *     {@link FlinkException} if the dispatcher gateway could not be retrieved
     */
    public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
        try {
            return getDispatcherGateway()
                .requestMultipleJobDetails(rpcTimeout)
                .thenApply(jobs -> jobs.getJobs().stream()
                    .map(details -> new JobStatusMessage(details.getJobId(), details.getJobName(), details.getStatus(), details.getStartTime()))
                    .collect(Collectors.toList()));
        } catch (InterruptedException | LeaderRetrievalException e) {
            // Restore the interrupt flag instead of silently consuming it, as the sibling
            // methods (disposeSavepoint, submitJob, requestJobResult) already do.
            ExceptionUtils.checkInterrupted(e);
            return FutureUtils.completedExceptionally(new FlinkException("Could not retrieve job list.", e));
        }
    }

    /**
     * Requests the current status of a job from the dispatcher.
     *
     * @param jobId the job to query
     * @return a future with the job status; completed exceptionally with a {@link FlinkException} if the
     *     dispatcher gateway could not be retrieved
     */
    public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
        try {
            return getDispatcherGateway().requestJobStatus(jobId, rpcTimeout);
        } catch (InterruptedException | LeaderRetrievalException e) {
            // Restore the interrupt flag instead of silently consuming it.
            ExceptionUtils.checkInterrupted(e);
            return FutureUtils.completedExceptionally(new FlinkException(String.format("Could not retrieve job status for job %s.", jobId), e));
        }
    }

    /**
     * Asks the dispatcher to cancel a job.
     *
     * @param jobId the job to cancel
     * @return a future with the dispatcher's acknowledgement; completed exceptionally with a
     *     {@link FlinkException} if the dispatcher gateway could not be retrieved
     */
    public CompletableFuture<Acknowledge> cancelJob(JobID jobId) {
        try {
            return getDispatcherGateway().cancelJob(jobId, rpcTimeout);
        } catch (InterruptedException | LeaderRetrievalException e) {
            // Restore the interrupt flag instead of silently consuming it.
            ExceptionUtils.checkInterrupted(e);
            return FutureUtils.completedExceptionally(new FlinkException(String.format("Could not cancel job %s.", jobId), e));
        }
    }

    /**
     * Asks the dispatcher to stop a job.
     *
     * @param jobId the job to stop
     * @return a future with the dispatcher's acknowledgement; completed exceptionally with a
     *     {@link FlinkException} if the dispatcher gateway could not be retrieved
     */
    public CompletableFuture<Acknowledge> stopJob(JobID jobId) {
        try {
            return getDispatcherGateway().stopJob(jobId, rpcTimeout);
        } catch (InterruptedException | LeaderRetrievalException e) {
            // Restore the interrupt flag instead of silently consuming it.
            ExceptionUtils.checkInterrupted(e);
            return FutureUtils.completedExceptionally(new FlinkException(String.format("Could not stop job %s.", jobId), e));
        }
    }

    /**
     * Triggers a savepoint for a job through the dispatcher.
     *
     * @param jobId the job to take the savepoint of
     * @param targetDirectory the savepoint target directory, forwarded to the dispatcher as is
     * @param cancelJob flag forwarded to the dispatcher together with the savepoint request
     * @return the future returned by the dispatcher; completed exceptionally with a {@link FlinkException}
     *     if the dispatcher gateway could not be retrieved
     */
    public CompletableFuture<String> triggerSavepoint(JobID jobId, String targetDirectory, boolean cancelJob) {
        try {
            return getDispatcherGateway().triggerSavepoint(jobId, targetDirectory, cancelJob, rpcTimeout);
        } catch (InterruptedException | LeaderRetrievalException e) {
            // Restore the interrupt flag instead of silently consuming it.
            ExceptionUtils.checkInterrupted(e);
            return FutureUtils.completedExceptionally(new FlinkException(String.format("Could not trigger savepoint for job %s.", jobId), e));
        }
    }

    /**
     * Asks the dispatcher to dispose the savepoint stored at the given path.
     *
     * @param savepointPath path of the savepoint to dispose
     * @return a future with the dispatcher's acknowledgement; completed exceptionally with a
     *     {@link FlinkException} if the dispatcher gateway could not be retrieved
     */
    public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) {
        try {
            final DispatcherGateway gateway = getDispatcherGateway();
            return gateway.disposeSavepoint(savepointPath, rpcTimeout);
        } catch (InterruptedException | LeaderRetrievalException retrievalFailure) {
            ExceptionUtils.checkInterrupted(retrievalFailure);
            final String message = String.format("Could not dispose savepoint %s.", savepointPath);
            return FutureUtils.completedExceptionally(new FlinkException(message, retrievalFailure));
        }
    }

    /**
     * Requests the execution graph of a job from the dispatcher.
     *
     * @param jobId the job whose execution graph is requested
     * @return the future returned by the dispatcher; completed exceptionally with a {@link FlinkException}
     *     if the dispatcher gateway could not be retrieved
     */
    public CompletableFuture<? extends AccessExecutionGraph> getExecutionGraph(JobID jobId) {
        try {
            return getDispatcherGateway().requestJob(jobId, rpcTimeout);
        } catch (InterruptedException | LeaderRetrievalException e) {
            // Restore the interrupt flag instead of silently consuming it.
            ExceptionUtils.checkInterrupted(e);
            // Message fixed: the original read "Could not retrieve job job %s." (duplicated word).
            return FutureUtils.completedExceptionally(new FlinkException(String.format("Could not retrieve job %s.", jobId), e));
        }
    }

    /**
     * Submits a job and waits only until the submission has been acknowledged, not for the job to finish.
     *
     * @param job the job graph to submit; must not be {@code null}
     * @throws JobExecutionException if the submission failed
     * @throws InterruptedException if the calling thread is interrupted while waiting for the submission
     */
    public void runDetached(JobGraph job) throws JobExecutionException, InterruptedException {
        Preconditions.checkNotNull(job, "job is null");
        try {
            submitJob(job).get();
        } catch (ExecutionException submissionFailure) {
            final Throwable cause = ExceptionUtils.stripExecutionException(submissionFailure);
            throw new JobExecutionException(job.getJobID(), cause);
        }
    }

    /**
     * Submits a job and blocks until its result is available.
     *
     * @param job the job graph to run; must not be {@code null}
     * @return the execution result, deserialized with the calling thread's context class loader
     * @throws JobExecutionException if submitting the job, retrieving its result or converting the result failed
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
        Preconditions.checkNotNull(job, "job is null");

        // Request the result only after the submission has gone through.
        final CompletableFuture<JobResult> resultFuture =
            submitJob(job).thenCompose(ignored -> requestJobResult(job.getJobID()));

        final JobResult result;
        try {
            result = resultFuture.get();
        } catch (ExecutionException retrievalFailure) {
            throw new JobExecutionException(
                job.getJobID(),
                "Could not retrieve JobResult.",
                ExceptionUtils.stripExecutionException(retrievalFailure));
        }

        final ClassLoader userClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            return result.toJobExecutionResult(userClassLoader);
        } catch (IOException | ClassNotFoundException conversionFailure) {
            throw new JobExecutionException(job.getJobID(), conversionFailure);
        }
    }

    /**
     * Uploads the job's files to the blob server and submits the job graph to the dispatcher.
     *
     * @param jobGraph the job to submit; queued scheduling is switched on before submission
     * @return a future with the submission result; completed exceptionally if the dispatcher gateway
     *     could not be retrieved or any later step failed
     */
    public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
        final DispatcherGateway gateway;
        try {
            gateway = getDispatcherGateway();
        } catch (InterruptedException | LeaderRetrievalException retrievalFailure) {
            ExceptionUtils.checkInterrupted(retrievalFailure);
            return FutureUtils.completedExceptionally(retrievalFailure);
        }

        jobGraph.setAllowQueuedScheduling(true);

        // Resolve the blob server address, upload the files, submit, then wrap the acknowledgement.
        return uploadAndSetJobFiles(createBlobServerAddress(gateway), jobGraph)
            .thenCompose(uploaded -> gateway.submitJob(jobGraph, rpcTimeout))
            .thenApply(acknowledged -> new JobSubmissionResult(jobGraph.getJobID()));
    }

    /**
     * Requests the result of a job from the dispatcher, using {@code RpcUtils.INF_TIMEOUT} for the call.
     *
     * @param jobId the job whose result is requested
     * @return the future returned by the dispatcher; completed exceptionally with the retrieval failure
     *     if the dispatcher gateway could not be obtained
     */
    public CompletableFuture<JobResult> requestJobResult(JobID jobId) {
        try {
            return getDispatcherGateway().requestJobResult(jobId, RpcUtils.INF_TIMEOUT);
        } catch (InterruptedException | LeaderRetrievalException retrievalFailure) {
            ExceptionUtils.checkInterrupted(retrievalFailure);
            return FutureUtils.completedExceptionally(retrievalFailure);
        }
    }

    /**
     * Resolves the gateway of the currently leading dispatcher, blocking until it is available.
     *
     * <p>The gateway future is read while holding {@code lock}, but the blocking wait happens after the
     * lock has been released. Waiting inside the monitor would stall {@link #closeAsync()} (and with it
     * the fatal error handler) as well as every other caller for as long as no dispatcher leader is known.
     *
     * @return the dispatcher gateway
     * @throws IllegalStateException if the cluster is not running
     * @throws LeaderRetrievalException if the gateway future completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private DispatcherGateway getDispatcherGateway() throws LeaderRetrievalException, InterruptedException {
        final CompletableFuture<DispatcherGateway> gatewayFuture;
        synchronized (lock) {
            Preconditions.checkState(running, "MiniCluster is not yet running.");
            gatewayFuture = dispatcherGatewayRetriever.getFuture();
        }

        // Wait outside the monitor so a pending leader retrieval cannot block shutdown.
        try {
            return gatewayFuture.get();
        } catch (ExecutionException e) {
            throw new LeaderRetrievalException("Could not retrieve the leading dispatcher.", ExceptionUtils.stripExecutionException(e));
        }
    }

    /**
     * Uploads the files of the given job once the blob server address is known.
     *
     * @param blobServerAddressFuture future providing the address of the blob server
     * @param job the job graph whose files are extracted and uploaded
     * @return a future that completes when the upload has finished; a {@link FlinkException} from the
     *     upload is rethrown wrapped in a {@link CompletionException}
     */
    private CompletableFuture<Void> uploadAndSetJobFiles(CompletableFuture<InetSocketAddress> blobServerAddressFuture, JobGraph job) {
        return blobServerAddressFuture.thenAccept(address -> {
            // A client is only created when the upload helper asks for one.
            final SupplierWithException<BlobClient, IOException> clientFactory =
                () -> new BlobClient(address, miniClusterConfiguration.getConfiguration());
            try {
                ClientUtils.extractAndUploadJobGraphFiles(job, clientFactory);
            } catch (FlinkException uploadFailure) {
                throw new CompletionException(uploadFailure);
            }
        });
    }

    /**
     * Builds the blob server address from the dispatcher's host name and its reported blob server port.
     *
     * @param currentDispatcherGateway gateway queried for the port and the host name
     * @return a future with the resulting socket address
     */
    private CompletableFuture<InetSocketAddress> createBlobServerAddress(DispatcherGateway currentDispatcherGateway) {
        return currentDispatcherGateway
            .getBlobServerPort(rpcTimeout)
            .thenApply(port -> new InetSocketAddress(currentDispatcherGateway.getHostname(), port));
    }

    /**
     * Creates the metric registry for this cluster. Protected so subclasses can supply a different one.
     *
     * @param config the configuration the registry settings are derived from
     * @return a new metric registry
     */
    protected MetricRegistryImpl createMetricRegistry(Configuration config) {
        final MetricRegistryConfiguration registryConfiguration = MetricRegistryConfiguration.fromConfiguration(config);
        return new MetricRegistryImpl(registryConfiguration);
    }

    /**
     * Creates an Akka-based RPC service backed by its own actor system.
     *
     * @param configuration the configuration the Akka settings are derived from
     * @param askTimeout timeout handed to the created {@link AkkaRpcService}
     * @param remoteEnabled whether the Akka config is built for {@code bindAddress} with port 0
     *     ({@code true}) or from the configuration alone ({@code false})
     * @param bindAddress the address to bind to; only used when {@code remoteEnabled} is {@code true}
     * @return the new RPC service
     */
    protected RpcService createRpcService(Configuration configuration, Time askTimeout, boolean remoteEnabled, String bindAddress) {
        final Config baseConfig;
        if (remoteEnabled) {
            baseConfig = AkkaUtils.getAkkaConfig(configuration, bindAddress, 0);
        } else {
            baseConfig = AkkaUtils.getAkkaConfig(configuration);
        }

        // The test dispatcher settings take precedence; everything else falls back to the base config.
        final Config mergedConfig = AkkaUtils.testDispatcherConfig().withFallback(baseConfig);
        return new AkkaRpcService(AkkaUtils.createActorSystem(mergedConfig), askTimeout);
    }

    /**
     * Creates and starts the resource manager runner with a generated resource id and a unique
     * endpoint name of the form {@code resourcemanager_<uuid>}.
     *
     * @return the started runner
     * @throws Exception if the runner cannot be created or started
     */
    protected ResourceManagerRunner startResourceManager(Configuration configuration, HighAvailabilityServices haServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, RpcService resourceManagerRpcService, ClusterInformation clusterInformation, JobManagerMetricGroup jobManagerMetricGroup) throws Exception {
        final ResourceID resourceId = ResourceID.generate();
        final String endpointId = "resourcemanager_" + UUID.randomUUID();
        final ResourceManagerRunner runner = new ResourceManagerRunner(
            resourceId,
            endpointId,
            configuration,
            resourceManagerRpcService,
            haServices,
            heartbeatServices,
            metricRegistry,
            clusterInformation,
            jobManagerMetricGroup);
        runner.start();
        return runner;
    }

    /**
     * Creates and starts the requested number of task executors, each with a random resource id, its own
     * entry of {@code taskManagerRpcServices} and a {@code TerminatingFatalErrorHandler} for its index.
     *
     * @param numTaskManagers number of task executors to start; local communication is requested only
     *     when exactly one is started
     * @param taskManagerRpcServices RPC services indexed by task manager
     * @return the started task executors, in start order
     * @throws Exception if a task executor cannot be created or started
     */
    protected TaskExecutor[] startTaskManagers(Configuration configuration, HighAvailabilityServices haServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, BlobCacheService blobCacheService, int numTaskManagers, RpcService[] taskManagerRpcServices) throws Exception {
        final boolean localCommunication = numTaskManagers == 1;
        final TaskExecutor[] started = new TaskExecutor[numTaskManagers];

        for (int index = 0; index < numTaskManagers; index++) {
            final ResourceID resourceId = new ResourceID(UUID.randomUUID().toString());
            final TaskExecutor taskExecutor = TaskManagerRunner.startTaskManager(
                configuration,
                resourceId,
                taskManagerRpcServices[index],
                haServices,
                heartbeatServices,
                metricRegistry,
                blobCacheService,
                localCommunication,
                new TerminatingFatalErrorHandler(index));
            taskExecutor.start();
            started[index] = taskExecutor;
        }
        return started;
    }

    /**
     * Shuts down the dispatcher and its REST endpoint and, once both have terminated, stops the two
     * leader retrieval services. Must be called while holding {@code lock}.
     *
     * @return a future that completes after the leader retrievers have been stopped; the first failure
     *     from stopping them is rethrown with later ones attached as suppressed
     */
    @GuardedBy(value="lock")
    private CompletableFuture<Void> shutDownDispatcher() {
        final Collection<CompletableFuture<Void>> pendingTerminations = new ArrayList<>(2);

        if (dispatcher != null) {
            dispatcher.shutDown();
            pendingTerminations.add(dispatcher.getTerminationFuture());
            dispatcher = null;
        }
        if (dispatcherRestEndpoint != null) {
            pendingTerminations.add(dispatcherRestEndpoint.closeAsync());
            dispatcherRestEndpoint = null;
        }

        return FutureUtils.runAfterwards(FutureUtils.completeAll(pendingTerminations), () -> {
            Exception firstFailure = null;
            // Runs asynchronously, so the lock has to be taken again here.
            synchronized (lock) {
                if (resourceManagerLeaderRetriever != null) {
                    try {
                        resourceManagerLeaderRetriever.stop();
                    } catch (Exception stopFailure) {
                        firstFailure = ExceptionUtils.firstOrSuppressed(stopFailure, firstFailure);
                    }
                    resourceManagerLeaderRetriever = null;
                }
                if (dispatcherLeaderRetriever != null) {
                    try {
                        dispatcherLeaderRetriever.stop();
                    } catch (Exception stopFailure) {
                        firstFailure = ExceptionUtils.firstOrSuppressed(stopFailure, firstFailure);
                    }
                    dispatcherLeaderRetriever = null;
                }
            }
            if (firstFailure != null) {
                throw firstFailure;
            }
        });
    }

    /**
     * Closes the blob cache, the blob server and the high-availability services (cleaning up their data),
     * in that order. Every service is attempted even if an earlier one failed.
     *
     * @throws Exception the first failure encountered, with any later failures attached as suppressed
     */
    private void terminateMiniClusterServices() throws Exception {
        Exception firstFailure = null;

        synchronized (lock) {
            if (blobCacheService != null) {
                try {
                    blobCacheService.close();
                } catch (Exception closeFailure) {
                    firstFailure = ExceptionUtils.firstOrSuppressed(closeFailure, firstFailure);
                }
                blobCacheService = null;
            }

            if (blobServer != null) {
                try {
                    blobServer.close();
                } catch (Exception closeFailure) {
                    firstFailure = ExceptionUtils.firstOrSuppressed(closeFailure, firstFailure);
                }
                blobServer = null;
            }

            if (haServices != null) {
                try {
                    haServices.closeAndCleanupAllData();
                } catch (Exception closeFailure) {
                    firstFailure = ExceptionUtils.firstOrSuppressed(closeFailure, firstFailure);
                }
                haServices = null;
            }
        }

        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Stops the RPC services of this cluster and clears the corresponding fields. The common service is
     * always stopped; the dedicated job manager, resource manager and task manager services are stopped
     * only when the RPC services are not shared.
     *
     * @return a stage that completes once all stopped services have terminated
     */
    @Nonnull
    private CompletionStage<Void> terminateRpcServices() {
        final boolean sharedService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;
        // Capacity hint: common + job manager + resource manager + one per task manager, or just the common one.
        final int expectedServices = sharedService ? 1 : 3 + miniClusterConfiguration.getNumTaskManagers();
        final Collection<CompletableFuture<Void>> stopFutures = new ArrayList<>(expectedServices);

        synchronized (lock) {
            stopFutures.add(commonRpcService.stopService());

            if (!sharedService) {
                stopFutures.add(jobManagerRpcService.stopService());
                stopFutures.add(resourceManagerRpcService.stopService());
                for (RpcService taskManagerService : taskManagerRpcServices) {
                    stopFutures.add(taskManagerService.stopService());
                }
            }

            commonRpcService = null;
            jobManagerRpcService = null;
            taskManagerRpcServices = null;
            resourceManagerRpcService = null;
        }

        return FutureUtils.completeAll(stopFutures);
    }

    /**
     * Initializes the defaults of {@link FileOutputFormat} from the given configuration.
     *
     * @param configuration the configuration to read the defaults from
     */
    private void initializeIOFormatClasses(Configuration configuration) {
        FileOutputFormat.initDefaultsFromConfiguration(configuration);
    }

    /**
     * Stops a single RPC service and waits for it to terminate.
     *
     * @param rpcService the service to stop; {@code null} is ignored
     * @param priorException a failure collected earlier, or {@code null}
     * @return {@code priorException} if stopping succeeded (or there was nothing to stop), otherwise the
     *     combination of the new failure and {@code priorException}
     */
    private static Throwable shutDownRpc(RpcService rpcService, Throwable priorException) {
        if (rpcService == null) {
            return priorException;
        }
        try {
            rpcService.stopService().get();
            return priorException;
        } catch (Throwable stopFailure) {
            return ExceptionUtils.firstOrSuppressed(stopFailure, priorException);
        }
    }

    /**
     * Stops every RPC service in the array and waits for each to terminate. All services are attempted
     * even if some of them fail.
     *
     * <p>Fix: the accumulated failure is now returned. Previously the method returned
     * {@code priorException}, so every failure collected in the loop was silently dropped.
     *
     * @param rpcServices the services to stop; the array itself and individual entries may be {@code null}
     * @param priorException a failure collected earlier, or {@code null}
     * @return the first failure (prior or new) with later ones attached as suppressed, or {@code null}
     *     if nothing failed
     */
    private static Throwable shutDownRpcs(RpcService[] rpcServices, Throwable priorException) {
        Throwable exception = priorException;
        if (rpcServices != null) {
            for (RpcService service : rpcServices) {
                if (service == null) {
                    continue;
                }
                try {
                    service.stopService().get();
                } catch (Throwable t) {
                    exception = ExceptionUtils.firstOrSuppressed(t, exception);
                }
            }
        }
        return exception;
    }

    /**
     * Fatal error handler that logs the error and triggers an asynchronous shutdown of the enclosing
     * mini cluster.
     */
    private class ShutDownFatalErrorHandler
    implements FatalErrorHandler {
        private ShutDownFatalErrorHandler() {
        }

        /**
         * Logs the error at WARN level and starts shutting the mini cluster down; the returned
         * termination future is not awaited.
         */
        @Override
        public void onFatalError(Throwable exception) {
            LOG.warn("Error in MiniCluster. Shutting the MiniCluster down.", exception);
            closeAsync();
        }
    }

    private class TerminatingFatalErrorHandler
    implements FatalErrorHandler {
        private final int index;

        private TerminatingFatalErrorHandler(int index) {
            this.index = index;
        }

        @Override
        public void onFatalError(Throwable exception) {
            if (MiniCluster.this.running) {
                LOG.error("TaskManager #{} failed.", (Object)this.index, (Object)exception);
                TaskExecutor[] currentTaskManagers = MiniCluster.this.taskManagers;
                if (currentTaskManagers != null) {
                    currentTaskManagers[this.index].shutDown();
                }
            }
        }
    }
}

