/*
 * Decompiled with CFR 0.152.
 */
package com.github.msemys.esjc.subscription;

import com.github.msemys.esjc.AllEventsSlice;
import com.github.msemys.esjc.CatchUpSubscription;
import com.github.msemys.esjc.CatchUpSubscriptionListener;
import com.github.msemys.esjc.EventStore;
import com.github.msemys.esjc.Position;
import com.github.msemys.esjc.ResolvedEvent;
import com.github.msemys.esjc.UserCredentials;
import com.github.msemys.esjc.util.Threads;
import java.util.concurrent.Executor;

public class AllCatchUpSubscription
extends CatchUpSubscription {
    private Position nextReadPosition;
    private Position lastProcessedPosition;

    public AllCatchUpSubscription(EventStore eventstore, Position position, boolean resolveLinkTos, CatchUpSubscriptionListener listener, UserCredentials userCredentials, int readBatchSize, int maxPushQueueSize, Executor executor) {
        super(eventstore, "", resolveLinkTos, listener, userCredentials, readBatchSize, maxPushQueueSize, executor);
        this.lastProcessedPosition = position == null ? Position.END : position;
        this.nextReadPosition = position == null ? Position.START : position;
    }

    @Override
    protected void readEventsTill(EventStore eventstore, boolean resolveLinkTos, UserCredentials userCredentials, Long lastCommitPosition, Long lastEventNumber) throws Exception {
        boolean done;
        do {
            AllEventsSlice slice = eventstore.readAllEventsForward(this.nextReadPosition, this.readBatchSize, resolveLinkTos, userCredentials).get();
            for (ResolvedEvent e : slice.events) {
                if (e.originalPosition == null) {
                    throw new Exception("Subscription event came up with no OriginalPosition.");
                }
                this.tryProcess(e);
            }
            this.nextReadPosition = slice.nextPosition;
            boolean bl = lastCommitPosition == null ? slice.isEndOfStream() : (done = slice.nextPosition.compareTo(new Position(lastCommitPosition, lastCommitPosition)) >= 0);
            if (done || !slice.isEndOfStream()) continue;
            Threads.sleepUninterruptibly(1L);
        } while (!done && !this.shouldStop);
        this.logger.trace("Catch-up subscription to {}: finished reading events, nextReadPosition = {}.", (Object)this.streamId(), (Object)this.nextReadPosition);
    }

    @Override
    protected void tryProcess(ResolvedEvent event) {
        boolean processed = false;
        if (event.originalPosition.compareTo(this.lastProcessedPosition) > 0) {
            this.listener.onEvent(this, event);
            this.lastProcessedPosition = event.originalPosition;
            processed = true;
        }
        this.logger.trace("Catch-up subscription to {}: {} event ({}, {}, {} @ {}).", new Object[]{this.streamId(), processed ? "processed" : "skipping", event.originalEvent().eventStreamId, event.originalEvent().eventNumber, event.originalEvent().eventType, event.originalPosition});
    }

    @Override
    public Position lastProcessedPosition() {
        Position currentPosition;
        Position oldPosition = this.lastProcessedPosition;
        while (oldPosition != (currentPosition = this.lastProcessedPosition)) {
            oldPosition = currentPosition;
        }
        return currentPosition;
    }

    @Override
    public long lastProcessedEventNumber() {
        throw new UnsupportedOperationException("The last processed event number is not available to ALL stream catch-up subscriptions.");
    }
}

