package org.apache.beam.runners.direct.portable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.beam_runners_direct_java.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.PipelineNode;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.sdk.util.WindowedValue;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.class */
public class RemoteStageEvaluatorFactory implements TransformEvaluatorFactory {
    private final BundleFactory bundleFactory;
    private final JobBundleFactory jobFactory;

    /* loaded from: input_file:org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory$RemoteStageEvaluator.class */
    private class RemoteStageEvaluator<T> implements TransformEvaluator<T> {
        private final PipelineNode.PTransformNode transform;
        private final RemoteBundle<T> bundle;
        private final Collection<UncommittedBundle<?>> outputs;

        private RemoteStageEvaluator(PipelineNode.PTransformNode pTransformNode) throws Exception {
            this.transform = pTransformNode;
            ExecutableStage fromPayload = ExecutableStage.fromPayload(RunnerApi.ExecutableStagePayload.parseFrom(pTransformNode.getTransform().getSpec().getPayload()));
            this.outputs = new ArrayList();
            StageBundleFactory<T> forStage = RemoteStageEvaluatorFactory.this.jobFactory.forStage(fromPayload);
            BundleFactory bundleFactory = RemoteStageEvaluatorFactory.this.bundleFactory;
            RunnerApi.Components components = fromPayload.getComponents();
            Collection<UncommittedBundle<?>> collection = this.outputs;
            Objects.requireNonNull(collection);
            this.bundle = forStage.getBundle(BundleFactoryOutputRecieverFactory.create(bundleFactory, components, (v1) -> {
                r4.add(v1);
            }), StateRequestHandler.unsupported());
        }

        @Override // org.apache.beam.runners.direct.portable.TransformEvaluator
        public void processElement(WindowedValue<T> windowedValue) throws Exception {
            this.bundle.getInputReceiver().accept(windowedValue);
        }

        @Override // org.apache.beam.runners.direct.portable.TransformEvaluator
        public TransformResult<T> finishBundle() throws Exception {
            this.bundle.close();
            return StepTransformResult.withoutHold(this.transform).addOutput(this.outputs).build();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RemoteStageEvaluatorFactory(BundleFactory bundleFactory, JobBundleFactory jobBundleFactory) {
        this.bundleFactory = bundleFactory;
        this.jobFactory = jobBundleFactory;
    }

    @Override // org.apache.beam.runners.direct.portable.TransformEvaluatorFactory
    @Nullable
    public <InputT> TransformEvaluator<InputT> forApplication(PipelineNode.PTransformNode pTransformNode, CommittedBundle<?> committedBundle) throws Exception {
        return new RemoteStageEvaluator(pTransformNode);
    }

    @Override // org.apache.beam.runners.direct.portable.TransformEvaluatorFactory
    public void cleanup() throws Exception {
        this.jobFactory.close();
    }
}
