/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.control;

import com.google.auto.value.AutoValue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.beam.fn.harness.control.AutoValue_FinalizeBundleHandler_CallbackRegistration;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

public class FinalizeBundleHandler {
    private final @UnknownKeyFor @NonNull @Initialized ConcurrentMap<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized CallbackRegistration>> bundleFinalizationCallbacks = new ConcurrentHashMap<String, Collection<CallbackRegistration>>();
    private final @UnknownKeyFor @NonNull @Initialized PriorityQueue<@UnknownKeyFor @NonNull @Initialized TimestampedValue<@UnknownKeyFor @NonNull @Initialized String>> cleanUpQueue = new PriorityQueue<TimestampedValue>(11, Comparator.comparing(TimestampedValue::getTimestamp));
    private final @UnknownKeyFor @NonNull @Initialized Future<@UnknownKeyFor @Nullable @Initialized Void> cleanUpResult;

    public FinalizeBundleHandler(@UnknownKeyFor @NonNull @Initialized ExecutorService executorService) {
        this.cleanUpResult = executorService.submit(() -> {
            while (true) {
                PriorityQueue<TimestampedValue<String>> priorityQueue = this.cleanUpQueue;
                synchronized (priorityQueue) {
                    TimestampedValue<String> expiryTime = this.cleanUpQueue.peek();
                    while (expiryTime == null) {
                        this.cleanUpQueue.wait();
                        expiryTime = this.cleanUpQueue.peek();
                    }
                    Instant now = Instant.now();
                    while (expiryTime.getTimestamp().isAfter((ReadableInstant)now)) {
                        Duration timeDifference = new Duration((ReadableInstant)now, (ReadableInstant)expiryTime.getTimestamp());
                        this.cleanUpQueue.wait(timeDifference.getMillis());
                        expiryTime = this.cleanUpQueue.peek();
                        now = Instant.now();
                    }
                    this.bundleFinalizationCallbacks.remove(this.cleanUpQueue.poll().getValue());
                }
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void registerCallbacks(@UnknownKeyFor @NonNull @Initialized String bundleId, @UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized CallbackRegistration> callbacks) {
        if (callbacks.isEmpty()) {
            return;
        }
        Collection<CallbackRegistration> priorCallbacks = this.bundleFinalizationCallbacks.putIfAbsent(bundleId, callbacks);
        Preconditions.checkState((priorCallbacks == null ? 1 : 0) != 0, (String)"Expected to not have any past callbacks for bundle %s but found %s.", (Object)bundleId, priorCallbacks);
        long expiryTimeMillis = Long.MIN_VALUE;
        for (CallbackRegistration callback : callbacks) {
            expiryTimeMillis = Math.max(expiryTimeMillis, callback.getExpiryTime().getMillis());
        }
        PriorityQueue<TimestampedValue<String>> priorityQueue = this.cleanUpQueue;
        synchronized (priorityQueue) {
            this.cleanUpQueue.offer((TimestampedValue<String>)TimestampedValue.of((Object)bundleId, (Instant)new Instant(expiryTimeMillis)));
            this.cleanUpQueue.notify();
        }
    }

    public // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionResponse.Builder finalizeBundle(// Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionRequest request) throws @UnknownKeyFor @NonNull @Initialized Exception {
        String bundleId = request.getFinalizeBundle().getInstructionId();
        Collection callbacks = (Collection)this.bundleFinalizationCallbacks.remove(bundleId);
        if (callbacks == null) {
            return BeamFnApi.InstructionResponse.newBuilder().setFinalizeBundle(BeamFnApi.FinalizeBundleResponse.getDefaultInstance());
        }
        ArrayList<Exception> failures = new ArrayList<Exception>();
        for (CallbackRegistration callback : callbacks) {
            try {
                callback.getCallback().onBundleSuccess();
            }
            catch (Exception e) {
                failures.add(e);
            }
        }
        if (!failures.isEmpty()) {
            Exception e = new Exception(String.format("Failed to handle bundle finalization for bundle %s.", bundleId));
            for (Exception failure : failures) {
                e.addSuppressed(failure);
            }
            throw e;
        }
        return BeamFnApi.InstructionResponse.newBuilder().setFinalizeBundle(BeamFnApi.FinalizeBundleResponse.getDefaultInstance());
    }

    @AutoValue
    static abstract class CallbackRegistration {
        CallbackRegistration() {
        }

        public static @UnknownKeyFor @NonNull @Initialized CallbackRegistration create(@UnknownKeyFor @NonNull @Initialized Instant expiryTime, // Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized DoFn.BundleFinalizer.Callback callback) {
            return new AutoValue_FinalizeBundleHandler_CallbackRegistration(expiryTime, callback);
        }

        public abstract @UnknownKeyFor @NonNull @Initialized Instant getExpiryTime();

        public abstract // Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized DoFn.BundleFinalizer.Callback getCallback();
    }
}

