package org.apache.beam.runners.direct.portable.job;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.ImmutableList;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.UnmodifiableIterator;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.beam.repackaged.beam_runners_direct_java.io.grpc.Status;
import org.apache.beam.repackaged.beam_runners_direct_java.io.grpc.StatusRuntimeException;
import org.apache.beam.repackaged.beam_runners_direct_java.io.grpc.stub.StreamObserver;
import org.apache.beam.repackaged.beam_runners_direct_java.model.jobmanagement.v1.JobApi;
import org.apache.beam.repackaged.beam_runners_direct_java.model.jobmanagement.v1.JobServiceGrpc;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.FnService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.GrpcFnServer;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.direct.portable.ReferenceRunner;
import org.apache.beam.runners.direct.portable.artifact.LocalFileSystemArtifactStagerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.class */
public class ReferenceRunnerJobService extends JobServiceGrpc.JobServiceImplBase implements FnService {
    private static final Logger LOG = LoggerFactory.getLogger(ReferenceRunnerJobService.class);
    private final ServerFactory serverFactory;
    private final Callable<Path> stagingPathCallable;
    private final ConcurrentMap<String, PreparingJob> unpreparedJobs = new ConcurrentHashMap();
    private final ConcurrentMap<String, ReferenceRunner> runningJobs = new ConcurrentHashMap();
    private final ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(false).setNameFormat("reference-runner-pipeline-%s").build());

    public static ReferenceRunnerJobService create(ServerFactory serverFactory) {
        return new ReferenceRunnerJobService(serverFactory, () -> {
            return Files.createTempDirectory("reference-runner-staging", new FileAttribute[0]);
        });
    }

    private ReferenceRunnerJobService(ServerFactory serverFactory, Callable<Path> callable) {
        this.serverFactory = serverFactory;
        this.stagingPathCallable = callable;
    }

    public ReferenceRunnerJobService withStagingPathSupplier(Callable<Path> callable) {
        return new ReferenceRunnerJobService(this.serverFactory, callable);
    }

    public void prepare(JobApi.PrepareJobRequest prepareJobRequest, StreamObserver<JobApi.PrepareJobResponse> streamObserver) {
        try {
            LOG.trace("{} {}", JobApi.PrepareJobResponse.class.getSimpleName(), prepareJobRequest);
            String str = prepareJobRequest.getJobName() + ThreadLocalRandom.current().nextInt();
            Path call = this.stagingPathCallable.call();
            GrpcFnServer<LocalFileSystemArtifactStagerService> createArtifactStagingService = createArtifactStagingService(call);
            Preconditions.checkArgument(this.unpreparedJobs.putIfAbsent(str, PreparingJob.builder().setArtifactStagingServer(createArtifactStagingService).setPipeline(prepareJobRequest.getPipeline()).setOptions(prepareJobRequest.getPipelineOptions()).setStagingLocation(call).build()) == null, "Unexpected existing job with preparation ID %s", str);
            streamObserver.onNext(JobApi.PrepareJobResponse.newBuilder().setPreparationId(str).setArtifactStagingEndpoint(createArtifactStagingService.getApiServiceDescriptor()).setStagingSessionToken(call.toFile().getAbsolutePath()).build());
            streamObserver.onCompleted();
        } catch (Exception e) {
            LOG.error("Could not prepare job with name {}", prepareJobRequest.getJobName(), e);
            streamObserver.onError(Status.INTERNAL.withCause(e).asException());
        }
    }

    private GrpcFnServer<LocalFileSystemArtifactStagerService> createArtifactStagingService(Path path) throws Exception {
        return GrpcFnServer.allocatePortAndCreateFor(LocalFileSystemArtifactStagerService.forRootDirectory(path.toFile()), this.serverFactory);
    }

    public void run(JobApi.RunJobRequest runJobRequest, StreamObserver<JobApi.RunJobResponse> streamObserver) {
        try {
            LOG.trace("{} {}", JobApi.RunJobRequest.class.getSimpleName(), runJobRequest);
            String preparationId = runJobRequest.getPreparationId();
            PreparingJob preparingJob = this.unpreparedJobs.get(preparationId);
            if (preparingJob == null) {
                streamObserver.onError(Status.INVALID_ARGUMENT.withDescription(String.format("Unknown Preparation Id %s", preparationId)).asException());
                return;
            }
            try {
                preparingJob.close();
            } catch (Exception e) {
                streamObserver.onError(e);
            }
            ReferenceRunner forPipeline = ReferenceRunner.forPipeline(preparingJob.getPipeline(), preparingJob.getOptions(), preparingJob.getStagingLocation().toFile());
            String str = preparingJob + Integer.toString(ThreadLocalRandom.current().nextInt());
            streamObserver.onNext(JobApi.RunJobResponse.newBuilder().setJobId(str).build());
            streamObserver.onCompleted();
            this.runningJobs.put(str, forPipeline);
            this.executor.submit(() -> {
                forPipeline.execute();
                return null;
            });
        } catch (StatusRuntimeException e2) {
            streamObserver.onError(e2);
        } catch (Exception e3) {
            streamObserver.onError(Status.INTERNAL.withCause(e3).asException());
        }
    }

    public void getState(JobApi.GetJobStateRequest getJobStateRequest, StreamObserver<JobApi.GetJobStateResponse> streamObserver) {
        LOG.trace("{} {}", JobApi.GetJobStateRequest.class.getSimpleName(), getJobStateRequest);
        streamObserver.onError(Status.NOT_FOUND.withDescription(String.format("Unknown Job ID %s", getJobStateRequest.getJobId())).asException());
    }

    public void cancel(JobApi.CancelJobRequest cancelJobRequest, StreamObserver<JobApi.CancelJobResponse> streamObserver) {
        LOG.trace("{} {}", JobApi.CancelJobRequest.class.getSimpleName(), cancelJobRequest);
        streamObserver.onError(Status.NOT_FOUND.withDescription(String.format("Unknown Job ID %s", cancelJobRequest.getJobId())).asException());
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.FnService, java.lang.AutoCloseable
    public void close() throws Exception {
        UnmodifiableIterator it = ImmutableList.copyOf((Collection) this.unpreparedJobs.values()).iterator();
        while (it.hasNext()) {
            PreparingJob preparingJob = (PreparingJob) it.next();
            try {
                preparingJob.close();
            } catch (Exception e) {
                LOG.warn("Exception while closing preparing job {}", preparingJob);
            }
        }
    }
}
