package org.apache.qpid.server.store.berkeleydb;

import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.sleepycat.je.Transaction;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.class */
public class CoalescingCommiter implements Committer {
    private final CommitThread _commitThread;

    /* loaded from: input_file:org/apache/qpid/server/store/berkeleydb/CoalescingCommiter$BDBCommitFutureResult.class */
    /**
     * Commit job backed by a future: the future receives the stored value when
     * the job is completed, or the supplied exception when the job is aborted.
     *
     * @param <X> type of the value delivered through the future
     */
    private static final class BDBCommitFutureResult<X> implements CommitThreadJob {
        private final X _value;
        private final ThreadNotifyingSettableFuture<X> _future;

        /**
         * @param x value to set on the future when the job completes
         * @param threadNotifyingSettableFuture future resolved by this job
         */
        public BDBCommitFutureResult(X x, ThreadNotifyingSettableFuture<X> threadNotifyingSettableFuture) {
            _future = threadNotifyingSettableFuture;
            _value = x;
        }

        /** Resolves the future with the value captured at construction time. */
        @Override
        public void complete() {
            _future.set(_value);
        }

        /** Fails the future with the given exception. */
        @Override
        public void abort(RuntimeException runtimeException) {
            _future.setException(runtimeException);
        }
    }

    /* loaded from: input_file:org/apache/qpid/server/store/berkeleydb/CoalescingCommiter$CommitThread.class */
    /**
     * Thread that drains the job queue in batches: each batch is preceded by a
     * single {@code flushLog()} call on the environment facade, after which every
     * job of the batch is completed (or aborted if a {@link RuntimeException}
     * occurred).
     */
    private static class CommitThread extends Thread {
        private static final Logger LOGGER = LoggerFactory.getLogger(CommitThread.class);

        /** Queue size at which {@link #addJob} wakes the thread even for non-urgent jobs. */
        private final int _jobQueueNotifyThreshold;
        /** Timeout, in milliseconds, passed to {@code Object.wait(long)} while idle. */
        private final long _commiterWaitTimeout;
        private final AtomicBoolean _stopped = new AtomicBoolean(false);
        private final Queue<CommitThreadJob> _jobQueue = new ConcurrentLinkedQueue<>();
        /** Monitor used for waiting for work and for serialising {@link #close()}. */
        private final Object _lock = new Object();
        private final EnvironmentFacade _environmentFacade;
        /** Jobs of the batch currently being handled; only accessed from {@link #processJobs()}. */
        private final List<CommitThreadJob> _inProcessJobs = new ArrayList<>(256);

        /**
         * @param str thread name
         * @param i job queue size that triggers a notification from {@link #addJob}
         * @param j idle wait timeout in milliseconds
         * @param environmentFacade facade whose log is flushed for each batch
         */
        public CommitThread(String str, int i, long j, EnvironmentFacade environmentFacade) {
            super(str);
            _jobQueueNotifyThreshold = i;
            _commiterWaitTimeout = j;
            _environmentFacade = environmentFacade;
        }

        /** Wakes any thread waiting on the internal lock. */
        public void explicitNotify() {
            synchronized (_lock) {
                _lock.notifyAll();
            }
        }

        /**
         * Waits until jobs are queued or the thread is stopped, then processes one
         * batch; repeats until the stopped flag is set.
         */
        @Override
        public void run() {
            while (!_stopped.get()) {
                synchronized (_lock) {
                    while (!_stopped.get() && !hasJobs()) {
                        try {
                            _lock.wait(_commiterWaitTimeout);
                        } catch (InterruptedException ignored) {
                            // Interruption is not treated as a stop signal: the loop
                            // condition re-checks the stopped flag and the queue.
                        }
                    }
                }
                processJobs();
            }
        }

        /**
         * Moves all queued jobs into the current batch, flushes the log and
         * completes the batch. If a {@link RuntimeException} is thrown, the jobs
         * not yet completed are aborted with it and the facade is told once via
         * {@code flushLogFailed}. The batch list is always cleared on exit.
         */
        private void processJobs() {
            CommitThreadJob queued;
            while ((queued = _jobQueue.poll()) != null) {
                _inProcessJobs.add(queued);
            }

            // Index of the first job that has been neither completed nor aborted.
            int next = 0;
            try {
                long startTime = 0;
                if (LOGGER.isDebugEnabled()) {
                    startTime = System.currentTimeMillis();
                }

                _environmentFacade.flushLog();

                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("flushLog completed in {} ms", System.currentTimeMillis() - startTime);
                }

                while (next < _inProcessJobs.size()) {
                    _inProcessJobs.get(next).complete();
                    next++;
                }
            } catch (RuntimeException e) {
                try {
                    LOGGER.error("Exception during environment log flush", e);
                    while (next < _inProcessJobs.size()) {
                        _inProcessJobs.get(next).abort(e);
                        next++;
                    }
                } finally {
                    // Exactly one notification, whether or not aborting the jobs succeeded.
                    _environmentFacade.flushLogFailed(e);
                }
            } finally {
                _inProcessJobs.clear();
            }
        }

        private boolean hasJobs() {
            return !_jobQueue.isEmpty();
        }

        /**
         * Queues a job for the next batch.
         *
         * @param commitThreadJob job to queue
         * @param z when {@code true} the thread is notified immediately; otherwise
         *          only once the queue size reaches the notify threshold
         * @throws IllegalStateException if the thread has already been stopped
         */
        public void addJob(CommitThreadJob commitThreadJob, boolean z) {
            if (_stopped.get()) {
                throw new IllegalStateException("Commit thread is stopped");
            }
            _jobQueue.add(commitThreadJob);
            if (z || _jobQueue.size() >= _jobQueueNotifyThreshold) {
                synchronized (_lock) {
                    _lock.notifyAll();
                }
            }
        }

        /**
         * Sets the stopped flag, flushes the log and completes the jobs still
         * queued. If that throws a {@link RuntimeException}, the remaining queued
         * jobs are aborted with an exception that carries the failure as its cause.
         * Waiters on the internal lock are always notified before returning.
         */
        public void close() {
            synchronized (_lock) {
                _stopped.set(true);
                try {
                    try {
                        _environmentFacade.flushLog();
                        CommitThreadJob pending;
                        while ((pending = _jobQueue.poll()) != null) {
                            pending.complete();
                        }
                    } catch (RuntimeException flushFailure) {
                        RuntimeException abortReason = new RuntimeException(
                                "Commit thread has been closed, transaction aborted", flushFailure);
                        int abortedCommits = 0;
                        CommitThreadJob pending;
                        while ((pending = _jobQueue.poll()) != null) {
                            abortedCommits++;
                            pending.abort(abortReason);
                        }
                        if (LOGGER.isDebugEnabled() && abortedCommits > 0) {
                            LOGGER.debug(abortedCommits + " commit(s) were aborted during close.");
                        }
                    }
                } finally {
                    // Wake the commit thread even if an abort callback threw.
                    _lock.notifyAll();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/qpid/server/store/berkeleydb/CoalescingCommiter$CommitThreadJob.class */
    /**
     * Unit of work queued on the commit thread. Exactly which callback fires is
     * decided by the commit thread: {@link #complete()} after the environment log
     * flush returned normally, {@link #abort(RuntimeException)} otherwise.
     */
    public interface CommitThreadJob {
        /** Called once the log flush covering this job has returned without throwing. */
        void complete();

        /**
         * Called when the job could not be completed.
         *
         * @param runtimeException the failure to report to whoever is waiting on the job
         */
        void abort(RuntimeException runtimeException);
    }

    /* loaded from: input_file:org/apache/qpid/server/store/berkeleydb/CoalescingCommiter$SynchronousCommitThreadJob.class */
    private class SynchronousCommitThreadJob implements CommitThreadJob {
        private boolean _done;
        private RuntimeException _exception;

        private SynchronousCommitThreadJob() {
        }

        @Override // org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.CommitThreadJob
        public synchronized void complete() {
            this._done = true;
            notifyAll();
        }

        @Override // org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.CommitThreadJob
        public synchronized void abort(RuntimeException runtimeException) {
            this._done = true;
            this._exception = runtimeException;
            notifyAll();
        }

        public synchronized void awaitCompletion() {
            boolean z = false;
            while (!this._done) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    z = true;
                }
            }
            if (z) {
                Thread.currentThread().interrupt();
            }
            if (this._exception != null) {
                throw this._exception;
            }
        }
    }

    /* loaded from: input_file:org/apache/qpid/server/store/berkeleydb/CoalescingCommiter$ThreadNotifyingSettableFuture.class */
    /**
     * Future that prods the commit thread whenever a caller starts waiting on it
     * or registers a listener.
     *
     * @param <X> type of the value the future resolves to
     */
    private final class ThreadNotifyingSettableFuture<X> extends AbstractFuture<X> {
        private ThreadNotifyingSettableFuture() {
        }

        /** Notifies the commit thread if the result is still pending, then waits with a timeout. */
        public X get(long j, TimeUnit timeUnit) throws InterruptedException, TimeoutException, ExecutionException {
            notifyCommitThreadUnlessDone();
            return super.get(j, timeUnit);
        }

        /** Notifies the commit thread if the result is still pending, then waits. */
        public X get() throws InterruptedException, ExecutionException {
            notifyCommitThreadUnlessDone();
            return super.get();
        }

        /** Delegates to the superclass; re-declared here and invoked by BDBCommitFutureResult. */
        protected boolean set(X x) {
            return super.set(x);
        }

        /** Delegates to the superclass; re-declared here and invoked by BDBCommitFutureResult. */
        protected boolean setException(Throwable th) {
            return super.setException(th);
        }

        /** Registers the listener first, then notifies the commit thread unconditionally. */
        public void addListener(Runnable runnable, Executor executor) {
            super.addListener(runnable, executor);
            CoalescingCommiter.this._commitThread.explicitNotify();
        }

        /** Wakes the commit thread unless this future already has a result. */
        private void notifyCommitThreadUnlessDone() {
            if (isDone()) {
                return;
            }
            CoalescingCommiter.this._commitThread.explicitNotify();
        }
    }

    public CoalescingCommiter(String str, int i, long j, EnvironmentFacade environmentFacade) {
        this._commitThread = new CommitThread("Commit-Thread-" + str, i, j, environmentFacade);
    }

    /** Starts the commit thread. */
    @Override
    public void start() {
        final Thread worker = _commitThread;
        worker.start();
    }

    /**
     * Closes the commit thread and, unless called from the commit thread itself,
     * waits for it to terminate.
     *
     * @throws RuntimeException if the calling thread is interrupted while waiting;
     *         the interrupt status is restored first
     */
    @Override
    public void stop() {
        _commitThread.close();

        final boolean calledFromCommitThread = Thread.currentThread() == _commitThread;
        if (calledFromCommitThread) {
            // No join in this case: the caller is the thread being stopped.
            return;
        }

        try {
            _commitThread.join();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Commit thread has not shutdown", interrupted);
        }
    }

    /**
     * When {@code z} is {@code true}, queues a job on the commit thread, asks for
     * immediate notification and blocks until the job is completed or aborted.
     * When {@code z} is {@code false} this method does nothing.
     *
     * @param transaction not used by this implementation
     * @param z whether to wait for the commit thread
     */
    @Override
    public void commit(Transaction transaction, boolean z) {
        if (!z) {
            return;
        }
        final SynchronousCommitThreadJob job = new SynchronousCommitThreadJob();
        _commitThread.addJob(job, true);
        job.awaitCompletion();
    }

    /**
     * Queues a job on the commit thread without requesting immediate
     * notification and returns a future for its outcome.
     *
     * @param transaction not used by this implementation
     * @param x value the returned future resolves to when the job completes
     * @param <X> type of the future's value
     * @return future that is set to {@code x} on completion, or failed on abort
     */
    @Override
    public <X> ListenableFuture<X> commitAsync(Transaction transaction, X x) {
        // Fully parameterised types: the raw forms caused unchecked conversions.
        final ThreadNotifyingSettableFuture<X> future = new ThreadNotifyingSettableFuture<X>();
        _commitThread.addJob(new BDBCommitFutureResult<X>(x, future), false);
        return future;
    }
}
