package com.github.msemys.esjc.subscription;

import com.github.msemys.esjc.AllEventsSlice;
import com.github.msemys.esjc.CatchUpSubscription;
import com.github.msemys.esjc.CatchUpSubscriptionListener;
import com.github.msemys.esjc.EventStore;
import com.github.msemys.esjc.Position;
import com.github.msemys.esjc.ResolvedEvent;
import com.github.msemys.esjc.UserCredentials;
import com.github.msemys.esjc.util.Strings;
import com.github.msemys.esjc.util.Threads;
import java.util.concurrent.Executor;
import org.slf4j.Logger;

/* loaded from: input_file:com/github/msemys/esjc/subscription/AllCatchUpSubscription.class */
/**
 * Catch-up subscription that reads through {@code EventStore.readAllEventsForward} and tracks
 * its progress as a {@link Position}; it does not expose an event number
 * (see {@link #lastProcessedEventNumber()}).
 */
public class AllCatchUpSubscription extends CatchUpSubscription {
    /** Position the next forward read starts from; after construction only {@link #readEventsTill} updates it. */
    private Position nextReadPosition;

    /**
     * Position of the last event handed to the listener.
     *
     * <p>Declared {@code volatile}: it is written by {@link #tryProcess} and returned by the public
     * {@link #lastProcessedPosition()} accessor, so a caller on another thread needs a visibility
     * guarantee to observe the latest value.
     */
    private volatile Position lastProcessedPosition;

    /**
     * Creates the subscription. All arguments except {@code position} are forwarded unchanged to the
     * superclass constructor, with {@link Strings#EMPTY} supplied as its second argument.
     *
     * @param eventStore                  client used for reading
     * @param position                    starting position; when {@code null} the last processed position
     *                                    is initialised to {@link Position#END} and reading starts at
     *                                    {@link Position#START}. NOTE(review): this presumes
     *                                    {@code Position.END} compares lower than every real event
     *                                    position - confirm against {@code Position}.
     * @param z                           flag forwarded to the superclass (its meaning is defined there)
     * @param catchUpSubscriptionListener listener forwarded to the superclass
     * @param userCredentials             credentials forwarded to the superclass
     * @param i                           first numeric setting forwarded to the superclass
     * @param i2                          second numeric setting forwarded to the superclass
     * @param executor                    executor forwarded to the superclass
     */
    public AllCatchUpSubscription(EventStore eventStore, Position position, boolean z, CatchUpSubscriptionListener catchUpSubscriptionListener, UserCredentials userCredentials, int i, int i2, Executor executor) {
        super(eventStore, Strings.EMPTY, z, catchUpSubscriptionListener, userCredentials, i, i2, executor);
        this.lastProcessedPosition = position == null ? Position.END : position;
        this.nextReadPosition = position == null ? Position.START : position;
    }

    /**
     * Reads batches forward from {@code nextReadPosition}, passing every event to
     * {@link #tryProcess}, until the target is reached or {@code shouldStop} is set.
     *
     * @param eventStore      client used for reading
     * @param z               flag passed as the third argument of {@code readAllEventsForward}
     * @param userCredentials credentials used for each read
     * @param l               upper bound; when non-null, reading finishes once the slice's next
     *                        position is at or beyond {@code new Position(l, l)}; when {@code null},
     *                        reading finishes at end of stream
     * @param l2              not used by this implementation
     * @throws Exception if an event arrives without an original position, or if the read fails
     */
    @Override
    protected void readEventsTill(EventStore eventStore, boolean z, UserCredentials userCredentials, Long l, Long l2) throws Exception {
        // The bound does not change between batches, so build it once instead of per iteration.
        final Position stopPosition = l == null ? null : new Position(l.longValue(), l.longValue());
        boolean done;
        do {
            AllEventsSlice slice = eventStore.readAllEventsForward(this.nextReadPosition, this.readBatchSize, z, userCredentials).get();
            for (ResolvedEvent event : slice.events) {
                if (event.originalPosition == null) {
                    throw new Exception("Subscription event came up with no OriginalPosition.");
                }
                tryProcess(event);
            }
            this.nextReadPosition = slice.nextPosition;

            done = stopPosition == null
                ? slice.isEndOfStream()
                : slice.nextPosition.compareTo(stopPosition) >= 0;

            // End of stream reached but the requested bound is not: pause briefly before re-reading
            // (presumably to let the server make the remaining data readable).
            if (!done && slice.isEndOfStream()) {
                Threads.sleepUninterruptibly(1L);
            }
        } while (!done && !this.shouldStop);
        this.logger.trace("Catch-up subscription to {}: finished reading events, nextReadPosition = {}.", streamId(), this.nextReadPosition);
    }

    /**
     * Delivers the event to the listener only if its original position is strictly greater than the
     * last processed position, then records that position. Either outcome is trace-logged.
     *
     * @param resolvedEvent event to process; its {@code originalPosition} must not be {@code null}
     */
    @Override
    protected void tryProcess(ResolvedEvent resolvedEvent) {
        boolean processed = false;
        if (resolvedEvent.originalPosition.compareTo(this.lastProcessedPosition) > 0) {
            this.listener.onEvent(this, resolvedEvent);
            // Advance only after the listener returned, so a throwing listener does not move the marker.
            this.lastProcessedPosition = resolvedEvent.originalPosition;
            processed = true;
        }
        this.logger.trace("Catch-up subscription to {}: {} event ({}, {}, {} @ {}).",
            streamId(),
            processed ? "processed" : "skipping",
            resolvedEvent.originalEvent().eventStreamId,
            Long.valueOf(resolvedEvent.originalEvent().eventNumber),
            resolvedEvent.originalEvent().eventType,
            resolvedEvent.originalPosition);
    }

    /**
     * Returns the position of the last event handed to the listener (or the initial value set by the
     * constructor if none has been processed yet).
     *
     * <p>A single read of the {@code volatile} reference is sufficient: reference reads are atomic,
     * so no re-read loop is needed to obtain a consistent value.
     *
     * @return the last processed position
     */
    @Override
    public Position lastProcessedPosition() {
        return this.lastProcessedPosition;
    }

    /**
     * Not supported by this subscription type; progress is tracked by position only.
     *
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public long lastProcessedEventNumber() {
        throw new UnsupportedOperationException("The last processed event number is not available to ALL stream catch-up subscriptions.");
    }
}
