/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.jet.JetException;
import com.hazelcast.jet.impl.SnapshotRepository;
import com.hazelcast.jet.impl.execution.InboundEdgeStream;
import com.hazelcast.jet.impl.execution.SnapshotBarrier;
import com.hazelcast.jet.impl.execution.SnapshotContext;
import com.hazelcast.jet.impl.execution.Tasklet;
import com.hazelcast.jet.impl.util.AsyncSnapshotWriter;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.util.function.Predicate;
import java.util.Map;
import javax.annotation.Nonnull;

/**
 * Tasklet that drains an {@link InboundEdgeStream} and hands every received
 * entry to an {@link AsyncSnapshotWriter}.
 *
 * <p>The tasklet is a small state machine driven by {@link #call()}:
 * <ul>
 *     <li>{@link State#DRAIN}: items are drained from the inbound stream and
 *     offered to the writer until a {@link SnapshotBarrier} is seen or the
 *     stream reports that it is done;</li>
 *     <li>{@link State#FLUSH}: the writer is asked to flush until it reports
 *     success;</li>
 *     <li>{@link State#REACHED_BARRIER}: once the writer has no pending
 *     asynchronous operations, any write error is logged and reported, the
 *     snapshot is reported as done for this tasklet, and the tasklet returns
 *     to {@code DRAIN} for the next snapshot id;</li>
 *     <li>{@link State#DONE}: entered when the inbound stream is done.</li>
 * </ul>
 */
public class StoreSnapshotTasklet implements Tasklet {

    /**
     * Id of the snapshot whose barrier this tasklet is currently waiting for.
     * Package-private and non-volatile (presumably exposed for tests; verify
     * against callers).
     */
    long pendingSnapshotId;

    private final SnapshotContext snapshotContext;
    private final long jobId;
    private final InboundEdgeStream inboundEdgeStream;
    private final ILogger logger;
    private final String vertexName;
    private final boolean isHigherPrioritySource;
    private final AsyncSnapshotWriter ssWriter;
    private final ProgressTracker progTracker = new ProgressTracker();
    /** Method reference to {@link #addToInbox(Object)}, created once and reused for every drain. */
    private final Predicate<Object> addToInboxFunction;

    private State state = State.DRAIN;
    /** Set by {@link #addToInbox(Object)} when a barrier is drained; cleared when the snapshot completes. */
    private boolean hasReachedBarrier;
    /** Entry the writer refused to accept; it is re-offered on the next {@code DRAIN} step. */
    private Map.Entry<Data, Data> pendingEntry;

    /**
     * Creates the tasklet and points the writer at the snapshot map of the
     * first pending snapshot, which is {@code snapshotContext.activeSnapshotId() + 1}.
     *
     * @param snapshotContext        context that is notified about completion and errors
     * @param jobId                  job id, used to derive the snapshot map name
     * @param inboundEdgeStream      stream the entries and barriers are drained from
     * @param ssWriter               writer receiving the drained entries
     * @param logger                 logger used to report write errors
     * @param vertexName             vertex name, used to derive the snapshot map name
     *                               and in {@link #toString()}
     * @param isHigherPrioritySource passed through to
     *                               {@code SnapshotContext.taskletDone} when the input is done
     */
    public StoreSnapshotTasklet(SnapshotContext snapshotContext, long jobId, InboundEdgeStream inboundEdgeStream, AsyncSnapshotWriter ssWriter, ILogger logger, String vertexName, boolean isHigherPrioritySource) {
        this.snapshotContext = snapshotContext;
        this.jobId = jobId;
        this.inboundEdgeStream = inboundEdgeStream;
        this.logger = logger;
        this.vertexName = vertexName;
        this.isHigherPrioritySource = isHigherPrioritySource;
        this.ssWriter = ssWriter;
        this.pendingSnapshotId = snapshotContext.activeSnapshotId() + 1L;
        // Must run after jobId, vertexName and pendingSnapshotId are set: the map name is derived from them.
        resetCurrentMap();
        this.addToInboxFunction = this::addToInbox;
    }

    /**
     * Resets the progress tracker, performs one step of the state machine and
     * returns the progress recorded during that step.
     *
     * @return the progress state accumulated by this invocation
     * @throws JetException if invoked while the tasklet is in a state that has
     *                      no handler (this includes {@link State#DONE})
     */
    @Override
    @Nonnull
    public ProgressState call() {
        progTracker.reset();
        stateMachineStep();
        return progTracker.toProgressState();
    }

    /** Dispatches to the handler of the current state. */
    private void stateMachineStep() {
        switch (state) {
            case DRAIN:
                drainInput();
                break;
            case FLUSH:
                flushWriter();
                break;
            case REACHED_BARRIER:
                completeSnapshot();
                break;
            default:
                // State.DONE has no handler and therefore also ends up here.
                throw new JetException("Unexpected state: " + state);
        }
    }

    /**
     * {@code DRAIN} step: re-offers a previously refused entry, then drains the
     * inbound stream into the writer. Moves to {@code DONE} when the stream is
     * done and to {@code FLUSH} (executed immediately) when a barrier was seen.
     */
    private void drainInput() {
        progTracker.notDone();
        if (pendingEntry != null) {
            // The writer refused this entry earlier; do not pull more input until it is accepted.
            if (!ssWriter.offer(pendingEntry)) {
                return;
            }
            progTracker.madeProgress();
        }
        pendingEntry = null;
        ProgressState result = inboundEdgeStream.drainTo(addToInboxFunction);
        if (result.isDone()) {
            assert ssWriter.isEmpty() : "input is done, but we had some entries and not the barrier";
            // The last snapshot this tasklet took part in is the one before the pending one.
            snapshotContext.taskletDone(pendingSnapshotId - 1L, isHigherPrioritySource);
            state = State.DONE;
            // Discards the notDone() recorded at the top of this step.
            progTracker.reset();
        }
        progTracker.madeProgress(result.isMadeProgress());
        if (hasReachedBarrier) {
            state = State.FLUSH;
            // Run the FLUSH step right away instead of waiting for the next call().
            stateMachineStep();
        }
    }

    /** {@code FLUSH} step: asks the writer to flush and advances to {@code REACHED_BARRIER} once it succeeds. */
    private void flushWriter() {
        progTracker.notDone();
        if (ssWriter.flush()) {
            progTracker.madeProgress();
            state = State.REACHED_BARRIER;
        }
    }

    /**
     * {@code REACHED_BARRIER} step: waits until the writer has no pending
     * asynchronous operations, reports a write error if there is one, reports
     * the snapshot statistics and prepares for the next snapshot.
     */
    private void completeSnapshot() {
        if (ssWriter.hasPendingAsyncOps()) {
            progTracker.notDone();
            return;
        }
        Throwable error = ssWriter.getError();
        if (error != null) {
            logger.severe("Error writing to snapshot map '" + currMapName() + "'", error);
            snapshotContext.reportError(error);
        }
        progTracker.madeProgress();
        // Completion is reported even after an error; the error itself was passed to the context above.
        snapshotContext.snapshotDoneForTasklet(ssWriter.getTotalPayloadBytes(), ssWriter.getTotalKeys(), ssWriter.getTotalChunks());
        ++pendingSnapshotId;
        resetCurrentMap();
        hasReachedBarrier = false;
        state = State.DRAIN;
        progTracker.notDone();
    }

    /**
     * Returns the name of the snapshot data map for the current job, pending
     * snapshot id and vertex.
     */
    String currMapName() {
        return SnapshotRepository.snapshotDataMapName(jobId, pendingSnapshotId, vertexName);
    }

    /** Points the writer at the map belonging to the currently pending snapshot. */
    private void resetCurrentMap() {
        ssWriter.setCurrentMap(currMapName());
    }

    /**
     * Handles one item drained from the inbound stream. A
     * {@link SnapshotBarrier} marks the barrier as reached; any other item is
     * treated as a snapshot entry and offered to the writer.
     *
     * @param o the drained item
     * @return {@code false} if the writer refused the entry (it is then kept in
     *         {@link #pendingEntry}), {@code true} otherwise
     */
    private boolean addToInbox(Object o) {
        if (o instanceof SnapshotBarrier) {
            SnapshotBarrier barrier = (SnapshotBarrier) o;
            assert pendingSnapshotId == barrier.snapshotId() : "Unexpected barrier, expected was " + pendingSnapshotId + ", but barrier was " + barrier.snapshotId() + ", this=" + this;
            hasReachedBarrier = true;
            return true;
        }
        // Every non-barrier item is assumed to be a Data/Data entry; the cast cannot be checked at runtime.
        @SuppressWarnings("unchecked")
        Map.Entry<Data, Data> entry = (Map.Entry<Data, Data>) o;
        if (!ssWriter.offer(entry)) {
            pendingEntry = entry;
            return false;
        }
        return true;
    }

    @Override
    public String toString() {
        return StoreSnapshotTasklet.class.getSimpleName() + '{' + vertexName + '}';
    }

    /** States of the tasklet's state machine. */
    enum State {
        DRAIN,
        FLUSH,
        REACHED_BARRIER,
        DONE
    }
}

