/*
 * Decompiled with CFR 0.152.
 */
package com.github.msemys.esjc.subscription;

import com.github.msemys.esjc.CatchUpSubscription;
import com.github.msemys.esjc.CatchUpSubscriptionListener;
import com.github.msemys.esjc.EventStore;
import com.github.msemys.esjc.Position;
import com.github.msemys.esjc.ResolvedEvent;
import com.github.msemys.esjc.StreamEventsSlice;
import com.github.msemys.esjc.UserCredentials;
import com.github.msemys.esjc.operation.StreamDeletedException;
import com.github.msemys.esjc.operation.StreamNotFoundException;
import com.github.msemys.esjc.util.Preconditions;
import com.github.msemys.esjc.util.Strings;
import com.github.msemys.esjc.util.Threads;
import java.util.concurrent.Executor;

/**
 * Catch-up subscription bound to a single stream.
 *
 * <p>Progress is tracked by event number: {@code nextReadEventNumber} is the
 * position the next forward read starts from, and
 * {@code lastProcessedEventNumber} is the number of the most recent event that
 * was handed to the listener.
 */
public class StreamCatchUpSubscription extends CatchUpSubscription {
    /** Event number from which the next forward read of the stream starts. */
    private long nextReadEventNumber;
    /** Number of the last event delivered to the listener ({@code -1} when none yet). */
    private long lastProcessedEventNumber;

    /**
     * Creates the subscription.
     *
     * <p>When {@code eventNumber} is {@code null}, reading starts at event
     * number {@code 0} and the last processed number is {@code -1}, so every
     * event read is delivered. Otherwise reading starts at {@code eventNumber}
     * and only events with a greater number are delivered.
     *
     * @param eventNumber number of the last event already processed, or
     *                    {@code null} to start from the beginning
     * @throws IllegalArgumentException presumably, via
     *         {@code Preconditions.checkArgument}, when {@code streamId} is
     *         null or empty (exact exception type is defined by that helper)
     */
    public StreamCatchUpSubscription(EventStore eventstore, String streamId, Long eventNumber, boolean resolveLinkTos, CatchUpSubscriptionListener listener, UserCredentials userCredentials, int readBatchSize, int maxPushQueueSize, Executor executor) {
        super(eventstore, streamId, resolveLinkTos, listener, userCredentials, readBatchSize, maxPushQueueSize, executor);
        Preconditions.checkArgument(!Strings.isNullOrEmpty(streamId), "streamId is null or empty");
        if (eventNumber == null) {
            lastProcessedEventNumber = -1L;
            nextReadEventNumber = 0L;
        } else {
            lastProcessedEventNumber = eventNumber;
            nextReadEventNumber = eventNumber;
        }
    }

    /**
     * Reads the stream forward in batches of {@code readBatchSize}, passing
     * every event to {@link #tryProcess(ResolvedEvent)}.
     *
     * <p>The loop ends when the subscription is asked to stop, or when:
     * <ul>
     *   <li>{@code lastEventNumber} is {@code null} and the slice reports the
     *       end of the stream; or</li>
     *   <li>{@code lastEventNumber} is set and the slice's next event number
     *       is greater than it; or</li>
     *   <li>the stream is reported as not found while {@code lastEventNumber}
     *       is {@code null} or {@code -1}.</li>
     * </ul>
     *
     * <p>{@code lastCommitPosition} is not used by this implementation.
     *
     * @throws StreamNotFoundException if the stream is reported as not found
     *         although {@code lastEventNumber} is neither {@code null} nor {@code -1}
     * @throws StreamDeletedException if the stream is reported as deleted
     * @throws IllegalStateException on any other slice status
     * @throws Exception anything raised while waiting for the read result
     */
    @Override
    protected void readEventsTill(EventStore eventstore, boolean resolveLinkTos, UserCredentials userCredentials, Long lastCommitPosition, Long lastEventNumber) throws Exception {
        boolean finished;
        do {
            StreamEventsSlice batch = eventstore.readStreamEventsForward(streamId, nextReadEventNumber, readBatchSize, resolveLinkTos, userCredentials).get();
            switch (batch.status) {
                case Success: {
                    batch.events.forEach(this::tryProcess);
                    nextReadEventNumber = batch.nextEventNumber;
                    if (lastEventNumber == null) {
                        // No target given: read until the stream is exhausted.
                        finished = batch.isEndOfStream;
                    } else {
                        // Target given: stop once the read position has moved past it.
                        finished = batch.nextEventNumber > lastEventNumber;
                    }
                    break;
                }
                case StreamNotFound: {
                    boolean streamWasExpected = lastEventNumber != null && lastEventNumber != -1L;
                    if (streamWasExpected) {
                        throw new StreamNotFoundException("Impossible: stream %s disappeared in the middle of catching up subscription.", streamId);
                    }
                    finished = true;
                    break;
                }
                case StreamDeleted: {
                    throw new StreamDeletedException(streamId);
                }
                default: {
                    throw new IllegalStateException(String.format("Unexpected StreamEventsSlice.Status: %s.", batch.status));
                }
            }
            // Reached the current end of the stream without hitting the target:
            // pause before polling again.
            if (!finished && batch.isEndOfStream) {
                Threads.sleepUninterruptibly(1L);
            }
        } while (!finished && !shouldStop);
        logger.trace("Catch-up subscription to {}: finished reading events, nextReadEventNumber = {}.", streamId(), nextReadEventNumber);
    }

    /**
     * Delivers {@code event} to the listener when its original event number is
     * greater than the last processed one, then records that number; otherwise
     * the event is skipped. Either outcome is logged at trace level.
     */
    @Override
    protected void tryProcess(ResolvedEvent event) {
        final String outcome;
        if (event.originalEventNumber() > lastProcessedEventNumber) {
            listener.onEvent(this, event);
            lastProcessedEventNumber = event.originalEventNumber();
            outcome = "processed";
        } else {
            outcome = "skipping";
        }
        logger.trace("Catch-up subscription to {}: {} event ({}, {}, {} @ {}).",
                streamId(),
                outcome,
                event.originalEvent().eventStreamId,
                event.originalEvent().eventNumber,
                event.originalEvent().eventType,
                event.originalEventNumber());
    }

    /**
     * Returns the number of the last event delivered to the listener, or the
     * initial value set by the constructor if none has been delivered.
     */
    @Override
    public long lastProcessedEventNumber() {
        return lastProcessedEventNumber;
    }

    /**
     * Not supported: this subscription tracks event numbers, not positions.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Position lastProcessedPosition() {
        throw new UnsupportedOperationException("The last processed position is not available to a specific stream catch-up subscriptions.");
    }
}

