package com.github.msemys.esjc;

import com.github.msemys.esjc.event.ClientConnected;
import com.github.msemys.esjc.util.Numbers;
import com.github.msemys.esjc.util.Preconditions;
import com.github.msemys.esjc.util.Ranges;
import com.github.msemys.esjc.util.Strings;
import com.github.msemys.esjc.util.Subscriptions;
import com.github.msemys.esjc.util.concurrent.ResettableLatch;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/msemys/esjc/CatchUpSubscription.class */
public abstract class CatchUpSubscription implements AutoCloseable {
    public final String streamId;
    private final EventStore eventstore;
    private final boolean resolveLinkTos;
    private final UserCredentials userCredentials;
    protected final CatchUpSubscriptionListener listener;
    protected final int readBatchSize;
    protected final int maxPushQueueSize;
    private final Executor executor;
    private Subscription subscription;
    private volatile boolean allowProcessing;
    protected volatile boolean shouldStop;
    private final EventStoreListener reconnectionHook;
    protected final Logger logger = LoggerFactory.getLogger(getClass());
    private final Queue<ResolvedEvent> liveQueue = new ConcurrentLinkedQueue();
    private final AtomicReference<Subscriptions.DropData> dropData = new AtomicReference<>();
    private final AtomicBoolean isProcessing = new AtomicBoolean();
    private final AtomicBoolean isDropped = new AtomicBoolean();
    private final ResettableLatch stopped = new ResettableLatch(true);

    /* JADX INFO: Access modifiers changed from: protected */
    public CatchUpSubscription(EventStore eventStore, String str, boolean z, CatchUpSubscriptionListener catchUpSubscriptionListener, UserCredentials userCredentials, int i, int i2, Executor executor) {
        Preconditions.checkNotNull(eventStore, "eventstore is null");
        Preconditions.checkNotNull(catchUpSubscriptionListener, "listener is null");
        Preconditions.checkNotNull(catchUpSubscriptionListener, "executor is null");
        Preconditions.checkArgument(Ranges.BATCH_SIZE_RANGE.contains(i), "readBatchSize is out of range. Allowed range: %s.", Ranges.BATCH_SIZE_RANGE.toString());
        Preconditions.checkArgument(Numbers.isPositive(i2), "maxPushQueueSize should be positive");
        this.eventstore = eventStore;
        this.streamId = Strings.defaultIfEmpty(str, Strings.EMPTY);
        this.resolveLinkTos = z;
        this.listener = catchUpSubscriptionListener;
        this.userCredentials = userCredentials;
        this.readBatchSize = i;
        this.maxPushQueueSize = i2;
        this.executor = executor;
        this.reconnectionHook = event -> {
            if (event instanceof ClientConnected) {
                onReconnect();
            }
        };
    }

    protected abstract void readEventsTill(EventStore eventStore, boolean z, UserCredentials userCredentials, Long l, Long l2) throws Exception;

    protected abstract void tryProcess(ResolvedEvent resolvedEvent);

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        this.logger.trace("Catch-up subscription to {}: starting...", streamId());
        runSubscription();
    }

    public void stop(Duration duration) throws TimeoutException {
        stop();
        this.logger.trace("Waiting on subscription to stop");
        if (!this.stopped.await(duration.toMillis(), TimeUnit.MILLISECONDS)) {
            throw new TimeoutException(String.format("Could not stop %s in time.", getClass().getSimpleName()));
        }
    }

    public void stop() {
        this.logger.trace("Catch-up subscription to {}: requesting stop...", streamId());
        this.logger.trace("Catch-up subscription to {}: unhooking from connection. Connected.", streamId());
        this.eventstore.removeListener(this.reconnectionHook);
        this.shouldStop = true;
        enqueueSubscriptionDropNotification(SubscriptionDropReason.UserInitiated, null);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws TimeoutException {
        stop(Duration.ofSeconds(2L));
    }

    private void onReconnect() {
        this.logger.trace("Catch-up subscription to {}: recovering after reconnection.", streamId());
        this.logger.trace("Catch-up subscription to {}: unhooking from connection. Connected.", streamId());
        this.eventstore.removeListener(this.reconnectionHook);
        while (this.dropData.get() != null && !this.isDropped.get()) {
            this.stopped.await(1L, TimeUnit.SECONDS);
        }
        runSubscription();
    }

    private void runSubscription() {
        this.executor.execute(() -> {
            this.logger.trace("Catch-up subscription to {}: running...", streamId());
            this.stopped.reset();
            this.allowProcessing = false;
            this.isDropped.set(false);
            this.dropData.set(null);
            try {
                if (!this.shouldStop) {
                    this.logger.trace("Catch-up subscription to {}: pulling events...", streamId());
                    readEventsTill(this.eventstore, this.resolveLinkTos, this.userCredentials, null, null);
                }
                if (!this.shouldStop) {
                    this.logger.trace("Catch-up subscription to {}: subscribing...", streamId());
                    VolatileSubscriptionListener volatileSubscriptionListener = new VolatileSubscriptionListener() { // from class: com.github.msemys.esjc.CatchUpSubscription.1
                        @Override // com.github.msemys.esjc.SubscriptionListener
                        public void onEvent(Subscription subscription, ResolvedEvent resolvedEvent) {
                            if (CatchUpSubscription.this.dropData.get() == null) {
                                CatchUpSubscription.this.logger.trace("Catch-up subscription to {}: event appeared ({}, {}, {} @ {}).", new Object[]{CatchUpSubscription.this.streamId(), resolvedEvent.originalStreamId(), Long.valueOf(resolvedEvent.originalEventNumber()), resolvedEvent.originalEvent().eventType, resolvedEvent.originalPosition});
                                if (CatchUpSubscription.this.liveQueue.size() >= CatchUpSubscription.this.maxPushQueueSize) {
                                    CatchUpSubscription.this.enqueueSubscriptionDropNotification(SubscriptionDropReason.ProcessingQueueOverflow, null);
                                    CatchUpSubscription.this.subscription.unsubscribe();
                                } else {
                                    CatchUpSubscription.this.liveQueue.offer(resolvedEvent);
                                    if (CatchUpSubscription.this.allowProcessing) {
                                        CatchUpSubscription.this.ensureProcessingPushQueue();
                                    }
                                }
                            }
                        }

                        @Override // com.github.msemys.esjc.SubscriptionListener
                        public void onClose(Subscription subscription, SubscriptionDropReason subscriptionDropReason, Exception exc) {
                            CatchUpSubscription.this.enqueueSubscriptionDropNotification(subscriptionDropReason, exc);
                        }
                    };
                    this.subscription = isSubscribedToAll() ? this.eventstore.subscribeToAll(this.resolveLinkTos, volatileSubscriptionListener, this.userCredentials).get() : this.eventstore.subscribeToStream(this.streamId, this.resolveLinkTos, volatileSubscriptionListener, this.userCredentials).get();
                    this.logger.trace("Catch-up subscription to {}: pulling events (if left)...", streamId());
                    readEventsTill(this.eventstore, this.resolveLinkTos, this.userCredentials, Long.valueOf(this.subscription.lastCommitPosition), this.subscription.lastEventNumber);
                }
                if (this.shouldStop) {
                    dropSubscription(SubscriptionDropReason.UserInitiated, null);
                    return;
                }
                this.logger.trace("Catch-up subscription to {}: processing live events...", streamId());
                this.listener.onLiveProcessingStarted(this);
                this.logger.trace("Catch-up subscription to {}: hooking to connection. Connected", streamId());
                this.eventstore.addListener(this.reconnectionHook);
                this.allowProcessing = true;
                ensureProcessingPushQueue();
            } catch (Exception e) {
                dropSubscription(SubscriptionDropReason.CatchUpError, e);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void enqueueSubscriptionDropNotification(SubscriptionDropReason subscriptionDropReason, Exception exc) {
        if (this.dropData.compareAndSet(null, new Subscriptions.DropData(subscriptionDropReason, exc))) {
            this.liveQueue.offer(Subscriptions.DROP_SUBSCRIPTION_EVENT);
            if (this.allowProcessing) {
                ensureProcessingPushQueue();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void ensureProcessingPushQueue() {
        if (this.isProcessing.compareAndSet(false, true)) {
            this.executor.execute(this::processLiveQueue);
        }
    }

    private void processLiveQueue() {
        while (true) {
            ResolvedEvent poll = this.liveQueue.poll();
            if (poll == null) {
                this.isProcessing.compareAndSet(true, false);
                if (this.liveQueue.isEmpty() || !this.isProcessing.compareAndSet(false, true)) {
                    return;
                }
            } else {
                if (poll.equals(Subscriptions.DROP_SUBSCRIPTION_EVENT)) {
                    Subscriptions.DropData andAccumulate = this.dropData.getAndAccumulate(Subscriptions.UNKNOWN_DROP_DATA, (dropData, dropData2) -> {
                        return dropData == null ? dropData2 : dropData;
                    });
                    if (andAccumulate == null) {
                        andAccumulate = Subscriptions.UNKNOWN_DROP_DATA;
                    }
                    dropSubscription(andAccumulate.reason, andAccumulate.exception);
                    this.isProcessing.compareAndSet(true, false);
                    return;
                }
                try {
                    tryProcess(poll);
                } catch (Exception e) {
                    dropSubscription(SubscriptionDropReason.EventHandlerException, e);
                    return;
                }
            }
        }
    }

    private void dropSubscription(SubscriptionDropReason subscriptionDropReason, Exception exc) {
        if (this.isDropped.compareAndSet(false, true)) {
            this.logger.trace("Catch-up subscription to {}: dropping subscription, reason: {}.", new Object[]{streamId(), subscriptionDropReason, exc});
            if (this.subscription != null) {
                this.subscription.unsubscribe();
            }
            this.listener.onClose(this, subscriptionDropReason, exc);
            this.stopped.release();
        }
    }

    public boolean isSubscribedToAll() {
        return Strings.isNullOrEmpty(this.streamId);
    }

    public abstract long lastProcessedEventNumber();

    public abstract Position lastProcessedPosition();

    /* JADX INFO: Access modifiers changed from: protected */
    public String streamId() {
        return Strings.defaultIfEmpty(this.streamId, "<all>");
    }
}
