/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.kernel.impl.transaction.log;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscUnboundedXaddArrayQueue;
import org.neo4j.graphdb.DatabaseShutdownException;
import org.neo4j.internal.helpers.Exceptions;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.TransactionLogWriter;
import org.neo4j.kernel.impl.transaction.log.TransactionMetadataCache;
import org.neo4j.kernel.impl.transaction.log.files.LogFile;
import org.neo4j.kernel.impl.transaction.log.files.LogFiles;
import org.neo4j.kernel.impl.transaction.log.rotation.LogRotation;
import org.neo4j.kernel.impl.transaction.tracing.AppendTransactionEvent;
import org.neo4j.kernel.impl.transaction.tracing.LogAppendEvent;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.InternalLog;
import org.neo4j.logging.InternalLogProvider;
import org.neo4j.monitoring.Panic;
import org.neo4j.scheduler.Group;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.storageengine.AppendIndexProvider;
import org.neo4j.storageengine.api.CommandBatch;
import org.neo4j.storageengine.api.CommandBatchToApply;
import org.neo4j.storageengine.api.TransactionIdStore;

/**
 * Hands transaction batches from submitting threads over to a single log-appender thread.
 *
 * <p>{@link #submit} wraps a batch in a {@code TxQueueElement}, offers it to {@code txAppendQueue}
 * and unparks the appender thread. The appender thread runs a {@code TransactionWriter}, which
 * drains the queue, appends the batches through the log file's {@code TransactionLogWriter} and
 * then either rotates or forces the log before signalling the waiting submitters.
 *
 * <p>The queue is created in the stopped state; {@link #start()} creates the writer and its thread
 * and {@link #shutdown()} stops the writer and joins the thread.
 */
public class TransactionLogQueue
extends LifecycleAdapter {
    // Maximum number of queued elements handled by the writer in one pass (1024).
    private static final int CONSUMER_MAX_BATCH = 1024;
    // Initial capacity handed to the append queue on construction (128).
    private static final int INITIAL_CAPACITY = 128;
    // Transaction id value (-1) recorded on an element when it is failed.
    private static final int FAILED_TX_MARKER = -1;
    private final LogFiles logFiles;
    // Taken from logFiles.getLogFile() in the constructor.
    private final LogRotation logRotation;
    private final TransactionIdStore transactionIdStore;
    private final Panic databasePanic;
    private final AppendIndexProvider appendIndexProvider;
    private final TransactionMetadataCache metadataCache;
    // Shared between submitting threads (offer) and the appender thread (drain/poll).
    private final MpscUnboundedXaddArrayQueue<TxQueueElement> txAppendQueue;
    // Supplies the thread factory used to create the appender thread.
    private final JobScheduler jobScheduler;
    private final InternalLog log;
    // Created in start(); null until then. Only assigned inside synchronized methods.
    private TransactionWriter transactionWriter;
    // Thread running transactionWriter; created in start() and joined in shutdown().
    private Thread logAppender;
    // True until start() completes and again once shutdown() begins; submit() rejects work while set.
    private volatile boolean stopped;

    /**
     * Creates the queue in the stopped state. No thread is started here; {@link #start()} must be
     * called before {@link #submit} accepts work.
     *
     * @param logFiles source of the log file (and, through it, the log rotation) that batches are appended to
     * @param transactionIdStore passed on to the writer created in {@link #start()}
     * @param databasePanic passed on to the writer created in {@link #start()}
     * @param appendIndexProvider passed on to the writer created in {@link #start()}
     * @param metadataCache passed on to the writer created in {@link #start()}
     * @param jobScheduler supplies the thread factory for the appender thread
     * @param logProvider provides the log used for failure reporting
     */
    public TransactionLogQueue(LogFiles logFiles, TransactionIdStore transactionIdStore, Panic databasePanic, AppendIndexProvider appendIndexProvider, TransactionMetadataCache metadataCache, JobScheduler jobScheduler, InternalLogProvider logProvider) {
        this.logFiles = logFiles;
        this.logRotation = logFiles.getLogFile().getLogRotation();
        this.transactionIdStore = transactionIdStore;
        this.databasePanic = databasePanic;
        this.appendIndexProvider = appendIndexProvider;
        this.metadataCache = metadataCache;
        // Diamond instead of a raw type: keeps the element type checked at compile time.
        this.txAppendQueue = new MpscUnboundedXaddArrayQueue<>(INITIAL_CAPACITY);
        this.jobScheduler = jobScheduler;
        // Reject submissions until start() has created the appender thread.
        this.stopped = true;
        this.log = logProvider.getLog(getClass());
    }

    /**
     * Enqueues a batch for the appender thread and wakes that thread up.
     *
     * <p>The call returns as soon as the element is queued; the caller obtains the outcome through
     * the returned element.
     *
     * @param batch the batch (chain) to append to the transaction log
     * @param logAppendEvent tracing event associated with this append
     * @return the queue element representing the submitted batch
     * @throws DatabaseShutdownException if the queue is stopped before the element could be enqueued
     * @throws IOException declared for callers; not thrown by the code in this method
     */
    public TxQueueElement submit(CommandBatchToApply batch, LogAppendEvent logAppendEvent) throws IOException {
        if (this.stopped) {
            throw new DatabaseShutdownException();
        }
        TxQueueElement txQueueElement = new TxQueueElement(batch, logAppendEvent);
        // Retry until the queue accepts the element, giving up only if the queue is stopped meanwhile.
        while (!this.txAppendQueue.offer(txQueueElement)) {
            if (this.stopped) {
                throw new DatabaseShutdownException();
            }
        }
        // The appender may be parked in its idle strategy; wake it so the element is picked up promptly.
        LockSupport.unpark(this.logAppender);
        return txQueueElement;
    }

    /**
     * Creates the writer and its appender thread, starts the thread and only then opens the queue
     * for submissions.
     */
    public synchronized void start() {
        LogFile logFile = this.logFiles.getLogFile();
        TransactionWriter writer = new TransactionWriter(
                this.txAppendQueue,
                logFile,
                this.transactionIdStore,
                this.databasePanic,
                this.logRotation,
                this.log,
                this.appendIndexProvider,
                this.metadataCache);
        this.transactionWriter = writer;

        Thread appender = this.jobScheduler.threadFactory(Group.LOG_WRITER).newThread(writer);
        this.logAppender = appender;
        appender.start();

        // Flip the flag last so submit() never sees an open queue without a running appender.
        this.stopped = false;
    }

    /**
     * Closes the queue for new submissions, asks the writer to stop and waits for the appender
     * thread to finish. Safe to call when {@link #start()} was never invoked.
     *
     * @throws InterruptedException if the calling thread is interrupted while joining the appender thread
     * @throws ExecutionException declared for callers; not thrown by the code in this method
     */
    public synchronized void shutdown() throws ExecutionException, InterruptedException {
        this.stopped = true;
        if (this.transactionWriter != null) {
            this.transactionWriter.stop();
        }
        if (this.logAppender != null) {
            this.logAppender.join();
        }
    }

    /**
     * Handle for one submitted batch. It carries the batch to the appender thread and lets the
     * submitter wait for the resulting transaction id (or failure).
     *
     * <p>Completion is signalled in one of three ways:
     * <ul>
     *   <li>{@code TxConsumer.complete()} sets {@code elementsToNotify} and {@code txIds} on the
     *       first element of a processed batch and unparks its thread;</li>
     *   <li>the thread waiting on that first element then copies the ids into the remaining
     *       elements of the batch and unparks their threads (see {@link #getCommittedTxId()});</li>
     *   <li>{@link #fail(Throwable)} records an error and sets {@code txId} to -1.</li>
     * </ul>
     */
    static class TxQueueElement {
        // Upper bound of a single park while waiting for completion: 100 ms expressed in nanoseconds.
        private static final long PARK_TIME = TimeUnit.MILLISECONDS.toNanos(100L);
        private final CommandBatchToApply batch;
        private final LogAppendEvent logAppendEvent;
        // Thread that constructed this element; it is the thread unparked on completion or failure.
        private final Thread executor;
        // Plain field: written in fail() before the volatile txId store, read after the volatile reads below.
        private Throwable throwable;
        // Plain field: written by TxConsumer.complete() before the volatile txIds store; only set on a batch's first element.
        private TxQueueElement[] elementsToNotify;
        // Ids for every element of the batch; only set on the batch's first element.
        private volatile long[] txIds;
        // 0 while pending, -1 after fail(), otherwise the id assigned for this element.
        private volatile long txId;

        /**
         * Captures the batch, its tracing event and the constructing thread.
         */
        TxQueueElement(CommandBatchToApply batch, LogAppendEvent logAppendEvent) {
            this.batch = batch;
            this.logAppendEvent = logAppendEvent;
            this.executor = Thread.currentThread();
        }

        /**
         * Waits until this element has been completed or failed and returns its transaction id.
         *
         * <p>If this element is the first of a processed batch, the calling thread also publishes
         * the ids of the other elements in that batch and unparks their threads.
         *
         * @return the transaction id recorded for this element
         * @throws RuntimeException wrapping the recorded throwable if the element was failed
         */
        public long getCommittedTxId() {
            Throwable exception;
            // Park in bounded slices until either our own id or the batch-wide id array is published.
            while (this.txId == 0L && this.txIds == null) {
                LockSupport.parkNanos(PARK_TIME);
            }
            TxQueueElement[] elements = this.elementsToNotify;
            if (elements != null) {
                long[] ids = this.txIds;
                // Index 0 is this element itself, so fan-out starts at 1.
                for (int i = 1; i < elements.length; ++i) {
                    TxQueueElement element = elements[i];
                    element.txId = ids[i];
                    LockSupport.unpark(element.executor);
                }
                this.txId = ids[0];
            }
            if ((exception = this.throwable) != null) {
                throw new RuntimeException(exception);
            }
            return this.txId;
        }

        /**
         * Marks this element as failed and wakes the thread that created it.
         *
         * @param throwable the failure to report from {@link #getCommittedTxId()}
         */
        public void fail(Throwable throwable) {
            // Store the cause first; the volatile txId write that follows is what waiters poll on.
            this.throwable = throwable;
            this.txId = -1L;
            LockSupport.unpark(this.executor);
        }
    }

    /**
     * Body of the appender thread: repeatedly drains the shared queue, appends the drained batches
     * to the transaction log, rotates or forces the log and then completes the batch. On any
     * failure it logs, raises a database panic and fails the elements of the current batch.
     */
    private static class TransactionWriter
    implements Runnable {
        // Queue filled by submitters and drained by run().
        private final MpscUnboundedXaddArrayQueue<TxQueueElement> txQueue;
        // Obtained from logFile in the constructor; performs the actual appends.
        private final TransactionLogWriter transactionLogWriter;
        private final LogFile logFile;
        private final Panic databasePanic;
        private final LogRotation logRotation;
        private final InternalLog log;
        // Checksum of the last committed transaction at construction time; seeds the consumer's running checksum.
        private final int checksum;
        private final AppendIndexProvider appendIndexProvider;
        private final TransactionMetadataCache metadataCache;
        // Set by stop(); polled by the loop in run().
        private volatile boolean stopped;
        // Decides how the thread idles when the queue yields nothing.
        private final MessagePassingQueue.WaitStrategy waitStrategy;

        /**
         * Wires the writer to the shared queue and the log infrastructure.
         *
         * @param txQueue queue to drain
         * @param logFile log file providing the transaction log writer and the force operation
         * @param transactionIdStore source of the last committed transaction's checksum
         * @param databasePanic panic handle checked before and raised on append failures
         * @param logRotation rotation check performed after every processed batch
         * @param log log used to report failures
         * @param appendIndexProvider source of append indexes for appended batches
         * @param metadataCache cache updated with the position of every appended batch
         */
        TransactionWriter(MpscUnboundedXaddArrayQueue<TxQueueElement> txQueue, LogFile logFile, TransactionIdStore transactionIdStore, Panic databasePanic, LogRotation logRotation, InternalLog log, AppendIndexProvider appendIndexProvider, TransactionMetadataCache metadataCache) {
            // Collaborators handed in as-is.
            this.txQueue = txQueue;
            this.logFile = logFile;
            this.logRotation = logRotation;
            this.databasePanic = databasePanic;
            this.appendIndexProvider = appendIndexProvider;
            this.metadataCache = metadataCache;
            this.log = log;

            // State derived from the collaborators.
            this.transactionLogWriter = logFile.getTransactionLogWriter();
            this.checksum = transactionIdStore.getLastCommittedTransaction().checksum();
            this.waitStrategy = new SpinParkCombineWaitingStrategy();
        }

        /**
         * Appender loop. Until {@link #stop()} is called it drains the queue in bounded batches,
         * appends each batch, rotates the log if needed (otherwise forces it) and completes the
         * batch. When nothing was drained it idles according to the wait strategy.
         *
         * <p>Any throwable raised while handling a batch is logged, reported as a database panic
         * and propagated to every element of the current batch. After the loop ends, elements
         * still in the queue are failed with a {@link DatabaseShutdownException}.
         */
        @Override
        public void run() {
            TxConsumer txConsumer = new TxConsumer(this.databasePanic, this.transactionLogWriter, this.checksum, this.appendIndexProvider, this.metadataCache);
            // Tie the drain limit to the consumer's staging array so a drain can never overflow it.
            int maxBatchSize = txConsumer.txElements.length;
            int idleCounter = 0;
            while (!this.stopped) {
                try {
                    int drainedElements = this.txQueue.drain(txConsumer, maxBatchSize);
                    if (drainedElements == 0) {
                        idleCounter = this.waitStrategy.idle(idleCounter);
                        continue;
                    }
                    idleCounter = 0;
                    txConsumer.processBatch();
                    // Rotation/force are traced against the event of the last element in the batch.
                    LogAppendEvent logAppendEvent = txConsumer.txElements[drainedElements - 1].logAppendEvent;
                    boolean logRotated = this.logRotation.locklessRotateLogIfNeeded(logAppendEvent);
                    logAppendEvent.setLogRotated(logRotated);
                    if (!logRotated) {
                        this.logFile.locklessForce(logAppendEvent);
                    }
                    txConsumer.complete();
                }
                catch (Throwable t) {
                    try {
                        this.log.error("Transaction log applier failure.", t);
                        this.databasePanic.panic(t);
                    }
                    finally {
                        // Always release the waiting submitters, even if reporting the failure itself throws.
                        txConsumer.cancelBatch(t);
                    }
                }
            }
            // Stopped: nothing will be appended any more, so fail whatever is still queued.
            DatabaseShutdownException shutdownException = new DatabaseShutdownException();
            TxQueueElement element;
            while ((element = this.txQueue.poll()) != null) {
                element.fail(shutdownException);
            }
        }

        /**
         * Requests termination of the loop in {@link #run()}; the loop observes the flag on its
         * next iteration.
         */
        public void stop() {
            stopped = true;
        }

        /**
         * Collects the elements handed over by a queue drain into a staging array and appends
         * them to the transaction log as one batch. The batch is finished with either
         * {@link #complete()} or {@link #cancelBatch(Throwable)}, both of which empty the staging
         * array again.
         */
        private static class TxConsumer
        implements MessagePassingQueue.Consumer<TxQueueElement> {
            private final Panic databasePanic;
            private final TransactionLogWriter transactionLogWriter;
            // Running checksum: each append takes the previous value and yields the next one.
            private int checksum;
            private final AppendIndexProvider appendIndexProvider;
            private final TransactionMetadataCache metadataCache;
            // Staging area filled by accept(); slots [0, index) hold the current batch.
            private final TxQueueElement[] txElements = new TxQueueElement[1024];
            private int index;
            // Per-batch snapshots handed to the first element in complete().
            private TxQueueElement[] elements;
            private long[] txIds;

            TxConsumer(Panic databasePanic, TransactionLogWriter transactionLogWriter, int checksum, AppendIndexProvider appendIndexProvider, TransactionMetadataCache metadataCache) {
                this.databasePanic = databasePanic;
                this.transactionLogWriter = transactionLogWriter;
                this.checksum = checksum;
                this.appendIndexProvider = appendIndexProvider;
                this.metadataCache = metadataCache;
            }

            /**
             * Stores a drained element in the next free staging slot.
             */
            public void accept(TxQueueElement txQueueElement) {
                this.txElements[this.index++] = txQueueElement;
            }

            /**
             * Appends every staged element to the log and records, per element, the id of the
             * last transaction in its batch chain.
             *
             * @throws IOException if the database has already panicked
             */
            private void processBatch() throws IOException {
                this.databasePanic.assertNoPanic(IOException.class);
                int batchSize = this.index;
                this.elements = new TxQueueElement[batchSize];
                this.txIds = new long[batchSize];
                for (int slot = 0; slot < batchSize; ++slot) {
                    TxQueueElement queued = this.txElements[slot];
                    this.elements[slot] = queued;
                    LogAppendEvent logAppendEvent = queued.logAppendEvent;
                    // Value reported when the element carries no batch at all.
                    long lastTransactionId = 1L;
                    try (AppendTransactionEvent appendEvent = logAppendEvent.beginAppendTransaction(batchSize)) {
                        CommandBatchToApply commands = queued.batch;
                        while (commands != null) {
                            long transactionId = commands.transactionId();
                            this.appendToLog(commands, transactionId, logAppendEvent);
                            lastTransactionId = transactionId;
                            commands = commands.next();
                        }
                        this.txIds[slot] = lastTransactionId;
                    }
                    catch (Exception e) {
                        // Unchecked exceptions pass through untouched; checked ones are wrapped.
                        Exceptions.throwIfUnchecked(e);
                        throw new RuntimeException(e);
                    }
                }
            }

            /**
             * Appends one batch to the log, caches its start position under the new append index
             * and tells the batch where it was written.
             */
            private void appendToLog(CommandBatchToApply commandBatchToApply, long transactionId, LogAppendEvent logAppendEvent) throws IOException {
                this.transactionLogWriter.resetAppendedBytesCounter();
                CommandBatch payload = commandBatchToApply.commandBatch();
                long appendIndex = this.appendIndexProvider.nextAppendIndex();
                this.checksum = this.transactionLogWriter.append(
                        payload,
                        transactionId,
                        commandBatchToApply.chunkId(),
                        appendIndex,
                        this.checksum,
                        commandBatchToApply.previousBatchLogPosition(),
                        logAppendEvent);
                LogPosition positionBefore = this.transactionLogWriter.beforeAppendPosition();
                this.metadataCache.cacheTransactionMetadata(appendIndex, positionBefore);
                LogPosition positionAfter = this.transactionLogWriter.getCurrentPosition();
                logAppendEvent.appendedBytes(this.transactionLogWriter.getAppendedBytes());
                commandBatchToApply.batchAppended(appendIndex, positionBefore, positionAfter, this.checksum);
            }

            /**
             * Publishes the batch result through the first element and wakes its thread, then
             * empties the staging array.
             */
            public void complete() {
                TxQueueElement head = this.txElements[0];
                // Plain write first, volatile txIds write second: waiters poll on txIds.
                head.elementsToNotify = this.elements;
                head.txIds = this.txIds;
                LockSupport.unpark(head.executor);
                this.clearStaged();
            }

            /**
             * Fails every staged element with the given throwable, then empties the staging array.
             */
            public void cancelBatch(Throwable t) {
                int staged = this.index;
                for (int slot = 0; slot < staged; ++slot) {
                    this.txElements[slot].fail(t);
                }
                this.clearStaged();
            }

            /** Drops the references held in the used staging slots and rewinds the write index. */
            private void clearStaged() {
                Arrays.fill(this.txElements, 0, this.index, null);
                this.index = 0;
            }
        }
    }

    /**
     * Idle strategy that escalates with the number of consecutive idle rounds: spin first, then
     * park for very short intervals, and finally park for 10 ms at a time.
     */
    private static class SpinParkCombineWaitingStrategy
    implements MessagePassingQueue.WaitStrategy {
        // Idle rounds spent spinning; effectively no spinning when fewer than two processors are available.
        private static final int SPIN_THRESHOLD = Runtime.getRuntime().availableProcessors() < 2 ? 1 : 1000;
        // Idle rounds after which short parks give way to long parks.
        private static final int SHORT_PARK_THRESHOLD = 100000;
        // Counter value returned while long-parking; keeps the counter from growing further.
        private static final int LONG_PARK_COUNTER = 100001;
        // Short park duration in nanoseconds.
        private static final int SHORT_PARK_TIME = 10;
        // Long park duration: 10 ms in nanoseconds.
        private static final long LONG_PARK_TIME = TimeUnit.MILLISECONDS.toNanos(10L);

        private SpinParkCombineWaitingStrategy() {
        }

        /**
         * Performs one idle step appropriate for the given counter.
         *
         * @param idleCounter number of consecutive idle rounds so far
         * @return the counter to pass to the next call
         */
        public int idle(int idleCounter) {
            if (idleCounter >= SHORT_PARK_THRESHOLD) {
                LockSupport.parkNanos(LONG_PARK_TIME);
                return LONG_PARK_COUNTER;
            }
            if (idleCounter < SPIN_THRESHOLD) {
                Thread.onSpinWait();
            } else {
                LockSupport.parkNanos(SHORT_PARK_TIME);
            }
            return idleCounter + 1;
        }
    }
}

