package com.github.msemys.esjc;

import com.github.msemys.esjc.subscription.PersistentSubscriptionChannel;
import com.github.msemys.esjc.subscription.PersistentSubscriptionNakEventAction;
import com.github.msemys.esjc.util.Preconditions;
import com.github.msemys.esjc.util.Subscriptions;
import com.github.msemys.esjc.util.Threads;
import com.github.msemys.esjc.util.concurrent.ResettableLatch;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/msemys/esjc/PersistentSubscription.class */
public abstract class PersistentSubscription implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(PersistentSubscription.class);
    private static final int MAX_EVENTS = 2000;
    private final String subscriptionId;
    private final String streamId;
    private final PersistentSubscriptionListener listener;
    private final UserCredentials userCredentials;
    private final boolean autoAck;
    private PersistentSubscriptionChannel subscription;
    private final Queue<RetryableResolvedEvent> queue = new ConcurrentLinkedQueue();
    private final AtomicBoolean isProcessing = new AtomicBoolean();
    private final AtomicReference<Subscriptions.DropData> dropData = new AtomicReference<>();
    private final AtomicBoolean isDropped = new AtomicBoolean();
    private final ResettableLatch stopped = new ResettableLatch(true);
    private final int bufferSize;
    private final Executor executor;

    /* JADX INFO: Access modifiers changed from: protected */
    public PersistentSubscription(String str, String str2, PersistentSubscriptionListener persistentSubscriptionListener, UserCredentials userCredentials, int i, boolean z, Executor executor) {
        this.subscriptionId = str;
        this.streamId = str2;
        this.listener = persistentSubscriptionListener;
        this.userCredentials = userCredentials;
        this.bufferSize = i;
        this.autoAck = z;
        this.executor = executor;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CompletableFuture<PersistentSubscription> start() {
        this.stopped.reset();
        return startSubscription(this.subscriptionId, this.streamId, this.bufferSize, new SubscriptionListener<PersistentSubscriptionChannel, RetryableResolvedEvent>() { // from class: com.github.msemys.esjc.PersistentSubscription.1
            @Override // com.github.msemys.esjc.SubscriptionListener
            public void onEvent(PersistentSubscriptionChannel persistentSubscriptionChannel, RetryableResolvedEvent retryableResolvedEvent) {
                PersistentSubscription.this.enqueue(retryableResolvedEvent);
            }

            @Override // com.github.msemys.esjc.SubscriptionListener
            public void onClose(PersistentSubscriptionChannel persistentSubscriptionChannel, SubscriptionDropReason subscriptionDropReason, Exception exc) {
                PersistentSubscription.this.enqueueSubscriptionDropNotification(subscriptionDropReason, exc);
            }
        }, this.userCredentials).thenApply(subscription -> {
            this.subscription = (PersistentSubscriptionChannel) subscription;
            return this;
        });
    }

    protected abstract CompletableFuture<Subscription> startSubscription(String str, String str2, int i, SubscriptionListener<PersistentSubscriptionChannel, RetryableResolvedEvent> subscriptionListener, UserCredentials userCredentials);

    public void acknowledge(UUID... uuidArr) {
        Preconditions.checkArgument(uuidArr.length <= MAX_EVENTS, "events is limited to %d to ack at a time", Integer.valueOf(MAX_EVENTS));
        this.subscription.notifyEventsProcessed(Arrays.asList(uuidArr));
    }

    public void acknowledge(ResolvedEvent resolvedEvent) {
        acknowledge(resolvedEvent.originalEvent().eventId);
    }

    public void acknowledge(List<ResolvedEvent> list) {
        acknowledge((UUID[]) list.stream().map(resolvedEvent -> {
            return resolvedEvent.originalEvent().eventId;
        }).toArray(i -> {
            return new UUID[i];
        }));
    }

    public void fail(ResolvedEvent resolvedEvent, PersistentSubscriptionNakEventAction persistentSubscriptionNakEventAction, String str) {
        this.subscription.notifyEventsFailed(Collections.singletonList(resolvedEvent.originalEvent().eventId), persistentSubscriptionNakEventAction, str);
    }

    public void fail(List<ResolvedEvent> list, PersistentSubscriptionNakEventAction persistentSubscriptionNakEventAction, String str) {
        Preconditions.checkArgument(list.size() <= MAX_EVENTS, "events is limited to %d to ack at a time", Integer.valueOf(MAX_EVENTS));
        this.subscription.notifyEventsFailed((List) list.stream().map(resolvedEvent -> {
            return resolvedEvent.originalEvent().eventId;
        }).collect(Collectors.toCollection(() -> {
            return new ArrayList(list.size());
        })), persistentSubscriptionNakEventAction, str);
    }

    public void stop(Duration duration) throws TimeoutException {
        logger.trace("Persistent subscription to {}: requesting stop...", this.streamId);
        enqueueSubscriptionDropNotification(SubscriptionDropReason.UserInitiated, null);
        if (!this.stopped.await(duration.toMillis(), TimeUnit.MILLISECONDS)) {
            throw new TimeoutException(String.format("Could not stop %s in time.", getClass().getSimpleName()));
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws TimeoutException {
        stop(Duration.ofSeconds(2L));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void enqueueSubscriptionDropNotification(SubscriptionDropReason subscriptionDropReason, Exception exc) {
        if (this.dropData.compareAndSet(null, new Subscriptions.DropData(subscriptionDropReason, exc))) {
            enqueue(Subscriptions.DROP_PERSISTENT_SUBSCRIPTION_EVENT);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void enqueue(RetryableResolvedEvent retryableResolvedEvent) {
        this.queue.offer(retryableResolvedEvent);
        if (this.isProcessing.compareAndSet(false, true)) {
            this.executor.execute(this::processQueue);
        }
    }

    private void processQueue() {
        do {
            if (this.subscription != null) {
                while (true) {
                    RetryableResolvedEvent poll = this.queue.poll();
                    if (poll == null) {
                        break;
                    }
                    if (poll.equals(Subscriptions.DROP_PERSISTENT_SUBSCRIPTION_EVENT)) {
                        Subscriptions.DropData andAccumulate = this.dropData.getAndAccumulate(Subscriptions.UNKNOWN_DROP_DATA, (dropData, dropData2) -> {
                            return dropData == null ? dropData2 : dropData;
                        });
                        if (andAccumulate == null) {
                            andAccumulate = Subscriptions.UNKNOWN_DROP_DATA;
                        }
                        dropSubscription(andAccumulate.reason, andAccumulate.exception);
                        return;
                    }
                    Subscriptions.DropData dropData3 = this.dropData.get();
                    if (dropData3 != null) {
                        dropSubscription(dropData3.reason, dropData3.exception);
                        return;
                    }
                    try {
                        this.listener.onEvent(this, poll);
                        if (this.autoAck) {
                            this.subscription.notifyEventsProcessed(Collections.singletonList(poll.originalEvent().eventId));
                        }
                        logger.trace("Persistent subscription to {}: processed event ({}, {}, {} @ {}).", new Object[]{this.streamId, poll.originalEvent().eventStreamId, Long.valueOf(poll.originalEvent().eventNumber), poll.originalEvent().eventType, Long.valueOf(poll.originalEventNumber())});
                    } catch (Exception e) {
                        dropSubscription(SubscriptionDropReason.EventHandlerException, e);
                        return;
                    }
                }
            } else {
                Threads.sleepUninterruptibly(1L);
            }
            this.isProcessing.compareAndSet(true, false);
            if (this.queue.isEmpty()) {
                return;
            }
        } while (this.isProcessing.compareAndSet(false, true));
    }

    private void dropSubscription(SubscriptionDropReason subscriptionDropReason, Exception exc) {
        if (this.isDropped.compareAndSet(false, true)) {
            logger.trace("Persistent subscription to {}: dropping subscription, reason: {}", new Object[]{this.streamId, subscriptionDropReason, exc});
            if (this.subscription != null) {
                this.subscription.unsubscribe();
            }
            this.listener.onClose(this, subscriptionDropReason, exc);
            this.stopped.release();
        }
    }
}
