/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db.commitlog;

import com.codahale.metrics.Timer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for commit log services that run a dedicated background thread which
 * repeatedly calls {@link CommitLog#sync()} and then signals {@link #syncComplete}.
 *
 * <p>All timestamps handled by this class ({@link #lastSyncedAt}, the {@code syncTime}
 * argument of {@link #awaitSyncAt}) are {@link System#nanoTime()} values. They are only
 * meaningful relative to each other and are compared by subtraction so that numeric
 * overflow of the nano clock does not break ordering.
 */
public abstract class AbstractCommitLogService {
    private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class);

    /** The sync thread; {@code null} until {@link #start()} is called. Volatile because other threads read it. */
    private volatile Thread thread;

    /** Set by {@link #shutdown()}; the sync loop performs one more sync after observing it and then exits. */
    private volatile boolean shutdown = false;

    /**
     * {@link System#nanoTime()} reading taken immediately before the most recent sync that
     * completed. Initialised from the same clock so that it is comparable with the values
     * the sync thread stores and with the values passed to {@link #awaitSyncAt}.
     */
    protected volatile long lastSyncedAt = System.nanoTime();

    /** Number of calls to {@link #finishWriteFor} that have returned. */
    private final AtomicLong written = new AtomicLong(0L);

    /** Exposed through {@link #getPendingTasks()}; not modified in this class (presumably maintained by subclasses). */
    protected final AtomicLong pending = new AtomicLong(0L);

    /** Signalled by the sync thread after every successful sync. */
    protected final WaitQueue syncComplete = new WaitQueue();

    final CommitLog commitLog;
    private final String name;
    private final long pollIntervalNanos;

    /**
     * @param commitLog          the commit log whose {@code sync()} the background thread invokes
     * @param name               name given to the background thread
     * @param pollIntervalMillis target interval between the starts of consecutive syncs, in milliseconds
     */
    AbstractCommitLogService(CommitLog commitLog, String name, long pollIntervalMillis) {
        this.commitLog = commitLog;
        this.name = name;
        this.pollIntervalNanos = TimeUnit.MILLISECONDS.toNanos(pollIntervalMillis);
    }

    /**
     * Creates and starts the background sync thread.
     *
     * @throws IllegalArgumentException if the configured poll interval is not positive
     */
    void start() {
        if (pollIntervalNanos < 1L) {
            throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms", (double)pollIntervalNanos * 1.0E-6));
        }
        Runnable runnable = new Runnable() {

            @Override
            public void run() {
                // Lag statistics, accumulated from the first sync that overran the poll
                // interval until the rate-limited warning is actually emitted.
                boolean trackingLag = false;
                long firstLagAt = 0L;
                long totalSyncDuration = 0L;
                long syncExceededIntervalBy = 0L;
                int lagCount = 0;
                int syncCount = 0;
                while (true) {
                    // Read the flag BEFORE syncing: if shutdown was requested, this
                    // iteration still performs a final sync and only then exits.
                    boolean shutdownRequested = shutdown;
                    try {
                        long syncStarted = System.nanoTime();
                        commitLog.sync();
                        lastSyncedAt = syncStarted;
                        syncComplete.signalAll();

                        long now = System.nanoTime();
                        long wakeUpAt = syncStarted + pollIntervalNanos;
                        // Positive when there is time left before the next sync is due;
                        // computed by subtraction so nanoTime overflow cannot invert it.
                        long sleepNanos = wakeUpAt - now;

                        if (sleepNanos < 0L) {
                            if (!trackingLag) {
                                trackingLag = true;
                                firstLagAt = now;
                                lagCount = 0;
                                syncCount = 0;
                                totalSyncDuration = 0L;
                                syncExceededIntervalBy = 0L;
                            }
                            syncExceededIntervalBy += -sleepNanos;
                            ++lagCount;
                        }

                        if (trackingLag) {
                            ++syncCount;
                            totalSyncDuration += now - syncStarted;
                            boolean logged = NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 5L, TimeUnit.MINUTES, "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms", syncCount, String.format("%.2f", (double)(now - firstLagAt) * 1.0E-9), String.format("%.2f", (double)totalSyncDuration * 1.0E-6 / (double)syncCount), lagCount, String.format("%.2f", (double)syncExceededIntervalBy * 1.0E-6 / (double)lagCount));
                            // Only restart the statistics window once the warning was really written.
                            if (logged) {
                                trackingLag = false;
                            }
                        }

                        if (shutdownRequested) {
                            return;
                        }
                        if (sleepNanos > 0L) {
                            // May return early (e.g. after requestExtraSync unparks us), which simply syncs sooner.
                            LockSupport.parkNanos(sleepNanos);
                        }
                    }
                    catch (Throwable t) {
                        // handleCommitError decides whether the sync thread keeps running.
                        if (!CommitLog.handleCommitError("Failed to persist commits to disk", t)) {
                            return;
                        }
                        // Back off for one interval before retrying.
                        LockSupport.parkNanos(pollIntervalNanos);
                    }
                }
            }
        };
        shutdown = false;
        Thread syncThread = NamedThreadFactory.createThread(runnable, name);
        thread = syncThread;
        syncThread.start();
    }

    /**
     * Completes a write: delegates to {@link #maybeWaitForSync} and then counts the write as completed.
     *
     * @param alloc the allocation that was written
     */
    public void finishWriteFor(CommitLogSegment.Allocation alloc) {
        maybeWaitForSync(alloc);
        written.incrementAndGet();
    }

    /**
     * Hook invoked by {@link #finishWriteFor} before the write is counted as completed.
     *
     * @param var1 the allocation that was written
     */
    protected abstract void maybeWaitForSync(CommitLogSegment.Allocation var1);

    /**
     * Unparks the sync thread so that, if it is parked between syncs, it starts the next
     * sync immediately. Has no effect if the service has not been started.
     */
    public void requestExtraSync() {
        LockSupport.unpark(thread);
    }

    /** Requests the sync thread to stop after one more sync, and wakes it up. */
    public void shutdown() {
        shutdown = true;
        requestExtraSync();
    }

    /** Requests a sync and blocks until a sync that started at or after this call has completed. */
    public void syncBlocking() {
        long requestTime = System.nanoTime();
        requestExtraSync();
        awaitSyncAt(requestTime, null);
    }

    /**
     * Blocks, uninterruptibly, until {@link #lastSyncedAt} is not earlier than {@code syncTime}.
     *
     * @param syncTime a {@link System#nanoTime()} value
     * @param context  optional timer context handed to the wait queue registration; may be {@code null}
     */
    void awaitSyncAt(long syncTime, Timer.Context context) {
        do {
            // Register first, then re-check: presumably this ordering is what prevents
            // missing a signalAll() issued between the check and the wait.
            WaitQueue.Signal signal = context != null ? syncComplete.register(context) : syncComplete.register();
            if (lastSyncedAt - syncTime < 0L) {
                signal.awaitUninterruptibly();
            } else {
                signal.cancel();
            }
        } while (lastSyncedAt - syncTime < 0L);
    }

    /**
     * Waits for the sync thread to terminate. Returns immediately if the service was never started.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void awaitTermination() throws InterruptedException {
        Thread syncThread = thread;
        if (syncThread != null) {
            syncThread.join();
        }
    }

    /** @return the number of writes completed through {@link #finishWriteFor} */
    public long getCompletedTasks() {
        return written.get();
    }

    /** @return the current value of the {@link #pending} counter */
    public long getPendingTasks() {
        return pending.get();
    }
}

