package org.apache.beam.runners.direct.portable;

import java.util.Collection;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.base.Optional;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.ImmutableList;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.Iterables;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.TimerInternals;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.PipelineNode;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.local.StructuralKey;
import org.apache.beam.runners.direct.Clock;
import org.apache.beam.runners.direct.ExecutableGraph;
import org.apache.beam.runners.direct.WatermarkManager;
import org.apache.beam.runners.direct.portable.CommittedResult;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/portable/EvaluationContext.class */
/**
 * Holds the state shared while an {@link ExecutableGraph} of
 * {@link PipelineNode.PTransformNode transforms} is being evaluated: the clock, the bundle
 * factory, the watermark manager, the committed per-step-and-key state, the metrics container and
 * the executor used for watermark-driven callbacks.
 *
 * <p>Instances are obtained through {@link #create}. Committed state is kept in a
 * {@link ConcurrentHashMap} keyed by {@link StepAndKey}.
 */
public class EvaluationContext {
    /** The graph of transforms and the collections they produce. */
    private final ExecutableGraph<PipelineNode.PTransformNode, ? super PipelineNode.PCollectionNode> graph;

    /** Source of the current time handed to state/timer accessors and to the watermark manager. */
    private final Clock clock;

    /** Creates the uncommitted bundles exposed by the {@code create*Bundle} methods. */
    private final BundleFactory bundleFactory;

    /** Tracks the watermarks and timers of every transform in {@link #graph}. */
    private final WatermarkManager<PipelineNode.PTransformNode, ? super PipelineNode.PCollectionNode> watermarkManager;

    /** The collections that {@link #isKeyed} reports as keyed. */
    private final Set<PipelineNode.PCollectionNode> keyedPValues;

    /** Most recently committed, non-empty state for each (step, key) pair. */
    private final ConcurrentMap<StepAndKey, CopyOnAccessInMemoryStateInternals> applicationStateInternals =
            new ConcurrentHashMap<>();

    /** Metrics container that receives the logical metric updates of every handled result. */
    private final DirectMetrics metrics = new DirectMetrics();

    /** Runs registered window-expiration callbacks; built on {@code MoreExecutors.directExecutor()}. */
    private final WatermarkCallbackExecutor callbackExecutor =
            WatermarkCallbackExecutor.create(MoreExecutors.directExecutor());

    /**
     * Creates a new context.
     *
     * @param clock the clock used by this context and its watermark manager
     * @param bundleFactory the factory used to create bundles; must not be null
     * @param executableGraph the graph to evaluate; must not be null
     * @param set the collections that are to be reported as keyed by {@link #isKeyed}
     * @return a new {@code EvaluationContext}
     * @throws NullPointerException if {@code bundleFactory} or {@code executableGraph} is null
     */
    public static EvaluationContext create(Clock clock, BundleFactory bundleFactory, ExecutableGraph<PipelineNode.PTransformNode, ? super PipelineNode.PCollectionNode> executableGraph, Set<PipelineNode.PCollectionNode> set) {
        return new EvaluationContext(clock, bundleFactory, executableGraph, set);
    }

    private EvaluationContext(Clock clock, BundleFactory bundleFactory, ExecutableGraph<PipelineNode.PTransformNode, ? super PipelineNode.PCollectionNode> executableGraph, Set<PipelineNode.PCollectionNode> set) {
        this.clock = clock;
        // checkNotNull returns its argument with its static type, so no casts are required.
        this.bundleFactory = Preconditions.checkNotNull(bundleFactory);
        this.graph = Preconditions.checkNotNull(executableGraph);
        this.keyedPValues = set;
        this.watermarkManager = WatermarkManager.create(clock, executableGraph);
    }

    /**
     * Initializes the watermark manager with the initial input bundles of each transform.
     *
     * @param map the initial bundles, keyed by the transform that consumes them
     */
    public void initialize(Map<PipelineNode.PTransformNode, ? extends Iterable<CommittedBundle<?>>> map) {
        watermarkManager.initialize(map);
    }

    /**
     * Commits the outcome of evaluating a transform and records it in this context.
     *
     * <p>In order, this method commits the result's output bundles, commits its logical metric
     * updates, builds the {@link CommittedResult}, commits and stores (or removes, when empty) the
     * state carried by the result, and finally updates the watermarks of the transform.
     *
     * @param committedBundle the input bundle that was processed; when it is null no unprocessed
     *     input is reported. It must be non-null whenever the result carries state, because its
     *     key is used to index that state.
     * @param iterable the timers that were delivered to produce the result; they are marked as
     *     completed in the timer update handed to the watermark manager
     * @param transformResult the result of the evaluation
     * @return the committed form of {@code transformResult}
     */
    public CommittedResult<PipelineNode.PTransformNode> handleResult(CommittedBundle<?> committedBundle, Iterable<TimerInternals.TimerData> iterable, TransformResult<?> transformResult) {
        Iterable<? extends CommittedBundle<?>> committedOutputs = commitBundles(transformResult.getOutputBundles());
        metrics.commitLogical(committedBundle, transformResult.getLogicalMetricUpdates());

        // EnumSet.copyOf(Collection) rejects an empty collection that is not itself an EnumSet,
        // so start from an empty set and add the reported types instead.
        EnumSet<CommittedResult.OutputType> outputTypes = EnumSet.noneOf(CommittedResult.OutputType.class);
        outputTypes.addAll(transformResult.getOutputTypes());
        // BUNDLE must reflect what was actually committed: empty bundles are dropped on commit.
        if (Iterables.isEmpty(committedOutputs)) {
            outputTypes.remove(CommittedResult.OutputType.BUNDLE);
        } else {
            outputTypes.add(CommittedResult.OutputType.BUNDLE);
        }

        CommittedResult<PipelineNode.PTransformNode> committedResult = CommittedResult.create(
                transformResult,
                getUnprocessedInput(committedBundle, transformResult),
                committedOutputs,
                outputTypes);

        CopyOnAccessInMemoryStateInternals resultState = transformResult.getState();
        if (resultState != null) {
            CopyOnAccessInMemoryStateInternals committedState = resultState.commit();
            StepAndKey stepAndKey = StepAndKey.of(transformResult.getTransform(), committedBundle.getKey());
            if (committedState.isEmpty()) {
                // Nothing left to remember for this step and key.
                applicationStateInternals.remove(stepAndKey);
            } else {
                applicationStateInternals.put(stepAndKey, committedState);
            }
        }

        watermarkManager.updateWatermarks(
                committedBundle,
                transformResult.getTimerUpdate().withCompletedTimers(iterable),
                committedResult.getExecutable(),
                committedResult.getUnprocessedInputs().orNull(),
                committedResult.getOutputs(),
                transformResult.getWatermarkHold());
        return committedResult;
    }

    /**
     * Returns the part of the input bundle that the result reports as unprocessed.
     *
     * @return a bundle holding the unprocessed elements, or absent when there is no input bundle
     *     or the result has no unprocessed elements
     */
    private Optional<? extends CommittedBundle<?>> getUnprocessedInput(CommittedBundle<?> committedBundle, TransformResult<?> transformResult) {
        if (committedBundle == null || Iterables.isEmpty(transformResult.getUnprocessedElements())) {
            return Optional.absent();
        }
        return Optional.of(committedBundle.withElements(transformResult.getUnprocessedElements()));
    }

    /**
     * Commits every given bundle at the synchronized processing output time of the transform that
     * produces its collection, and returns those committed bundles that contain elements.
     */
    private Iterable<? extends CommittedBundle<?>> commitBundles(Iterable<? extends UncommittedBundle<?>> iterable) {
        ImmutableList.Builder<CommittedBundle<?>> nonEmptyBundles = ImmutableList.builder();
        for (UncommittedBundle<?> uncommittedBundle : iterable) {
            PipelineNode.PTransformNode producer = graph.getProducer(uncommittedBundle.getPCollection());
            CommittedBundle<?> committed = uncommittedBundle.commit(
                    watermarkManager.getWatermarks(producer).getSynchronizedProcessingOutputTime());
            // Empty bundles are not propagated downstream.
            if (!Iterables.isEmpty(committed.getElements())) {
                nonEmptyBundles.add(committed);
            }
        }
        return nonEmptyBundles.build();
    }

    /** Fires the available callbacks of every transform in the graph. */
    private void fireAllAvailableCallbacks() {
        for (PipelineNode.PTransformNode transform : graph.getExecutables()) {
            fireAvailableCallbacks(transform);
        }
    }

    /** Notifies the callback executor of the current output watermark of the given transform. */
    private void fireAvailableCallbacks(PipelineNode.PTransformNode pTransformNode) {
        callbackExecutor.fireForWatermark(
                pTransformNode, watermarkManager.getWatermarks(pTransformNode).getOutputWatermark());
    }

    /** Creates a root bundle using the bundle factory of this context. */
    public <T> UncommittedBundle<T> createRootBundle() {
        return bundleFactory.createRootBundle();
    }

    /** Creates a bundle for the given collection using the bundle factory of this context. */
    public <T> UncommittedBundle<T> createBundle(PipelineNode.PCollectionNode pCollectionNode) {
        return bundleFactory.createBundle(pCollectionNode);
    }

    /**
     * Creates a bundle for the given key and collection using the bundle factory of this context.
     */
    public <K, T> UncommittedBundle<T> createKeyedBundle(StructuralKey<K> structuralKey, PipelineNode.PCollectionNode pCollectionNode) {
        return bundleFactory.createKeyedBundle(structuralKey, pCollectionNode);
    }

    /**
     * Returns whether the given collection is in the set of keyed collections supplied to
     * {@link #create}.
     */
    public <T> boolean isKeyed(PipelineNode.PCollectionNode pCollectionNode) {
        return keyedPValues.contains(pCollectionNode);
    }

    /**
     * Registers {@code runnable} as a window-expiration callback on the transform that produces
     * {@code pCollectionNode}, then immediately offers that transform's current output watermark
     * to the callback executor.
     *
     * @param pCollectionNode the collection whose producer the callback is registered on
     * @param boundedWindow the window the callback is registered for
     * @param windowingStrategy the windowing strategy passed along with the registration
     * @param runnable the callback to register
     */
    public void scheduleAfterOutputWouldBeProduced(PipelineNode.PCollectionNode pCollectionNode, BoundedWindow boundedWindow, WindowingStrategy<?, ?> windowingStrategy, Runnable runnable) {
        PipelineNode.PTransformNode producer = graph.getProducer(pCollectionNode);
        callbackExecutor.callOnWindowExpiration(producer, boundedWindow, windowingStrategy, runnable);
        fireAvailableCallbacks(producer);
    }

    /**
     * Returns an accessor for the state and timers of the given transform and key, built from the
     * state last committed for that pair (null when none is stored), the clock and the
     * transform's watermarks.
     */
    // DirectStateAndTimers' type parameters are not visible from this file, so it is constructed
    // raw exactly as before; the suppression is limited to this one method.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <K> StepStateAndTimers<K> getStateAndTimers(PipelineNode.PTransformNode pTransformNode, StructuralKey<K> structuralKey) {
        StepAndKey stepAndKey = StepAndKey.of(pTransformNode, structuralKey);
        return new DirectStateAndTimers(
                structuralKey,
                applicationStateInternals.get(stepAndKey),
                clock,
                watermarkManager.getWatermarks(pTransformNode));
    }

    /** Returns the executables of the graph. */
    Collection<PipelineNode.PTransformNode> getSteps() {
        return graph.getExecutables();
    }

    /** Returns the metrics container of this context. */
    public DirectMetrics getMetrics() {
        return metrics;
    }

    /** Refreshes all watermarks and then fires the available callbacks of every transform. */
    @VisibleForTesting
    void forceRefresh() {
        watermarkManager.refreshAll();
        fireAllAvailableCallbacks();
    }

    /**
     * Refreshes all watermarks (firing available callbacks) and then extracts the fired timers
     * from the watermark manager.
     */
    public Collection<WatermarkManager.FiredTimers<PipelineNode.PTransformNode>> extractFiredTimers() {
        forceRefresh();
        return watermarkManager.extractFiredTimers();
    }

    /**
     * Returns whether the output watermark of the given transform is at or after
     * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
     */
    public boolean isDone(PipelineNode.PTransformNode pTransformNode) {
        Instant outputWatermark = watermarkManager.getWatermarks(pTransformNode).getOutputWatermark();
        return !outputWatermark.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE);
    }

    /** Returns whether every transform in the graph {@link #isDone(PipelineNode.PTransformNode) is done}. */
    public boolean isDone() {
        for (PipelineNode.PTransformNode transform : graph.getExecutables()) {
            if (!isDone(transform)) {
                return false;
            }
        }
        return true;
    }

    /** Returns the current time according to the clock of this context. */
    public Instant now() {
        return clock.now();
    }

    /** Returns the clock of this context. */
    Clock getClock() {
        return clock;
    }
}
