/*
 * Decompiled with CFR 0.152.
 */
package com.github.msemys.esjc;

import com.github.msemys.esjc.CatchUpSubscriptionListener;
import com.github.msemys.esjc.EventStore;
import com.github.msemys.esjc.EventStoreListener;
import com.github.msemys.esjc.Position;
import com.github.msemys.esjc.ResolvedEvent;
import com.github.msemys.esjc.Subscription;
import com.github.msemys.esjc.SubscriptionDropReason;
import com.github.msemys.esjc.UserCredentials;
import com.github.msemys.esjc.VolatileSubscriptionListener;
import com.github.msemys.esjc.event.ClientConnected;
import com.github.msemys.esjc.util.Numbers;
import com.github.msemys.esjc.util.Preconditions;
import com.github.msemys.esjc.util.Ranges;
import com.github.msemys.esjc.util.Strings;
import com.github.msemys.esjc.util.Subscriptions;
import com.github.msemys.esjc.util.concurrent.ResettableLatch;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class CatchUpSubscription
implements AutoCloseable {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    public final String streamId;
    private final EventStore eventstore;
    private final boolean resolveLinkTos;
    private final UserCredentials userCredentials;
    protected final CatchUpSubscriptionListener listener;
    protected final int readBatchSize;
    protected final int maxPushQueueSize;
    private final Executor executor;
    private final Queue<ResolvedEvent> liveQueue = new ConcurrentLinkedQueue<ResolvedEvent>();
    private Subscription subscription;
    private final AtomicReference<Subscriptions.DropData> dropData = new AtomicReference();
    private volatile boolean allowProcessing;
    private final AtomicBoolean isProcessing = new AtomicBoolean();
    protected volatile boolean shouldStop;
    private final AtomicBoolean isDropped = new AtomicBoolean();
    private final ResettableLatch stopped = new ResettableLatch(true);
    private final EventStoreListener reconnectionHook;

    protected CatchUpSubscription(EventStore eventstore, String streamId, boolean resolveLinkTos, CatchUpSubscriptionListener listener, UserCredentials userCredentials, int readBatchSize, int maxPushQueueSize, Executor executor) {
        Preconditions.checkNotNull(eventstore, "eventstore is null");
        Preconditions.checkNotNull(listener, "listener is null");
        Preconditions.checkNotNull(listener, "executor is null");
        Preconditions.checkArgument(Ranges.BATCH_SIZE_RANGE.contains(readBatchSize), "readBatchSize is out of range. Allowed range: %s.", Ranges.BATCH_SIZE_RANGE.toString());
        Preconditions.checkArgument(Numbers.isPositive(maxPushQueueSize), "maxPushQueueSize should be positive");
        this.eventstore = eventstore;
        this.streamId = Strings.defaultIfEmpty(streamId, "");
        this.resolveLinkTos = resolveLinkTos;
        this.listener = listener;
        this.userCredentials = userCredentials;
        this.readBatchSize = readBatchSize;
        this.maxPushQueueSize = maxPushQueueSize;
        this.executor = executor;
        this.reconnectionHook = event -> {
            if (event instanceof ClientConnected) {
                this.onReconnect();
            }
        };
    }

    protected abstract void readEventsTill(EventStore var1, boolean var2, UserCredentials var3, Long var4, Long var5) throws Exception;

    protected abstract void tryProcess(ResolvedEvent var1);

    void start() {
        this.logger.trace("Catch-up subscription to {}: starting...", (Object)this.streamId());
        this.runSubscription();
    }

    public void stop(Duration timeout) throws TimeoutException {
        this.stop();
        this.logger.trace("Waiting on subscription to stop");
        if (!this.stopped.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
            throw new TimeoutException(String.format("Could not stop %s in time.", this.getClass().getSimpleName()));
        }
    }

    public void stop() {
        this.logger.trace("Catch-up subscription to {}: requesting stop...", (Object)this.streamId());
        this.logger.trace("Catch-up subscription to {}: unhooking from connection. Connected.", (Object)this.streamId());
        this.eventstore.removeListener(this.reconnectionHook);
        this.shouldStop = true;
        this.enqueueSubscriptionDropNotification(SubscriptionDropReason.UserInitiated, null);
    }

    @Override
    public void close() throws TimeoutException {
        this.stop(Duration.ofSeconds(2L));
    }

    private void onReconnect() {
        this.logger.trace("Catch-up subscription to {}: recovering after reconnection.", (Object)this.streamId());
        this.logger.trace("Catch-up subscription to {}: unhooking from connection. Connected.", (Object)this.streamId());
        this.eventstore.removeListener(this.reconnectionHook);
        while (this.dropData.get() != null && !this.isDropped.get()) {
            this.stopped.await(1L, TimeUnit.SECONDS);
        }
        this.runSubscription();
    }

    private void runSubscription() {
        this.executor.execute(() -> {
            this.logger.trace("Catch-up subscription to {}: running...", (Object)this.streamId());
            this.stopped.reset();
            this.allowProcessing = false;
            this.isDropped.set(false);
            this.dropData.set(null);
            try {
                if (!this.shouldStop) {
                    this.logger.trace("Catch-up subscription to {}: pulling events...", (Object)this.streamId());
                    this.readEventsTill(this.eventstore, this.resolveLinkTos, this.userCredentials, null, null);
                }
                if (!this.shouldStop) {
                    this.logger.trace("Catch-up subscription to {}: subscribing...", (Object)this.streamId());
                    VolatileSubscriptionListener subscriptionListener = new VolatileSubscriptionListener(){

                        @Override
                        public void onEvent(Subscription s, ResolvedEvent event) {
                            if (CatchUpSubscription.this.dropData.get() == null) {
                                CatchUpSubscription.this.logger.trace("Catch-up subscription to {}: event appeared ({}, {}, {} @ {}).", new Object[]{CatchUpSubscription.this.streamId(), event.originalStreamId(), event.originalEventNumber(), event.originalEvent().eventType, event.originalPosition});
                                if (CatchUpSubscription.this.liveQueue.size() >= CatchUpSubscription.this.maxPushQueueSize) {
                                    CatchUpSubscription.this.enqueueSubscriptionDropNotification(SubscriptionDropReason.ProcessingQueueOverflow, null);
                                    CatchUpSubscription.this.subscription.unsubscribe();
                                } else {
                                    CatchUpSubscription.this.liveQueue.offer(event);
                                    if (CatchUpSubscription.this.allowProcessing) {
                                        CatchUpSubscription.this.ensureProcessingPushQueue();
                                    }
                                }
                            }
                        }

                        @Override
                        public void onClose(Subscription s, SubscriptionDropReason reason, Exception exception) {
                            CatchUpSubscription.this.enqueueSubscriptionDropNotification(reason, exception);
                        }
                    };
                    this.subscription = this.isSubscribedToAll() ? this.eventstore.subscribeToAll(this.resolveLinkTos, subscriptionListener, this.userCredentials).get() : this.eventstore.subscribeToStream(this.streamId, this.resolveLinkTos, subscriptionListener, this.userCredentials).get();
                    this.logger.trace("Catch-up subscription to {}: pulling events (if left)...", (Object)this.streamId());
                    this.readEventsTill(this.eventstore, this.resolveLinkTos, this.userCredentials, this.subscription.lastCommitPosition, this.subscription.lastEventNumber);
                }
            }
            catch (Exception e) {
                this.dropSubscription(SubscriptionDropReason.CatchUpError, e);
                return;
            }
            if (this.shouldStop) {
                this.dropSubscription(SubscriptionDropReason.UserInitiated, null);
                return;
            }
            this.logger.trace("Catch-up subscription to {}: processing live events...", (Object)this.streamId());
            this.listener.onLiveProcessingStarted(this);
            this.logger.trace("Catch-up subscription to {}: hooking to connection. Connected", (Object)this.streamId());
            this.eventstore.addListener(this.reconnectionHook);
            this.allowProcessing = true;
            this.ensureProcessingPushQueue();
        });
    }

    private void enqueueSubscriptionDropNotification(SubscriptionDropReason reason, Exception exception) {
        if (this.dropData.compareAndSet(null, new Subscriptions.DropData(reason, exception))) {
            this.liveQueue.offer(Subscriptions.DROP_SUBSCRIPTION_EVENT);
            if (this.allowProcessing) {
                this.ensureProcessingPushQueue();
            }
        }
    }

    private void ensureProcessingPushQueue() {
        if (this.isProcessing.compareAndSet(false, true)) {
            this.executor.execute(this::processLiveQueue);
        }
    }

    private void processLiveQueue() {
        while (true) {
            ResolvedEvent event;
            if ((event = this.liveQueue.poll()) != null) {
                if (event.equals(Subscriptions.DROP_SUBSCRIPTION_EVENT)) {
                    Subscriptions.DropData previousDropData = this.dropData.getAndAccumulate(Subscriptions.UNKNOWN_DROP_DATA, (current, update) -> current == null ? update : current);
                    if (previousDropData == null) {
                        previousDropData = Subscriptions.UNKNOWN_DROP_DATA;
                    }
                    this.dropSubscription(previousDropData.reason, previousDropData.exception);
                    this.isProcessing.compareAndSet(true, false);
                    return;
                }
                try {
                    this.tryProcess(event);
                }
                catch (Exception e) {
                    this.dropSubscription(SubscriptionDropReason.EventHandlerException, e);
                    return;
                }
            }
            this.isProcessing.compareAndSet(true, false);
            if (this.liveQueue.isEmpty() || !this.isProcessing.compareAndSet(false, true)) break;
        }
    }

    private void dropSubscription(SubscriptionDropReason reason, Exception exception) {
        if (this.isDropped.compareAndSet(false, true)) {
            this.logger.trace("Catch-up subscription to {}: dropping subscription, reason: {}.", new Object[]{this.streamId(), reason, exception});
            if (this.subscription != null) {
                this.subscription.unsubscribe();
            }
            this.listener.onClose(this, reason, exception);
            this.stopped.release();
        }
    }

    public boolean isSubscribedToAll() {
        return Strings.isNullOrEmpty(this.streamId);
    }

    public abstract long lastProcessedEventNumber();

    public abstract Position lastProcessedPosition();

    protected String streamId() {
        return Strings.defaultIfEmpty(this.streamId, "<all>");
    }
}

