package org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.beam.repackaged.beam_runners_direct_java.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.repackaged.beam_runners_direct_java.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.GrpcFnServer;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.SdkHarnessClient;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.IdGenerator;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.IdGenerators;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.collect.Iterables;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.WindowedValue;

@Deprecated
/* loaded from: input_file:org/apache/beam/repackaged/beam_runners_direct_java/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.class */
public class SingleEnvironmentInstanceJobBundleFactory implements JobBundleFactory {
    private final EnvironmentFactory environmentFactory;
    private final GrpcFnServer<GrpcDataService> dataService;
    private final GrpcFnServer<GrpcStateService> stateService;
    private final ConcurrentMap<ExecutableStage, StageBundleFactory<?>> stageBundleFactories = new ConcurrentHashMap();
    private final ConcurrentMap<RunnerApi.Environment, RemoteEnvironment> environments = new ConcurrentHashMap();
    private final IdGenerator idGenerator = IdGenerators.incrementingLongs();

    /* loaded from: input_file:org/apache/beam/repackaged/beam_runners_direct_java/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory$BundleProcessorStageBundleFactory.class */
    private static class BundleProcessorStageBundleFactory<T> implements StageBundleFactory<T> {
        private final ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor;
        private final SdkHarnessClient.BundleProcessor<T> processor;

        private BundleProcessorStageBundleFactory(ProcessBundleDescriptors.ExecutableProcessBundleDescriptor executableProcessBundleDescriptor, SdkHarnessClient.BundleProcessor<T> bundleProcessor) {
            this.descriptor = executableProcessBundleDescriptor;
            this.processor = bundleProcessor;
        }

        @Override // org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.StageBundleFactory
        public RemoteBundle<T> getBundle(OutputReceiverFactory outputReceiverFactory, StateRequestHandler stateRequestHandler) {
            HashMap hashMap = new HashMap();
            for (Map.Entry<BeamFnApi.Target, Coder<WindowedValue<?>>> entry : this.descriptor.getOutputTargetCoders().entrySet()) {
                hashMap.put(entry.getKey(), RemoteOutputReceiver.of(entry.getValue(), outputReceiverFactory.create((String) Iterables.getOnlyElement(this.descriptor.getProcessBundleDescriptor().getTransformsOrThrow(entry.getKey().getPrimitiveTransformReference()).getInputsMap().values()))));
            }
            return this.processor.newBundle(hashMap, stateRequestHandler);
        }

        @Override // java.lang.AutoCloseable
        public void close() {
        }
    }

    public static JobBundleFactory create(EnvironmentFactory environmentFactory, GrpcFnServer<GrpcDataService> grpcFnServer, GrpcFnServer<GrpcStateService> grpcFnServer2) {
        return new SingleEnvironmentInstanceJobBundleFactory(environmentFactory, grpcFnServer, grpcFnServer2);
    }

    private SingleEnvironmentInstanceJobBundleFactory(EnvironmentFactory environmentFactory, GrpcFnServer<GrpcDataService> grpcFnServer, GrpcFnServer<GrpcStateService> grpcFnServer2) {
        this.environmentFactory = environmentFactory;
        this.dataService = grpcFnServer;
        this.stateService = grpcFnServer2;
    }

    @Override // org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.JobBundleFactory
    public <T> StageBundleFactory<T> forStage(ExecutableStage executableStage) {
        return (StageBundleFactory) this.stageBundleFactories.computeIfAbsent(executableStage, this::createBundleFactory);
    }

    private <T> StageBundleFactory<T> createBundleFactory(ExecutableStage executableStage) {
        SdkHarnessClient withIdGenerator = SdkHarnessClient.usingFnApiClient(this.environments.computeIfAbsent(executableStage.getEnvironment(), environment -> {
            try {
                return this.environmentFactory.createEnvironment(environment);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }).getInstructionRequestHandler(), this.dataService.getService()).withIdGenerator(this.idGenerator);
        try {
            ProcessBundleDescriptors.ExecutableProcessBundleDescriptor fromExecutableStage = ProcessBundleDescriptors.fromExecutableStage(this.idGenerator.getId(), executableStage, this.dataService.getApiServiceDescriptor());
            return new BundleProcessorStageBundleFactory(fromExecutableStage, withIdGenerator.getProcessor(fromExecutableStage.getProcessBundleDescriptor(), fromExecutableStage.getRemoteInputDestination(), this.stateService.getService()));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        Exception exc = null;
        Iterator<RemoteEnvironment> it = this.environments.values().iterator();
        while (it.hasNext()) {
            try {
                it.next().close();
            } catch (Exception e) {
                if (exc == null) {
                    exc = e;
                } else {
                    exc.addSuppressed(e);
                }
            }
        }
        if (exc != null) {
            throw exc;
        }
    }
}
