/*
 * Decompiled with CFR 0.152.
 */
package com.github.msemys.esjc;

import com.github.msemys.esjc.PersistentSubscriptionListener;
import com.github.msemys.esjc.ResolvedEvent;
import com.github.msemys.esjc.RetryableResolvedEvent;
import com.github.msemys.esjc.Subscription;
import com.github.msemys.esjc.SubscriptionDropReason;
import com.github.msemys.esjc.SubscriptionListener;
import com.github.msemys.esjc.UserCredentials;
import com.github.msemys.esjc.subscription.PersistentSubscriptionChannel;
import com.github.msemys.esjc.subscription.PersistentSubscriptionNakEventAction;
import com.github.msemys.esjc.util.Preconditions;
import com.github.msemys.esjc.util.Subscriptions;
import com.github.msemys.esjc.util.Threads;
import com.github.msemys.esjc.util.concurrent.ResettableLatch;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for a persistent subscription that buffers incoming events in an
 * in-memory queue and dispatches them, one at a time, to a
 * {@link PersistentSubscriptionListener} on the supplied {@link Executor}.
 *
 * <p>Subclasses provide the actual channel by implementing
 * {@link #startSubscription(String, String, int, SubscriptionListener, UserCredentials)}.
 *
 * <p>Events and the drop notification may be enqueued from any thread; the
 * {@code isProcessing} flag ensures that at most one queue-draining task is
 * scheduled at a time.
 */
public abstract class PersistentSubscription
implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(PersistentSubscription.class);

    /** Maximum number of events that may be acknowledged or failed in a single call. */
    private static final int MAX_EVENTS = 2000;

    private final String subscriptionId;
    private final String streamId;
    private final PersistentSubscriptionListener listener;
    private final UserCredentials userCredentials;
    private final boolean autoAck;

    // Assigned on the thread that completes the start future and read on the executor
    // thread (and by callers of acknowledge/fail), so it must be volatile: without it the
    // "subscription == null" polling loop in processQueue() has no visibility guarantee.
    private volatile PersistentSubscriptionChannel subscription;

    private final Queue<RetryableResolvedEvent> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean isProcessing = new AtomicBoolean();
    private final AtomicReference<Subscriptions.DropData> dropData = new AtomicReference<>();
    private final AtomicBoolean isDropped = new AtomicBoolean();
    private final ResettableLatch stopped = new ResettableLatch(true);
    private final int bufferSize;
    private final Executor executor;

    /**
     * Stores the subscription settings; no connection is made until {@link #start()}.
     *
     * @param subscriptionId  identifier passed to {@code startSubscription}
     * @param streamId        stream passed to {@code startSubscription}; also used in log messages
     * @param listener        receives {@code onEvent} and {@code onClose} callbacks
     * @param userCredentials credentials passed to {@code startSubscription}
     * @param bufferSize      buffer size passed to {@code startSubscription}
     * @param autoAck         when {@code true}, each event is acknowledged right after
     *                        the listener's {@code onEvent} returns normally
     * @param executor        executor on which the event queue is drained
     */
    protected PersistentSubscription(String subscriptionId, String streamId, PersistentSubscriptionListener listener, UserCredentials userCredentials, int bufferSize, boolean autoAck, Executor executor) {
        this.subscriptionId = subscriptionId;
        this.streamId = streamId;
        this.listener = listener;
        this.userCredentials = userCredentials;
        this.bufferSize = bufferSize;
        this.autoAck = autoAck;
        this.executor = executor;
    }

    /**
     * Resets the "stopped" latch and starts the underlying subscription.
     *
     * @return a future that yields this instance once the channel returned by
     *         {@code startSubscription} has been stored
     */
    protected CompletableFuture<PersistentSubscription> start() {
        stopped.reset();

        SubscriptionListener<PersistentSubscriptionChannel, RetryableResolvedEvent> subscriptionListener = new SubscriptionListener<PersistentSubscriptionChannel, RetryableResolvedEvent>() {

            @Override
            public void onEvent(PersistentSubscriptionChannel channel, RetryableResolvedEvent event) {
                enqueue(event);
            }

            @Override
            public void onClose(PersistentSubscriptionChannel channel, SubscriptionDropReason reason, Exception exception) {
                enqueueSubscriptionDropNotification(reason, exception);
            }
        };

        return startSubscription(subscriptionId, streamId, bufferSize, subscriptionListener, userCredentials).thenApply(s -> {
            subscription = (PersistentSubscriptionChannel) s;
            return this;
        });
    }

    /**
     * Opens the underlying subscription channel.
     *
     * <p>The future's result is cast to {@link PersistentSubscriptionChannel} by
     * {@link #start()}, so implementations must complete it with an instance of that type.
     *
     * @param subscriptionId  subscription identifier given to the constructor
     * @param streamId        stream identifier given to the constructor
     * @param bufferSize      buffer size given to the constructor
     * @param listener        listener that feeds events and close notifications into this instance
     * @param userCredentials credentials given to the constructor
     * @return a future completed with the opened subscription
     */
    protected abstract CompletableFuture<Subscription> startSubscription(String subscriptionId, String streamId, int bufferSize, SubscriptionListener<PersistentSubscriptionChannel, RetryableResolvedEvent> listener, UserCredentials userCredentials);

    /**
     * Acknowledges the events with the given ids.
     *
     * @param eventIds ids of the processed events; at most {@value #MAX_EVENTS} per call
     */
    public void acknowledge(UUID ... eventIds) {
        Preconditions.checkArgument(eventIds.length <= MAX_EVENTS, "events is limited to %d to ack at a time", MAX_EVENTS);
        subscription.notifyEventsProcessed(Arrays.asList(eventIds));
    }

    /**
     * Acknowledges a single event, identified by its original event id.
     *
     * @param event the processed event
     */
    public void acknowledge(ResolvedEvent event) {
        acknowledge(event.originalEvent().eventId);
    }

    /**
     * Acknowledges a batch of events, identified by their original event ids.
     *
     * @param events the processed events; at most {@value #MAX_EVENTS} per call
     */
    public void acknowledge(List<ResolvedEvent> events) {
        acknowledge(events.stream().map(e -> e.originalEvent().eventId).toArray(UUID[]::new));
    }

    /**
     * Reports a single event as failed.
     *
     * @param event  the event that could not be processed
     * @param action action passed on to the channel for the failed event
     * @param reason description passed on to the channel
     */
    public void fail(ResolvedEvent event, PersistentSubscriptionNakEventAction action, String reason) {
        subscription.notifyEventsFailed(Collections.singletonList(event.originalEvent().eventId), action, reason);
    }

    /**
     * Reports a batch of events as failed.
     *
     * @param events the events that could not be processed; at most {@value #MAX_EVENTS} per call
     * @param action action passed on to the channel for the failed events
     * @param reason description passed on to the channel
     */
    public void fail(List<ResolvedEvent> events, PersistentSubscriptionNakEventAction action, String reason) {
        Preconditions.checkArgument(events.size() <= MAX_EVENTS, "events is limited to %d to ack at a time", MAX_EVENTS);
        List<UUID> eventIds = events.stream()
            .map(e -> e.originalEvent().eventId)
            .collect(Collectors.toCollection(() -> new ArrayList<>(events.size())));
        subscription.notifyEventsFailed(eventIds, action, reason);
    }

    /**
     * Requests a user-initiated stop and waits for the subscription to be dropped.
     *
     * @param timeout maximum time to wait for the "stopped" latch
     * @throws TimeoutException if the latch was not released within {@code timeout}
     */
    public void stop(Duration timeout) throws TimeoutException {
        logger.trace("Persistent subscription to {}: requesting stop...", streamId);
        enqueueSubscriptionDropNotification(SubscriptionDropReason.UserInitiated, null);
        if (!stopped.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
            throw new TimeoutException(String.format("Could not stop %s in time.", getClass().getSimpleName()));
        }
    }

    /**
     * Stops the subscription, waiting up to two seconds.
     *
     * @throws TimeoutException if the subscription did not stop within two seconds
     */
    @Override
    public void close() throws TimeoutException {
        stop(Duration.ofSeconds(2L));
    }

    /**
     * Records the drop reason and enqueues the drop marker event. Only the first
     * call has any effect; later calls find {@code dropData} already set.
     */
    private void enqueueSubscriptionDropNotification(SubscriptionDropReason reason, Exception exception) {
        if (dropData.compareAndSet(null, new Subscriptions.DropData(reason, exception))) {
            enqueue(Subscriptions.DROP_PERSISTENT_SUBSCRIPTION_EVENT);
        }
    }

    /**
     * Adds the event to the queue and schedules a drain task unless one is
     * already marked as running.
     */
    private void enqueue(RetryableResolvedEvent event) {
        queue.offer(event);
        if (isProcessing.compareAndSet(false, true)) {
            executor.execute(this::processQueue);
        }
    }

    /**
     * Drains the queue, delivering each event to the listener.
     *
     * <p>The method returns without clearing {@code isProcessing} once the
     * subscription is dropped, so no further drain task is scheduled afterwards.
     */
    private void processQueue() {
        do {
            if (subscription == null) {
                // The channel has not been stored by start() yet; back off briefly and retry.
                Threads.sleepUninterruptibly(1L);
            } else {
                RetryableResolvedEvent event;
                while ((event = queue.poll()) != null) {
                    if (event.equals(Subscriptions.DROP_PERSISTENT_SUBSCRIPTION_EVENT)) {
                        // Fall back to UNKNOWN_DROP_DATA if, unexpectedly, no reason was recorded.
                        Subscriptions.DropData previousDropData = dropData.getAndAccumulate(Subscriptions.UNKNOWN_DROP_DATA, (current, update) -> current == null ? update : current);
                        if (previousDropData == null) {
                            previousDropData = Subscriptions.UNKNOWN_DROP_DATA;
                        }
                        dropSubscription(previousDropData.reason, previousDropData.exception);
                        return;
                    }

                    // A drop was requested while events were still queued ahead of the marker:
                    // stop delivering immediately instead of draining the backlog.
                    Subscriptions.DropData currentDropData = dropData.get();
                    if (currentDropData != null) {
                        dropSubscription(currentDropData.reason, currentDropData.exception);
                        return;
                    }

                    try {
                        listener.onEvent(this, event);
                        if (autoAck) {
                            subscription.notifyEventsProcessed(Collections.singletonList(event.originalEvent().eventId));
                        }
                        logger.trace("Persistent subscription to {}: processed event ({}, {}, {} @ {}).", streamId, event.originalEvent().eventStreamId, event.originalEvent().eventNumber, event.originalEvent().eventType, event.originalEventNumber());
                    }
                    catch (Exception e) {
                        dropSubscription(SubscriptionDropReason.EventHandlerException, e);
                        return;
                    }
                }
            }
            isProcessing.compareAndSet(true, false);
            // Re-check the queue: an event may have been offered after the last poll() but
            // before the flag was cleared, in which case its enqueue() scheduled nothing.
        } while (!queue.isEmpty() && isProcessing.compareAndSet(false, true));
    }

    /**
     * Unsubscribes the channel (if any) and notifies the listener, at most once.
     *
     * <p>The "stopped" latch is released in a {@code finally} block so that a
     * failure in {@code unsubscribe()} or in the listener's {@code onClose}
     * cannot leave {@link #stop(Duration)} waiting until its timeout.
     */
    private void dropSubscription(SubscriptionDropReason reason, Exception exception) {
        if (isDropped.compareAndSet(false, true)) {
            logger.trace("Persistent subscription to {}: dropping subscription, reason: {}", streamId, reason, exception);
            try {
                if (subscription != null) {
                    subscription.unsubscribe();
                }
                listener.onClose(this, reason, exception);
            } finally {
                stopped.release();
            }
        }
    }
}

