package com.github.msemys.esjc.subscription;

import com.github.msemys.esjc.CatchUpSubscription;
import com.github.msemys.esjc.CatchUpSubscriptionListener;
import com.github.msemys.esjc.EventStore;
import com.github.msemys.esjc.Position;
import com.github.msemys.esjc.ResolvedEvent;
import com.github.msemys.esjc.StreamEventsSlice;
import com.github.msemys.esjc.UserCredentials;
import com.github.msemys.esjc.operation.StreamDeletedException;
import com.github.msemys.esjc.operation.StreamNotFoundException;
import com.github.msemys.esjc.util.Preconditions;
import com.github.msemys.esjc.util.Strings;
import com.github.msemys.esjc.util.Threads;
import java.util.concurrent.Executor;
import org.slf4j.Logger;

/* loaded from: input_file:com/github/msemys/esjc/subscription/StreamCatchUpSubscription.class */
/**
 * Catch-up subscription bound to a single stream.
 *
 * <p>Progress is tracked by event number: {@code nextReadEventNumber} is where the next
 * forward read starts, and {@code lastProcessedEventNumber} is the number of the last event
 * that was handed to the listener. Events whose number is not greater than the last processed
 * one are skipped, so re-reading an already delivered event is harmless.
 */
public class StreamCatchUpSubscription extends CatchUpSubscription {
    private long nextReadEventNumber;
    private long lastProcessedEventNumber;

    /**
     * Creates a catch-up subscription to the specified stream.
     *
     * @param eventstore       client used to read the stream (forwarded to the superclass).
     * @param streamId         name of the stream to subscribe to; must not be null or empty.
     * @param eventNumber      number of the last event already processed by the caller, or
     *                         {@code null} to start from the beginning of the stream. Reading
     *                         starts at this number, but the event with exactly this number
     *                         is skipped by {@link #tryProcess(ResolvedEvent)}.
     * @param resolveLinkTos   forwarded to the superclass and later passed to the stream read
     *                         (presumably controls link-event resolution; confirm against
     *                         {@code CatchUpSubscription}).
     * @param listener         receives the events (forwarded to the superclass).
     * @param userCredentials  credentials used for reads (forwarded to the superclass).
     * @param readBatchSize    forwarded to the superclass (presumably the batch size used
     *                         by reads; confirm against {@code CatchUpSubscription}).
     * @param maxPushQueueSize forwarded to the superclass (presumably the live queue limit;
     *                         confirm against {@code CatchUpSubscription}).
     * @param executor         forwarded to the superclass.
     * @throws IllegalArgumentException if {@code streamId} is null or empty
     *                                  (assuming {@code Preconditions.checkArgument} follows
     *                                  the usual contract).
     */
    public StreamCatchUpSubscription(EventStore eventstore,
                                     String streamId,
                                     Long eventNumber,
                                     boolean resolveLinkTos,
                                     CatchUpSubscriptionListener listener,
                                     UserCredentials userCredentials,
                                     int readBatchSize,
                                     int maxPushQueueSize,
                                     Executor executor) {
        super(eventstore, streamId, resolveLinkTos, listener, userCredentials, readBatchSize, maxPushQueueSize, executor);
        Preconditions.checkArgument(!Strings.isNullOrEmpty(streamId), "streamId is null or empty");
        // -1 means "nothing processed yet": every event number (>= 0) compares greater.
        this.lastProcessedEventNumber = (eventNumber == null) ? -1L : eventNumber;
        // Start reading from the first event when no checkpoint was supplied.
        this.nextReadEventNumber = (eventNumber == null) ? 0L : eventNumber;
    }

    /**
     * Reads the stream forward in batches, passing every event to
     * {@link #tryProcess(ResolvedEvent)}, until the target is reached or the subscription
     * is asked to stop.
     *
     * <p>The target is the end of the stream when {@code lastEventNumber} is {@code null};
     * otherwise reading is done once the next event number to read exceeds
     * {@code lastEventNumber}.
     *
     * @param eventstore         client used to perform the reads.
     * @param resolveLinkTos     passed through to each stream read.
     * @param userCredentials    credentials passed through to each stream read.
     * @param lastCommitPosition not used by this stream-based implementation.
     * @param lastEventNumber    last event number to read up to, or {@code null} to read
     *                           until the end of the stream.
     * @throws StreamNotFoundException if the stream is reported as not found while a concrete
     *                                 (non-null, not {@code -1}) {@code lastEventNumber} was
     *                                 requested.
     * @throws StreamDeletedException  if the stream is reported as deleted.
     * @throws IllegalStateException   if the read returns an unexpected status.
     * @throws Exception               if waiting for the read result fails.
     */
    @Override // com.github.msemys.esjc.CatchUpSubscription
    protected void readEventsTill(EventStore eventstore,
                                  boolean resolveLinkTos,
                                  UserCredentials userCredentials,
                                  Long lastCommitPosition,
                                  Long lastEventNumber) throws Exception {
        boolean done;

        do {
            StreamEventsSlice slice = eventstore.readStreamEventsForward(
                this.streamId, this.nextReadEventNumber, this.readBatchSize, resolveLinkTos, userCredentials).get();

            switch (slice.status) {
                case Success:
                    slice.events.forEach(this::tryProcess);
                    this.nextReadEventNumber = slice.nextEventNumber;
                    done = (lastEventNumber == null)
                        ? slice.isEndOfStream
                        : slice.nextEventNumber > lastEventNumber;
                    break;
                case StreamNotFound:
                    // A missing stream is only acceptable when no concrete target event
                    // number was requested (-1 appears to be the "no events" marker).
                    if (lastEventNumber != null && lastEventNumber != -1L) {
                        throw new StreamNotFoundException("Impossible: stream %s disappeared in the middle of catching up subscription.", this.streamId);
                    }
                    done = true;
                    break;
                case StreamDeleted:
                    throw new StreamDeletedException(this.streamId);
                default:
                    throw new IllegalStateException(String.format("Unexpected StreamEventsSlice.Status: %s.", slice.status));
            }

            // The stream end was hit before the requested event number: back off briefly
            // before re-reading (presumably giving the server time to expose further events).
            if (!done && slice.isEndOfStream) {
                Threads.sleepUninterruptibly(1L);
            }
            // Stop as soon as the target is reached; otherwise continue only while the
            // subscription has not been asked to stop.
        } while (!done && !this.shouldStop);

        this.logger.trace("Catch-up subscription to {}: finished reading events, nextReadEventNumber = {}.",
            streamId(), this.nextReadEventNumber);
    }

    /**
     * Delivers the event to the listener if its original event number is greater than the
     * last processed event number, and records it as processed; otherwise the event is
     * skipped. The outcome is traced either way.
     *
     * @param resolvedEvent event read from the stream.
     */
    @Override // com.github.msemys.esjc.CatchUpSubscription
    protected void tryProcess(ResolvedEvent resolvedEvent) {
        boolean processed = false;

        if (resolvedEvent.originalEventNumber() > this.lastProcessedEventNumber) {
            this.listener.onEvent(this, resolvedEvent);
            this.lastProcessedEventNumber = resolvedEvent.originalEventNumber();
            processed = true;
        }

        this.logger.trace("Catch-up subscription to {}: {} event ({}, {}, {} @ {}).",
            streamId(),
            processed ? "processed" : "skipping",
            resolvedEvent.originalEvent().eventStreamId,
            resolvedEvent.originalEvent().eventNumber,
            resolvedEvent.originalEvent().eventType,
            resolvedEvent.originalEventNumber());
    }

    /**
     * Returns the number of the last event handed to the listener, or the initial value
     * set by the constructor ({@code -1} when no starting event number was given).
     */
    @Override // com.github.msemys.esjc.CatchUpSubscription
    public long lastProcessedEventNumber() {
        return this.lastProcessedEventNumber;
    }

    /**
     * Not supported: this subscription tracks progress by event number only.
     *
     * @throws UnsupportedOperationException always.
     */
    @Override // com.github.msemys.esjc.CatchUpSubscription
    public Position lastProcessedPosition() {
        throw new UnsupportedOperationException("The last processed position is not available to a specific stream catch-up subscriptions.");
    }
}
