package org.apache.beam.repackaged.beam_runners_direct_java.runners.core;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.beam_runners_core_java.com.google.common.collect.ImmutableList;
import org.apache.beam.repackaged.beam_runners_core_java.com.google.common.collect.Iterables;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/repackaged/beam_runners_direct_java/runners/core/SimplePushbackSideInputDoFnRunner.class */
/**
 * A {@link PushbackSideInputDoFnRunner} that delegates element processing to an underlying
 * {@link DoFnRunner}, but only for windows in which every side input view reports ready.
 *
 * <p>Elements in windows that are not ready are returned to the caller ("pushed back") instead of
 * being processed. Windows found not ready are remembered for the remainder of the bundle, so
 * later elements in the same window are pushed back without querying the side input reader again.
 *
 * @param <InputT> the element type consumed by the wrapped {@link DoFn}
 * @param <OutputT> the element type produced by the wrapped {@link DoFn}
 */
public class SimplePushbackSideInputDoFnRunner<InputT, OutputT> implements PushbackSideInputDoFnRunner<InputT, OutputT> {
    /** The runner that actually invokes the {@link DoFn}. */
    private final DoFnRunner<InputT, OutputT> underlying;

    /** The side input views that must all be ready before an element's window is processed. */
    private final Collection<PCollectionView<?>> views;

    /** Answers whether a given view is ready in a given side input window. */
    private final ReadyCheckingSideInputReader sideInputReader;

    /**
     * Main-input windows already found not ready during the current bundle. Assigned in
     * {@link #startBundle()} and reset to {@code null} in {@link #finishBundle()}, so it is
     * {@code null} outside of a bundle.
     */
    @Nullable
    private Set<BoundedWindow> notReadyWindows;

    /**
     * Creates a runner wrapping {@code doFnRunner}.
     *
     * @param doFnRunner the runner to delegate to
     * @param collection the side input views consulted for readiness
     * @param readyCheckingSideInputReader the reader used to check readiness of each view
     * @return a new {@link SimplePushbackSideInputDoFnRunner}
     */
    public static <InputT, OutputT> SimplePushbackSideInputDoFnRunner<InputT, OutputT> create(DoFnRunner<InputT, OutputT> doFnRunner, Collection<PCollectionView<?>> collection, ReadyCheckingSideInputReader readyCheckingSideInputReader) {
        return new SimplePushbackSideInputDoFnRunner<>(doFnRunner, collection, readyCheckingSideInputReader);
    }

    private SimplePushbackSideInputDoFnRunner(DoFnRunner<InputT, OutputT> doFnRunner, Collection<PCollectionView<?>> collection, ReadyCheckingSideInputReader readyCheckingSideInputReader) {
        this.underlying = doFnRunner;
        this.views = collection;
        this.sideInputReader = readyCheckingSideInputReader;
    }

    /** Returns the {@link DoFn} of the underlying runner. */
    @Override
    public DoFn<InputT, OutputT> getFn() {
        return this.underlying.getFn();
    }

    /** Resets the set of known not-ready windows, then starts a bundle on the underlying runner. */
    @Override
    public void startBundle() {
        this.notReadyWindows = new HashSet<>();
        this.underlying.startBundle();
    }

    /**
     * Processes {@code windowedValue} in every window whose side inputs are ready and returns the
     * per-window elements that could not be processed.
     *
     * <p>When there are no side input views, the value is handed to the underlying runner as-is
     * (without exploding its windows) and nothing is pushed back. Otherwise the value is exploded
     * into one element per window and each is either processed or pushed back.
     *
     * <p>With a non-empty view collection this method dereferences the not-ready window set, which
     * only exists between {@link #startBundle()} and {@link #finishBundle()}.
     *
     * @param windowedValue the element to process
     * @return the single-window elements whose windows were not ready; empty if all were processed
     */
    @Override
    public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> windowedValue) {
        if (this.views.isEmpty()) {
            this.underlying.processElement(windowedValue);
            return Collections.emptyList();
        }
        ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
        for (WindowedValue<InputT> windowElem : windowedValue.explodeWindows()) {
            // Each exploded element is expected to carry exactly one window.
            BoundedWindow mainInputWindow = (BoundedWindow) Iterables.getOnlyElement(windowElem.getWindows());
            if (isReady(mainInputWindow)) {
                this.underlying.processElement(windowElem);
            } else {
                // Remember the window so later elements in it skip the side input reader check.
                this.notReadyWindows.add(mainInputWindow);
                pushedBack.add(windowElem);
            }
        }
        return pushedBack.build();
    }

    /**
     * Returns whether every view is ready for the side input window that {@code boundedWindow}
     * maps to. A window already recorded as not ready in this bundle is reported not ready without
     * consulting the side input reader.
     */
    private boolean isReady(BoundedWindow boundedWindow) {
        if (this.notReadyWindows.contains(boundedWindow)) {
            return false;
        }
        for (PCollectionView<?> view : this.views) {
            if (!this.sideInputReader.isReady(view, view.getWindowMappingFn().getSideInputWindow(boundedWindow))) {
                return false;
            }
        }
        return true;
    }

    /** Forwards the timer firing to the underlying runner; no side input readiness check is made. */
    @Override
    public void onTimer(String str, BoundedWindow boundedWindow, Instant instant, TimeDomain timeDomain) {
        this.underlying.onTimer(str, boundedWindow, instant, timeDomain);
    }

    /** Discards the set of known not-ready windows, then finishes the bundle on the underlying runner. */
    @Override
    public void finishBundle() {
        this.notReadyWindows = null;
        this.underlying.finishBundle();
    }
}
