/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.HandlesSplits;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.control.BundleProgressReporter;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.StateBackedIterable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.MonitoringInfoEncodings;
import org.apache.beam.runners.core.metrics.ShortIdMap;
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.sdk.util.construction.RehydratedComponents;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BeamFnDataReadRunner<@UnknownKeyFor OutputT> {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(BeamFnDataReadRunner.class);

    /** Id of the PTransform served by this runner; used to look up the desired split and to label the read-index metric. */
    private final @UnknownKeyFor @NonNull @Initialized String pTransformId;

    /**
     * Service descriptor taken from the remote gRPC port of the read PTransform.
     * The type annotations are written on the nested simple name because a type-use annotation
     * may not precede the scoping outer class of a static nested type.
     */
    private final Endpoints.@UnknownKeyFor @NonNull @Initialized ApiServiceDescriptor apiServiceDescriptor;

    /** Downstream receiver that {@code forwardElementToConsumer} passes elements to. */
    private final @UnknownKeyFor @NonNull @Initialized FnDataReceiver<@UnknownKeyFor @NonNull @Initialized WindowedValue<OutputT>> consumer;

    /** Supplies the id of the current process-bundle instruction; {@code reset} requires it to supply {@code null}. */
    private final @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized String> processBundleInstructionIdSupplier;

    /** Coder rehydrated in the constructor from the coder id named by the remote gRPC port. */
    private final @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized WindowedValue<OutputT>> coder;

    /** Short id under which the value of {@link #index} is published as monitoring data. */
    private final @UnknownKeyFor @NonNull @Initialized String dataChannelReadIndexShortId;

    /** Monitor guarding updates of {@link #index} and {@link #stopIndex}. */
    private final @UnknownKeyFor @NonNull @Initialized Object splittingLock = new Object();

    /**
     * Starts at -1, is incremented for every element that is forwarded, and is incremented once
     * more when the read finishes.
     */
    private @UnknownKeyFor @NonNull @Initialized long index;

    /**
     * Starts at {@code Long.MAX_VALUE} and is lowered by successful splits; once
     * {@code index == stopIndex - 1} further elements are dropped instead of forwarded.
     */
    private @UnknownKeyFor @NonNull @Initialized long stopIndex;

    BeamFnDataReadRunner(@UnknownKeyFor @NonNull @Initialized ShortIdMap shortIdMap, final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> cache, @UnknownKeyFor @NonNull @Initialized String pTransformId, // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized RunnerApi.PTransform grpcReadNode, final @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized String> processBundleInstructionIdSupplier, @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized RunnerApi.Coder> coders, final @UnknownKeyFor @NonNull @Initialized BeamFnStateClient beamFnStateClient, @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized BundleProgressReporter> addBundleProgressReporter, @UnknownKeyFor @NonNull @Initialized FnDataReceiver<@UnknownKeyFor @NonNull @Initialized WindowedValue<OutputT>> consumer) throws @UnknownKeyFor @NonNull @Initialized IOException {
        this.pTransformId = pTransformId;
        BeamFnApi.RemoteGrpcPort port = RemoteGrpcPortRead.fromPTransform((RunnerApi.PTransform)grpcReadNode).getPort();
        this.apiServiceDescriptor = port.getApiServiceDescriptor();
        this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
        this.consumer = consumer;
        RehydratedComponents components = RehydratedComponents.forComponents((RunnerApi.Components)RunnerApi.Components.newBuilder().putAllCoders(coders).build());
        this.coder = CoderTranslation.fromProto((RunnerApi.Coder)coders.get(port.getCoderId()), (RehydratedComponents)components, (CoderTranslation.TranslationContext)new StateBackedIterable.StateBackedIterableTranslationContext(){

            @Override
            public /*
             * Issues handling annotations - annotations may be inaccurate
             */
            @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> getCache() {
                return cache;
            }

            @Override
            public @UnknownKeyFor @NonNull @Initialized BeamFnStateClient getStateClient() {
                return beamFnStateClient;
            }

            @Override
            public @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized String> getCurrentInstructionId() {
                return processBundleInstructionIdSupplier;
            }
        });
        this.dataChannelReadIndexShortId = shortIdMap.getOrCreateShortId(new SimpleMonitoringInfoBuilder().setUrn(MonitoringInfoConstants.Urns.DATA_CHANNEL_READ_INDEX).setType("beam:metrics:sum_int64:v1").setLabel("PTRANSFORM", pTransformId).build());
        addBundleProgressReporter.accept(new BundleProgressReporter(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void updateIntermediateMonitoringData(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized ByteString> monitoringData) {
                Object object = BeamFnDataReadRunner.this.splittingLock;
                synchronized (object) {
                    monitoringData.put(BeamFnDataReadRunner.this.dataChannelReadIndexShortId, MonitoringInfoEncodings.encodeInt64Counter(BeamFnDataReadRunner.this.index));
                }
            }

            @Override
            public void updateFinalMonitoringData(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized ByteString> monitoringData) {
                monitoringData.put(BeamFnDataReadRunner.this.dataChannelReadIndexShortId, MonitoringInfoEncodings.encodeInt64Counter(BeamFnDataReadRunner.this.index));
            }

            @Override
            public void reset() {
            }
        });
        this.clearSplitIndices();
    }

    /**
     * Passes {@code element} to the downstream consumer unless the read has reached the element
     * just before the stop index, in which case the element is dropped.
     *
     * <p>The decision and the increment of the read index happen while holding the splitting
     * lock; the consumer itself is invoked after the lock has been released.
     *
     * @param element the element to forward
     * @throws Exception whatever the downstream consumer throws
     */
    public void forwardElementToConsumer(@UnknownKeyFor @NonNull @Initialized WindowedValue<OutputT> element) throws @UnknownKeyFor @NonNull @Initialized Exception {
        final boolean shouldForward;
        synchronized (this.splittingLock) {
            shouldForward = this.index != this.stopIndex - 1L;
            if (shouldForward) {
                this.index += 1L;
            }
        }
        if (shouldForward) {
            this.consumer.accept(element);
        }
    }

    /**
     * Attempts to lower the stop index of this read in response to a split request.
     *
     * <p>Nothing happens when the request carries no desired split for this transform, when the
     * read index already equals the stop index, or when the request names a different
     * instruction than the one currently supplied. Otherwise the method first tries to split
     * inside the current element (only possible when the consumer implements
     * {@link HandlesSplits}) and, failing that, moves the stop index to an element boundary,
     * snapping to the closest allowed split point when the request restricts them.
     *
     * @param request the split request; its desired split for this transform provides the
     *     estimated input size, the fraction of the remainder to keep and the allowed split points
     * @param response builder that receives the channel split (and, for an element split, the
     *     primary and residual roots) when a split is performed; left untouched otherwise
     */
    public void trySplit(
            BeamFnApi.@UnknownKeyFor @NonNull @Initialized ProcessBundleSplitRequest request,
            BeamFnApi.ProcessBundleSplitResponse.@UnknownKeyFor @NonNull @Initialized Builder response) {
        BeamFnApi.ProcessBundleSplitRequest.DesiredSplit desiredSplit = request.getDesiredSplitsMap().get(this.pTransformId);
        if (desiredSplit == null) {
            return;
        }
        long totalBufferSize = desiredSplit.getEstimatedInputElements();
        // Mutable copy: the list may be sorted below.
        List<Long> allowedSplitPoints = new ArrayList<>(desiredSplit.getAllowedSplitPointsList());
        HandlesSplits splittingConsumer = this.consumer instanceof HandlesSplits ? (HandlesSplits) this.consumer : null;

        synchronized (this.splittingLock) {
            // The read index has already caught up with the stop index; nothing left to split.
            if (this.index == this.stopIndex) {
                return;
            }
            // The request targets another instruction than the one currently supplied.
            if (!request.getInstructionId().equals(this.processBundleInstructionIdSupplier.get())) {
                return;
            }

            // Clamp the estimate: at least one element past the read index, at most stopIndex.
            if (totalBufferSize < this.index + 1L) {
                totalBufferSize = this.index + 1L;
            } else if (totalBufferSize > this.stopIndex) {
                totalBufferSize = this.stopIndex;
            }

            // Before the first element (index < 0) the progress is taken as 1; afterwards it is
            // asked from the consumer, or assumed to be 0.5 when the consumer cannot report it.
            double currentElementProgress = 1.0;
            if (this.index >= 0L) {
                currentElementProgress = splittingConsumer != null ? splittingConsumer.getProgress() : 0.5;
            }

            // Both values are in units of (possibly fractional) elements.
            double remainder = (double)(totalBufferSize - this.index) - currentElementProgress;
            double keep = remainder * desiredSplit.getFractionOfRemainder();

            if (currentElementProgress < 1.0) {
                // Fraction of the unfinished part of the current element that should be kept.
                double keepOfElementRemainder = keep / (1.0 - currentElementProgress);
                if (keepOfElementRemainder < 1.0
                        && this.isValidSplitPoint(allowedSplitPoints, this.index)
                        && this.isValidSplitPoint(allowedSplitPoints, this.index + 1L)) {
                    HandlesSplits.SplitResult splitResult = splittingConsumer != null ? splittingConsumer.trySplit(keepOfElementRemainder) : null;
                    if (splitResult != null) {
                        // Element split: the channel stops right after the current element and
                        // the element's own primary/residual roots are reported alongside.
                        this.stopIndex = this.index + 1L;
                        response.addAllPrimaryRoots(splitResult.getPrimaryRoots())
                                .addAllResidualRoots(splitResult.getResidualRoots())
                                .addChannelSplitsBuilder()
                                .setLastPrimaryElement(this.index - 1L)
                                .setFirstResidualElement(this.stopIndex);
                        return;
                    }
                }
            }

            // Element-boundary split: always at least one element past the read index.
            long newStopIndex = this.index + Math.max(1L, Math.round(currentElementProgress + keep));
            if (!this.isValidSplitPoint(allowedSplitPoints, newStopIndex)) {
                Collections.sort(allowedSplitPoints);
                // newStopIndex is not in the list, so binarySearch returns -(insertionPoint) - 1.
                int closestSplitPointIndex = -(Collections.binarySearch(allowedSplitPoints, newStopIndex) + 1);
                if (closestSplitPointIndex == 0) {
                    newStopIndex = allowedSplitPoints.get(0);
                } else if (closestSplitPointIndex == allowedSplitPoints.size()) {
                    newStopIndex = allowedSplitPoints.get(closestSplitPointIndex - 1);
                } else {
                    long prevPoint = allowedSplitPoints.get(closestSplitPointIndex - 1);
                    long nextPoint = allowedSplitPoints.get(closestSplitPointIndex);
                    // Take the previous point only if it lies past the read index and is
                    // strictly closer than the next one.
                    if (this.index < prevPoint && newStopIndex - prevPoint < nextPoint - newStopIndex) {
                        newStopIndex = prevPoint;
                    } else {
                        newStopIndex = nextPoint;
                    }
                }
            }

            // Only accept a stop index strictly between the read index and the current stop.
            if (newStopIndex < this.stopIndex && newStopIndex > this.index) {
                this.stopIndex = newStopIndex;
                response.addChannelSplitsBuilder()
                        .setLastPrimaryElement(this.stopIndex - 1L)
                        .setFirstResidualElement(this.stopIndex);
            }
        }
    }

    /**
     * Marks the read as finished: logs the instruction and transform ids at debug level, then,
     * under the splitting lock, advances the read index by one and sets the stop index to it.
     * Afterwards {@code index == stopIndex}, so later split requests return early.
     *
     * @throws Exception declared by the signature; nothing in this body throws a checked exception
     */
    public void blockTillReadFinishes() throws @UnknownKeyFor @NonNull @Initialized Exception {
        String instructionId = this.processBundleInstructionIdSupplier.get();
        LOG.debug("Waiting for process bundle instruction {} and transform {} to close.", instructionId, this.pTransformId);
        synchronized (this.splittingLock) {
            this.stopIndex = ++this.index;
        }
    }

    /**
     * Restores the initial split indices so the runner can be reused.
     *
     * @throws IllegalArgumentException if the instruction id supplier still supplies a non-null id
     */
    public void reset() {
        boolean noActiveInstruction = this.processBundleInstructionIdSupplier.get() == null;
        Preconditions.checkArgument(noActiveInstruction, "Cannot reset an active bundle processor.");
        this.clearSplitIndices();
    }

    /**
     * Puts both indices back to their initial values while holding the splitting lock:
     * no stop index ({@code Long.MAX_VALUE}) and a read index of -1.
     */
    private void clearSplitIndices() {
        synchronized (this.splittingLock) {
            this.stopIndex = Long.MAX_VALUE;
            this.index = -1L;
        }
    }

    /**
     * Tells whether {@code index} may be used as a split point.
     *
     * @param allowedSplitPoints the permitted split points; an empty list permits every index
     * @param index the candidate split point
     * @return {@code true} if the list is empty or contains {@code index}
     */
    private @UnknownKeyFor @NonNull @Initialized boolean isValidSplitPoint(@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized Long> allowedSplitPoints, @UnknownKeyFor @NonNull @Initialized long index) {
        if (allowedSplitPoints.isEmpty()) {
            return true;
        }
        return allowedSplitPoints.contains(index);
    }

    /**
     * Creates {@link BeamFnDataReadRunner} instances and wires them into the context they are
     * created for.
     */
    static class Factory<@UnknownKeyFor OutputT>
    implements PTransformRunnerFactory<BeamFnDataReadRunner<OutputT>> {
        Factory() {
        }

        /**
         * Builds a runner for the PTransform described by {@code context} and registers it with
         * the context as an incoming data endpoint, a finish-bundle function and a reset function.
         *
         * @param context supplies the PTransform, its collaborators and the registration hooks
         * @return the newly created runner
         * @throws IOException if constructing the runner fails
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized BeamFnDataReadRunner<OutputT> createRunnerForPTransform(PTransformRunnerFactory.@UnknownKeyFor @NonNull @Initialized Context context) throws @UnknownKeyFor @NonNull @Initialized IOException {
            // The PTransform is expected to have exactly one output; getOnlyElement enforces it.
            String outputPCollectionId = Iterables.getOnlyElement(context.getPTransform().getOutputsMap().values());

            // The generic signature of getPCollectionConsumer is not visible from this file, so
            // the receiver is narrowed through the raw type; this is valid for any declared
            // element type.
            @SuppressWarnings({"unchecked", "rawtypes"})
            FnDataReceiver<WindowedValue<OutputT>> consumer = (FnDataReceiver<WindowedValue<OutputT>>) (FnDataReceiver) context.getPCollectionConsumer(outputPCollectionId);

            // A parameterized runner is required here: with a raw constructor the method
            // reference below would have to adapt to Consumer.accept(Object) and would not compile.
            BeamFnDataReadRunner<OutputT> runner = new BeamFnDataReadRunner<>(
                    context.getShortIdMap(),
                    context.getBundleCacheSupplier(),
                    context.getPTransformId(),
                    context.getPTransform(),
                    context.getProcessBundleInstructionIdSupplier(),
                    context.getCoders(),
                    context.getBeamFnStateClient(),
                    context::addBundleProgressReporter,
                    consumer);
            context.addIncomingDataEndpoint(runner.apiServiceDescriptor, runner.coder, runner::forwardElementToConsumer);
            context.addFinishBundleFunction(runner::blockTillReadFinishes);
            context.addResetFunction(runner::reset);
            return runner;
        }
    }

    /**
     * Registers {@link Factory} under the URN {@code beam:runner:source:v1}; the class is
     * published through {@link AutoService} as a {@link PTransformRunnerFactory.Registrar}.
     */
    @AutoService(value={PTransformRunnerFactory.Registrar.class})
    public static class Registrar
    implements PTransformRunnerFactory.Registrar {
        /**
         * @return an immutable single-entry map from the URN to a new {@link Factory}
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized PTransformRunnerFactory> getPTransformRunnerFactories() {
            // The key must stay typed as String: widening it to Object makes the inferred map
            // type incompatible with the declared return type.
            return ImmutableMap.of("beam:runner:source:v1", new Factory<>());
        }
    }
}

