/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.core.state;

import java.io.IOException;
import java.util.List;
import java.util.function.Supplier;
import org.neo4j.causalclustering.SessionTracker;
import org.neo4j.causalclustering.core.consensus.log.RaftLog;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCache;
import org.neo4j.causalclustering.core.consensus.log.monitoring.RaftLogCommitIndexMonitor;
import org.neo4j.causalclustering.core.replication.DistributedOperation;
import org.neo4j.causalclustering.core.replication.ProgressTracker;
import org.neo4j.causalclustering.core.state.CommandBatcher;
import org.neo4j.causalclustering.core.state.CommandDispatcher;
import org.neo4j.causalclustering.core.state.CoreState;
import org.neo4j.causalclustering.core.state.InFlightLogEntryReader;
import org.neo4j.causalclustering.core.state.Result;
import org.neo4j.causalclustering.core.state.machines.tx.CoreReplicatedContent;
import org.neo4j.causalclustering.core.state.snapshot.CoreSnapshot;
import org.neo4j.causalclustering.helper.StatUtil;
import org.neo4j.function.ThrowingAction;
import org.neo4j.function.ThrowingBiConsumer;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/**
 * Applies committed Raft log entries to the core state.
 *
 * <p>Entries are read from the Raft log (via the in-flight cache), grouped by a
 * {@link CommandBatcher} and dispatched through the {@link CoreState}'s command dispatcher.
 * The work is done by a dedicated thread named {@code core-state-applier}, which is woken up
 * through {@link #notifyCommitted(long)} whenever a higher commit index is seen.
 *
 * <p>The process is created in the paused state ({@code pauseCount == 1}); the applier thread is
 * only running while the pause count is zero. {@link #pauseApplier(String)} and
 * {@link #resumeApplier(String)} must be called in matched pairs.
 *
 * <p>Threading: lifecycle methods synchronize on this object; the commit index hand-off between
 * notifier and applier synchronizes on the internal {@code ApplierState}. {@code lastApplied} and
 * {@code lastFlushed} are volatile because they are written by the applier thread and read by
 * other threads.
 */
public class CommandApplicationProcess {
    /** Sentinel index meaning "no entry yet". */
    private static final long NOTHING = -1L;

    private final RaftLog raftLog;
    private final int flushEvery;
    private final ProgressTracker progressTracker;
    private final SessionTracker sessionTracker;
    private final Supplier<DatabaseHealth> dbHealth;
    private final InFlightCache inFlightCache;
    private final Log log;
    private final CoreState coreState;
    private final RaftLogCommitIndexMonitor commitIndexMonitor;
    private final CommandBatcher batcher;
    private final StatUtil.StatContext batchStat;

    // Volatile: written by the applier thread in maybeFlushToDisk() (and by installSnapshot())
    // without holding this object's monitor, yet read by other threads through lastFlushed().
    // Volatile also guarantees atomic 64-bit reads and writes.
    private volatile long lastFlushed = NOTHING;
    // Starts at 1: the process is created paused and start() performs the matching resume.
    private int pauseCount = 1;
    private Thread applierThread;
    private final ApplierState applierState = new ApplierState();

    /**
     * Creates the process in the paused state; no applier thread exists until {@link #start()}
     * (or a matching {@link #resumeApplier(String)}) brings the pause count down to zero.
     *
     * @param raftLog         log from which committed entries are read
     * @param maxBatchSize    maximum batch size handed to the {@link CommandBatcher}
     * @param flushEvery      the core state is flushed once more than this many indexes have been
     *                        applied since the last flush
     * @param dbHealth        supplier of the database health, which is panicked when applying fails
     * @param logProvider     provider of the log used by this class
     * @param progressTracker tracker notified of replicated operations and their results
     * @param sessionTracker  tracker used to validate operations and record applied ones
     * @param coreState       state to which commands are dispatched and which is flushed
     * @param inFlightCache   cache consulted when reading log entries
     * @param monitors        source of the commit index monitor
     */
    public CommandApplicationProcess(RaftLog raftLog, int maxBatchSize, int flushEvery, Supplier<DatabaseHealth> dbHealth, LogProvider logProvider, ProgressTracker progressTracker, SessionTracker sessionTracker, CoreState coreState, InFlightCache inFlightCache, Monitors monitors) {
        this.raftLog = raftLog;
        this.flushEvery = flushEvery;
        this.progressTracker = progressTracker;
        this.sessionTracker = sessionTracker;
        this.log = logProvider.getLog(this.getClass());
        this.dbHealth = dbHealth;
        this.coreState = coreState;
        this.inFlightCache = inFlightCache;
        this.commitIndexMonitor = (RaftLogCommitIndexMonitor)monitors.newMonitor(RaftLogCommitIndexMonitor.class, new String[]{this.getClass().getName()});
        this.batcher = new CommandBatcher(maxBatchSize, (ThrowingBiConsumer<Long, List<DistributedOperation>, Exception>)this::applyBatch);
        this.batchStat = StatUtil.create("BatchSize", this.log, 4096L, true);
    }

    /**
     * Records a new commit index and wakes the applier if the index advanced.
     *
     * @param commitIndex the latest known committed log index
     */
    void notifyCommitted(long commitIndex) {
        applierState.notifyCommitted(commitIndex);
    }

    /**
     * Body of the applier thread: repeatedly waits for a commit index to apply up to and applies
     * it. Any failure marks the applier as panicked, panics the database health and ends the thread.
     */
    private void applyJob() {
        while (applierState.keepRunning) {
            try {
                applyUpTo(applierState.awaitJob());
            }
            catch (Throwable e) {
                applierState.panic();
                log.error("Failed to apply", e);
                dbHealth.get().panic(e);
                // Let the thread die; setKeepRunning() rejects further use after a panic.
                return;
            }
        }
    }

    /**
     * Applies all entries from {@code lastApplied + 1} up to and including
     * {@code applyUpToIndex}, stopping early if the applier is asked to stop.
     *
     * @param applyUpToIndex highest log index to apply
     * @throws IllegalStateException if an entry in that range cannot be read
     * @throws Exception             if reading, batching or applying fails
     */
    private void applyUpTo(long applyUpToIndex) throws Exception {
        try (InFlightLogEntryReader logEntrySupplier = new InFlightLogEntryReader(raftLog, inFlightCache, true)) {
            for (long logIndex = applierState.lastApplied + 1L; applierState.keepRunning && logIndex <= applyUpToIndex; ++logIndex) {
                RaftLogEntry entry = logEntrySupplier.get(logIndex);
                if (entry == null) {
                    throw new IllegalStateException(String.format("Committed log entry at index %d must exist.", logIndex));
                }
                if (entry.content() instanceof DistributedOperation) {
                    DistributedOperation distributedOperation = (DistributedOperation)entry.content();
                    progressTracker.trackReplication(distributedOperation);
                    batcher.add(logIndex, distributedOperation);
                } else {
                    // Not a distributed operation: flush what has been batched so far, then mark
                    // this index as applied here since it never goes through applyBatch().
                    batcher.flush();
                    applierState.lastApplied = logIndex;
                }
            }
            batcher.flush();
        }
    }

    /**
     * @return the index of the last applied log entry, or {@code -1} if none
     */
    public long lastApplied() {
        return applierState.lastApplied;
    }

    /**
     * Resets both the flushed and the applied index to the snapshot's previous index.
     * The applier is expected to be paused while this is called (checked by assertion only).
     *
     * @param coreSnapshot the snapshot being installed
     */
    void installSnapshot(CoreSnapshot coreSnapshot) {
        assert (pauseCount > 0);
        long prevIndex = coreSnapshot.prevIndex();
        lastFlushed = prevIndex;
        applierState.lastApplied = prevIndex;
    }

    /**
     * @return the index up to which the core state was last flushed, or {@code -1} if unknown
     */
    synchronized long lastFlushed() {
        return lastFlushed;
    }

    /**
     * Batcher callback: applies one batch of consecutive operations ending at {@code lastIndex},
     * advances {@code lastApplied} and flushes the core state if enough has been applied.
     *
     * @param lastIndex log index of the last operation in {@code batch}
     * @param batch     operations at consecutive log indexes; an empty batch is ignored
     * @throws Exception if flushing fails
     */
    private void applyBatch(long lastIndex, List<DistributedOperation> batch) throws Exception {
        if (batch.isEmpty()) {
            return;
        }
        batchStat.collect(batch.size());
        long startIndex = lastIndex - (long)batch.size() + 1L;
        long lastHandledIndex = handleOperations(startIndex, batch);
        assert (lastHandledIndex == lastIndex);
        applierState.lastApplied = lastIndex;
        maybeFlushToDisk();
    }

    /**
     * Dispatches the given operations, assigning them consecutive command indexes starting at
     * {@code commandIndex}. Operations rejected by the session tracker are skipped but still
     * consume an index.
     *
     * @param commandIndex log index of the first operation
     * @param operations   operations to dispatch, in log order
     * @return the index of the last operation handled ({@code commandIndex - 1} for an empty list)
     */
    private long handleOperations(long commandIndex, List<DistributedOperation> operations) {
        try (CommandDispatcher dispatcher = coreState.commandDispatcher()) {
            for (DistributedOperation operation : operations) {
                // Validate exactly once; operations that fail validation are not dispatched
                // (presumably duplicates or replays - confirm against SessionTracker).
                if (sessionTracker.validateOperation(operation.globalSession(), operation.operationId())) {
                    CoreReplicatedContent command = (CoreReplicatedContent)operation.content();
                    command.dispatch(dispatcher, commandIndex, result -> progressTracker.trackResult(operation, (Result)result));
                    sessionTracker.update(operation.globalSession(), operation.operationId(), commandIndex);
                }
                ++commandIndex;
            }
        }
        return commandIndex - 1L;
    }

    /**
     * Flushes the core state when more than {@code flushEvery} indexes have been applied since
     * the last flush.
     *
     * @throws IOException if the flush fails
     */
    private void maybeFlushToDisk() throws IOException {
        // Snapshot once so the flushed index and the recorded index cannot differ.
        long applied = applierState.lastApplied;
        if (applied - lastFlushed > (long)flushEvery) {
            coreState.flush(applied);
            lastFlushed = applied;
        }
    }

    /**
     * Restores the applied index from the last flushed index, replays any entries beyond it on
     * the calling thread, and then resumes the applier.
     *
     * @throws Exception if replaying fails
     */
    public synchronized void start() throws Exception {
        long restoredIndex = lastFlushed;
        if (restoredIndex == NOTHING) {
            restoredIndex = coreState.getLastFlushed();
            lastFlushed = restoredIndex;
        }
        applierState.lastApplied = restoredIndex;
        log.info(String.format("Restoring last applied index to %d", restoredIndex));
        sessionTracker.start();
        // Replay up to whichever is further ahead: what the core state reports as applied or the
        // highest commit index seen so far.
        long lastPossiblyApplying = Math.max(coreState.getLastAppliedIndex(), applierState.getLastSeenCommitIndex());
        if (lastPossiblyApplying > applierState.lastApplied) {
            log.info("Applying up to: " + lastPossiblyApplying);
            // NOTE(review): applyUpTo() stops immediately while keepRunning is false, which is the
            // case after an earlier pause; confirm a start() following stop() need not replay here.
            applyUpTo(lastPossiblyApplying);
        }
        resumeApplier("startup");
    }

    /**
     * Pauses the applier and flushes the core state at the last applied index.
     *
     * @throws IOException           if the flush fails
     * @throws IllegalStateException if the applier has panicked (raised while stopping the thread)
     */
    public synchronized void stop() throws IOException {
        pauseApplier("shutdown");
        coreState.flush(applierState.lastApplied);
    }

    /** Marks the applier as running and starts a fresh applier thread. */
    private void spawnApplierThread() {
        applierState.setKeepRunning(true);
        applierThread = new Thread(this::applyJob, "core-state-applier");
        applierThread.start();
    }

    /**
     * Asks the applier thread to stop and waits for it to finish.
     * NOTE(review): an interrupt during the join is only logged, so this may return while the
     * applier thread is still alive.
     */
    private void stopApplierThread() {
        applierState.setKeepRunning(false);
        ignoringInterrupts(() -> applierThread.join());
    }

    /**
     * Increments the pause count; the first pause stops the applier thread.
     *
     * @param reason human-readable reason, used for logging only
     * @throws IllegalStateException if the pause count is negative or the applier has panicked
     */
    public synchronized void pauseApplier(String reason) {
        if (pauseCount < 0) {
            throw new IllegalStateException("Unmatched pause/resume");
        }
        ++pauseCount;
        log.info(String.format("Pausing due to %s (count = %d)", reason, pauseCount));
        if (pauseCount == 1) {
            stopApplierThread();
        }
    }

    /**
     * Decrements the pause count; when it reaches zero the applier thread is started.
     *
     * @param reason human-readable reason, used for logging only
     * @throws IllegalStateException if there is no outstanding pause or the applier has panicked
     */
    public synchronized void resumeApplier(String reason) {
        if (pauseCount <= 0) {
            throw new IllegalStateException("Unmatched pause/resume");
        }
        --pauseCount;
        log.info(String.format("Resuming after %s (count = %d)", reason, pauseCount));
        if (pauseCount == 0) {
            spawnApplierThread();
        }
    }

    /**
     * Runs {@code action}, logging rather than propagating an {@link InterruptedException}.
     * The interrupt flag is deliberately not re-asserted: awaitJob() calls this in a loop around
     * wait(), and a set flag would make every subsequent wait() throw immediately.
     *
     * @param action the interruptible action to run
     */
    private void ignoringInterrupts(ThrowingAction<InterruptedException> action) {
        try {
            action.apply();
        }
        catch (InterruptedException e) {
            log.warn("Unexpected interrupt", e);
        }
    }

    /**
     * State shared between the applier thread and the threads that control or notify it.
     * Waiting and notification use this object's monitor.
     */
    private class ApplierState {
        // Only accessed from synchronized methods of this object.
        private long lastSeenCommitIndex = NOTHING;
        // Volatile: read by other threads without locking.
        private volatile long lastApplied = NOTHING;
        private volatile boolean panic;
        // Cleared to make the apply loops exit.
        private volatile boolean keepRunning = true;

        private ApplierState() {
        }

        private synchronized long getLastSeenCommitIndex() {
            return lastSeenCommitIndex;
        }

        /** Marks the applier as failed for good and tells the apply loops to exit. */
        void panic() {
            panic = true;
            keepRunning = false;
        }

        /**
         * Sets the running flag and wakes any thread blocked in {@link #awaitJob()}.
         *
         * @throws IllegalStateException if the applier has panicked
         */
        synchronized void setKeepRunning(boolean keepRunning) {
            if (panic) {
                throw new IllegalStateException("The applier has panicked");
            }
            this.keepRunning = keepRunning;
            notifyAll();
        }

        /**
         * Blocks until there is something to apply or the applier is asked to stop.
         *
         * @return the highest commit index seen so far
         */
        synchronized long awaitJob() {
            while (lastApplied >= lastSeenCommitIndex && keepRunning) {
                ignoringInterrupts(this::wait);
            }
            return lastSeenCommitIndex;
        }

        /** Records {@code commitIndex} if it is an advance, reports it and wakes the applier. */
        synchronized void notifyCommitted(long commitIndex) {
            if (lastSeenCommitIndex < commitIndex) {
                lastSeenCommitIndex = commitIndex;
                commitIndexMonitor.commitIndex(commitIndex);
                notifyAll();
            }
        }
    }
}

