package org.apache.hadoop.hbase.regionserver.wal;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.SingleThreadEventExecutor;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.htrace.NullScope;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * WAL implementation that writes through a {@link WALProvider.AsyncWriter}.
 *
 * <p>Callers of {@code append} and {@code sync} publish their requests into a Disruptor ring
 * buffer ({@link #waitingConsumePayloads}). A single {@link #consumer} task, executed on the
 * supplied netty {@link EventLoop}, drains the ring buffer, appends the entries to the current
 * writer and issues syncs on it. Writer replacement is coordinated with the consumer through
 * {@link #consumeLock}, {@link #waitingRoll} and {@link #readyForRolling}.
 *
 * <p>NOTE(review): this source is decompiler output; the explicit {@code $assertionsDisabled}
 * field and the static initializer stand in for ordinary {@code assert} statements and inline
 * field initializers.
 */
@InterfaceAudience.LimitedPrivate({"Configuration"})
/* loaded from: input_file:org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.class */
public class AsyncFSWAL extends AbstractFSWAL<WALProvider.AsyncWriter> {
    private static final Log LOG;
    /** Orders sync futures by txid; ties are broken by identity hash code (see static initializer). */
    private static final Comparator<SyncFuture> SEQ_COMPARATOR;
    /** Configuration key for {@link #batchSize}. */
    public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
    /** Default for {@link #WAL_BATCH_SIZE}, in the units reported by {@code AsyncWriter.getLength()}. */
    public static final long DEFAULT_WAL_BATCH_SIZE = 65536;
    /** Configuration key for {@link #createMaxRetries}. */
    public static final String ASYNC_WAL_CREATE_MAX_RETRIES = "hbase.wal.async.create.retries";
    /** Default for {@link #ASYNC_WAL_CREATE_MAX_RETRIES}. */
    public static final int DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES = 10;
    /** Event loop on which {@link #consumer} is executed; also passed to writer creation. */
    private final EventLoop eventLoop;
    /** Channel class passed to {@code AsyncFSWALProvider.createAsyncWriter}. */
    private final Class<? extends Channel> channelClass;
    /** Held by consume(), syncFailed() and the roll hand-off code while they touch shared state. */
    private final Lock consumeLock;
    /** The {@code this::consume} task that is submitted to {@link #eventLoop}. */
    private final Runnable consumer;
    /**
     * Reports whether {@link #consumer} is at the head of the event loop's task queue; always
     * {@code false} when that queue could not be obtained reflectively in the constructor.
     */
    private final Supplier<Boolean> hasConsumerTask;
    /** Set by waitForSafePoint() and cleared by doReplaceWriter(). */
    private volatile boolean waitingRoll;
    /** Condition flag awaited in waitForSafePoint(); written while holding {@link #consumeLock}. */
    private boolean readyForRolling;
    /** Condition of {@link #consumeLock} signalled when {@link #readyForRolling} becomes true. */
    private final Condition readyForRollingCond;
    /** Ring buffer that carries append and sync requests to the consumer. */
    private final RingBuffer<RingBufferTruck> waitingConsumePayloads;
    /** Last ring-buffer sequence the consumer has taken; registered as the gating sequence. */
    private final Sequence waitingConsumePayloadsGatingSequence;
    /** True while a run of {@link #consumer} is considered scheduled. */
    private final AtomicBoolean consumerScheduled;
    /** Set by syncFailed(); reset by doReplaceWriter() once a new writer is installed. */
    private volatile boolean writerBroken;
    /** Unsynced length at which appendAndSync() issues a sync regardless of pending sync requests. */
    private final long batchSize;
    /** Maximum number of retries in createWriterInstance(). */
    private final int createMaxRetries;
    /** Executor on which replaced writers are closed. */
    private final ExecutorService closeExecutor;
    /** Output of the current writer when it is an AsyncProtobufLogWriter; read by getPipeline(). */
    private volatile AsyncFSOutput fsOut;
    /** Entries taken from the ring buffer that have not been appended to the writer yet. */
    private final Deque<FSWALEntry> toWriteAppends;
    /** Entries appended to the writer that are not yet covered by a completed sync. */
    private final Deque<FSWALEntry> unackedAppends;
    /** Pending sync requests, ordered by {@link #SEQ_COMPARATOR}. */
    private final SortedSet<SyncFuture> syncFutures;
    /** Txid of the last entry processed by appendAndSync(). */
    private long highestProcessedAppendTxid;
    /** Writer length recorded when the most recent sync was issued. */
    private long fileLengthAtLastSync;
    /** Value of {@link #highestProcessedAppendTxid} when the most recent sync was issued. */
    private long highestProcessedAppendTxidAtLastSync;
    /** Decompiler-exposed assertion flag; true when assertions are disabled for this class. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Creates the WAL, sets up the ring buffer that feeds the consumer task, and rolls the first
     * writer.
     *
     * <p>The first nine arguments are forwarded unchanged to the {@code AbstractFSWAL}
     * constructor.
     *
     * @param eventLoop event loop on which the consumer task runs and writers are created
     * @param cls channel class handed to {@code AsyncFSWALProvider.createAsyncWriter}
     * @throws FailedLogCloseException declared by the initial {@code rollWriter()} call
     * @throws IOException if the superclass constructor or the initial roll fails
     */
    public AsyncFSWAL(FileSystem fileSystem, Path path, String str, String str2, Configuration configuration, List<WALActionsListener> list, boolean z, String str3, String str4, EventLoop eventLoop, Class<? extends Channel> cls) throws FailedLogCloseException, IOException {
        super(fileSystem, path, str, str2, configuration, list, z, str3, str4);
        this.consumeLock = new ReentrantLock();
        this.consumer = this::consume;
        this.readyForRollingCond = this.consumeLock.newCondition();
        this.consumerScheduled = new AtomicBoolean(false);
        this.closeExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
        this.toWriteAppends = new ArrayDeque<>();
        this.unackedAppends = new ArrayDeque<>();
        this.syncFutures = new TreeSet<>(SEQ_COMPARATOR);
        this.eventLoop = eventLoop;
        this.channelClass = cls;

        // Best effort: peek into the event loop's private task queue so consume() can tell
        // whether another run of the consumer is already first in line. A local is used because
        // a final field cannot be assigned in both the try and the catch branch.
        Supplier<Boolean> consumerQueuedCheck;
        if (eventLoop instanceof SingleThreadEventExecutor) {
            try {
                Field taskQueueField = SingleThreadEventExecutor.class.getDeclaredField("taskQueue");
                taskQueueField.setAccessible(true);
                Queue<?> taskQueue = (Queue<?>) taskQueueField.get(eventLoop);
                consumerQueuedCheck = () -> taskQueue.peek() == this.consumer;
            } catch (Exception e) {
                // Deliberately broad: any reflective failure only disables the optimization.
                LOG.warn("Can not get task queue of " + eventLoop + ", this is not necessary, just give up", e);
                consumerQueuedCheck = () -> false;
            }
        } else {
            consumerQueuedCheck = () -> false;
        }
        this.hasConsumerTask = consumerQueuedCheck;

        // Default slot count is 16 * 1024; it was previously spelled as an unrelated HFile
        // constant that merely has the same value.
        int ringBufferSlots = this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 16 * 1024);
        this.waitingConsumePayloads = RingBuffer.createMultiProducer(RingBufferTruck::new, ringBufferSlots);
        this.waitingConsumePayloadsGatingSequence = new Sequence(-1L);
        this.waitingConsumePayloads.addGatingSequences(this.waitingConsumePayloadsGatingSequence);
        // Publish and immediately consume one slot so the first real request gets a sequence
        // greater than zero.
        this.waitingConsumePayloads.publish(this.waitingConsumePayloads.next());
        this.waitingConsumePayloadsGatingSequence.set(this.waitingConsumePayloads.getCursor());

        this.batchSize = configuration.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
        this.createMaxRetries = configuration.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES);
        rollWriter();
    }

    /**
     * Marks the WAL as ready for rolling when a roll is waiting and no appended entry is still
     * unacknowledged, waking up any thread blocked on {@code readyForRollingCond}.
     *
     * @return {@code true} if {@code readyForRolling} was set and waiters were signalled
     */
    private boolean trySetReadyForRolling() {
        // Cheap pre-check without taking the lock.
        if (!(this.waitingRoll && this.unackedAppends.isEmpty())) {
            return false;
        }
        this.consumeLock.lock();
        try {
            // Re-read under the lock: the roll may have finished in the meantime.
            boolean stillWaiting = this.waitingRoll;
            if (stillWaiting) {
                this.readyForRolling = true;
                this.readyForRollingCond.signalAll();
            }
            return stillWaiting;
        } finally {
            this.consumeLock.unlock();
        }
    }

    /**
     * Handles a failed sync: marks the writer broken, releases a roller waiting for the safe
     * point, moves every unacknowledged entry back to the front of {@code toWriteAppends} and
     * requests a log roll. Only the first failure for a given writer has any effect.
     *
     * @param th the failure reported by the writer's sync
     */
    private void syncFailed(Throwable th) {
        LOG.warn("sync failed", th);
        this.consumeLock.lock();
        try {
            if (!this.writerBroken) {
                this.writerBroken = true;
                if (this.waitingRoll) {
                    this.readyForRolling = true;
                    this.readyForRollingCond.signalAll();
                }
                // Walk newest-to-oldest and push each entry to the head, so the original order
                // is preserved ahead of the entries that were never written.
                for (Iterator<FSWALEntry> newestFirst = this.unackedAppends.descendingIterator(); newestFirst.hasNext();) {
                    this.toWriteAppends.addFirst(newestFirst.next());
                }
                this.highestUnsyncedTxid = this.highestSyncedTxid.get();
                requestLogRoll();
            }
        } finally {
            this.consumeLock.unlock();
        }
    }

    /**
     * Handles a successful sync: advances the synced txid, drops acknowledged entries, completes
     * the sync futures that are now satisfied, and requests a log roll if the writer has grown
     * to {@code logrollsize}.
     *
     * @param asyncWriter the writer the sync was issued on
     * @param j highest append txid that had been processed when the sync was issued
     * @param j2 {@code System.nanoTime()} value taken when the sync was issued
     */
    private void syncCompleted(WALProvider.AsyncWriter asyncWriter, long j, long j2) {
        this.highestSyncedTxid.set(j);
        // Entries are in txid order, so stop at the first one beyond the synced txid.
        for (Iterator<FSWALEntry> acked = this.unackedAppends.iterator(); acked.hasNext();) {
            if (acked.next().getTxid() > j) {
                break;
            }
            acked.remove();
        }
        long elapsedNanos = System.nanoTime() - j2;
        int finishedSyncs = finishSync(true);
        postSync(elapsedNanos, finishedSyncs);

        if (trySetReadyForRolling()) {
            // A roll is already in progress; no need to ask for another one.
            return;
        }
        if (asyncWriter.getLength() < this.logrollsize) {
            return;
        }
        if (!this.rollWriterLock.tryLock()) {
            return;
        }
        try {
            requestLogRoll();
        } finally {
            this.rollWriterLock.unlock();
        }
    }

    /**
     * Issues an asynchronous sync on the given writer and records the writer length and the
     * highest processed append txid at that moment. The completion callback dispatches to
     * {@code syncCompleted} or {@code syncFailed}.
     *
     * @param asyncWriter the writer to sync
     */
    private void sync(WALProvider.AsyncWriter asyncWriter) {
        this.fileLengthAtLastSync = asyncWriter.getLength();
        final long processedTxid = this.highestProcessedAppendTxid;
        this.highestProcessedAppendTxidAtLastSync = processedTxid;
        final long startNs = System.nanoTime();
        asyncWriter.sync().whenComplete((result, error) -> {
            if (error == null) {
                syncCompleted(asyncWriter, processedTxid, startNs);
            } else {
                syncFailed(error);
            }
        });
    }

    /**
     * Adds a timeline annotation to the trace span carried by the sync future, then stores the
     * re-detached span back on the future.
     *
     * @param syncFuture the future whose span is annotated
     * @param str the annotation text
     */
    private void addTimeAnnotation(SyncFuture syncFuture, String str) {
        TraceScope resumed = Trace.continueSpan(syncFuture.getSpan());
        Trace.addTimelineAnnotation(str);
        Span detached = resumed.detach();
        syncFuture.setSpan(detached);
    }

    /**
     * Completes and removes every pending sync future whose txid is not greater than {@code j}.
     *
     * @param j the txid up to which (inclusive) sync futures are completed
     * @param z whether to add a "writer synced" trace annotation to each completed future
     * @return the number of futures completed
     */
    private int finishSyncLowerThanTxid(long j, boolean z) {
        int completed = 0;
        // syncFutures is sorted by txid, so the first future beyond j ends the scan.
        for (Iterator<SyncFuture> pending = this.syncFutures.iterator(); pending.hasNext();) {
            SyncFuture future = pending.next();
            if (future.getTxid() > j) {
                break;
            }
            future.done(j, null);
            pending.remove();
            completed++;
            if (z) {
                addTimeAnnotation(future, "writer synced");
            }
        }
        return completed;
    }

    /**
     * Completes as many pending sync futures as the current append state allows and advances
     * {@code highestSyncedTxid} accordingly.
     *
     * @param z whether to add a "writer synced" trace annotation to each completed future
     * @return the number of futures completed
     */
    private int finishSync(boolean z) {
        if (this.unackedAppends.isEmpty()) {
            if (this.toWriteAppends.isEmpty()) {
                // Nothing unacked and nothing waiting to be written: every pending sync is done.
                int pendingCount = this.syncFutures.size();
                long doneTxid = this.highestSyncedTxid.get();
                for (SyncFuture future : this.syncFutures) {
                    doneTxid = Math.max(doneTxid, future.getTxid());
                    future.done(doneTxid, null);
                    if (z) {
                        addTimeAnnotation(future, "writer synced");
                    }
                }
                this.highestSyncedTxid.set(doneTxid);
                this.syncFutures.clear();
                return pendingCount;
            }
            // Nothing unacked, but entries are still waiting: everything below the first
            // unwritten entry is done.
            long lowestUnwrittenTxid = this.toWriteAppends.peek().getTxid();
            if (!$assertionsDisabled && lowestUnwrittenTxid <= this.highestProcessedAppendTxid) {
                throw new AssertionError();
            }
            long doneTxid = lowestUnwrittenTxid - 1;
            this.highestSyncedTxid.set(doneTxid);
            return finishSyncLowerThanTxid(doneTxid, z);
        }
        // Some appends are still unacked: everything below the first of them is done, but the
        // synced txid never moves backwards.
        long lowestUnackedTxid = this.unackedAppends.peek().getTxid();
        long doneTxid = Math.max(lowestUnackedTxid - 1, this.highestSyncedTxid.get());
        this.highestSyncedTxid.set(doneTxid);
        return finishSyncLowerThanTxid(doneTxid, z);
    }

    /**
     * Appends queued entries to the current writer until the queue is empty or the unsynced
     * length reaches {@code batchSize}, then decides whether a sync has to be issued.
     *
     * <p>A sync is issued when the batch size is reached, or when there is unsynced data and a
     * pending sync request with a txid above the one recorded at the last sync. When nothing has
     * been written since the last sync and nothing is unacknowledged, the synced txid is simply
     * advanced and pending syncs are completed.
     */
    private void appendAndSync() {
        final WALProvider.AsyncWriter currentWriter = (WALProvider.AsyncWriter) this.writer;
        // A sync request may have arrived after the previous sync was issued; finish what we can.
        finishSync(false);
        long lastProcessedTxid = -1;
        for (Iterator<FSWALEntry> queued = this.toWriteAppends.iterator(); queued.hasNext();) {
            FSWALEntry entry = queued.next();
            boolean appended;
            Span span = entry.detachSpan();
            if (span != null) {
                TraceScope scope = Trace.continueSpan(span);
                try {
                    appended = append(currentWriter, entry);
                } catch (IOException e) {
                    throw new AssertionError("should not happen", e);
                } finally {
                    if (!$assertionsDisabled && scope != NullScope.INSTANCE && scope.isDetached()) {
                        throw new AssertionError();
                    }
                    scope.close();
                }
            } else {
                // The entry carries no span (detachSpan() returned null); append without tracing.
                try {
                    appended = append(currentWriter, entry);
                } catch (IOException e) {
                    throw new AssertionError("should not happen", e);
                }
            }
            lastProcessedTxid = entry.getTxid();
            queued.remove();
            if (appended) {
                this.unackedAppends.addLast(entry);
                if (currentWriter.getLength() - this.fileLengthAtLastSync >= this.batchSize) {
                    break;
                }
            }
        }
        // Only move the marker when at least one entry was processed in this pass.
        if (lastProcessedTxid > 0) {
            this.highestProcessedAppendTxid = lastProcessedTxid;
        }

        if (currentWriter.getLength() - this.fileLengthAtLastSync >= this.batchSize) {
            // Batch size reached: sync regardless of pending sync requests.
            sync(currentWriter);
            return;
        }
        if (currentWriter.getLength() == this.fileLengthAtLastSync) {
            // Nothing was written since the last sync.
            if (this.unackedAppends.isEmpty()) {
                this.highestSyncedTxid.set(this.highestProcessedAppendTxid);
                finishSync(false);
                trySetReadyForRolling();
            }
            return;
        }
        // Unsynced data below the batch size: sync only if a sync request is waiting for it.
        if (!this.syncFutures.isEmpty() && this.syncFutures.last().getTxid() > this.highestProcessedAppendTxidAtLastSync) {
            sync(currentWriter);
        }
    }

    /**
     * Body of the consumer task executed on the event loop.
     *
     * <p>Under {@code consumeLock} it drains published ring-buffer slots into
     * {@code toWriteAppends} / {@code syncFutures}, calls {@code appendAndSync()}, and then
     * decides whether to reschedule itself or to give up the {@code consumerScheduled} flag.
     * It does nothing while the writer is marked broken, and while a roll is waiting it only
     * flushes outstanding data or signals that the safe point has been reached.
     */
    private void consume() {
        this.consumeLock.lock();
        try {
            if (this.writerBroken) {
                // Nothing can be written until doReplaceWriter() installs a new writer.
                return;
            }
            if (this.waitingRoll) {
                if (((WALProvider.AsyncWriter) this.writer).getLength() > this.fileLengthAtLastSync) {
                    // Unsynced data exists: sync it; the sync callbacks take over from here.
                    sync((WALProvider.AsyncWriter) this.writer);
                } else if (this.unackedAppends.isEmpty()) {
                    // Everything written is acknowledged: release the thread in waitForSafePoint().
                    this.readyForRolling = true;
                    this.readyForRollingCond.signalAll();
                }
                return;
            }
            // Take every slot that is published, in sequence order, up to the current cursor.
            long cursor = this.waitingConsumePayloads.getCursor();
            for (long j = this.waitingConsumePayloadsGatingSequence.get() + 1; j <= cursor && this.waitingConsumePayloads.isPublished(j); j++) {
                RingBufferTruck ringBufferTruck = (RingBufferTruck) this.waitingConsumePayloads.get(j);
                switch (ringBufferTruck.type()) {
                    case APPEND:
                        this.toWriteAppends.addLast(ringBufferTruck.unloadAppend());
                        break;
                    case SYNC:
                        this.syncFutures.add(ringBufferTruck.unloadSync());
                        break;
                    default:
                        LOG.warn("RingBufferTruck with unexpected type: " + ringBufferTruck.type());
                        break;
                }
                // Advance the gating sequence so producers may reuse the slot.
                this.waitingConsumePayloadsGatingSequence.set(j);
            }
            appendAndSync();
            if (this.hasConsumerTask.get().booleanValue()) {
                // Another run of this task is already first in the event loop's queue.
                return;
            }
            if (this.toWriteAppends.isEmpty() && this.waitingConsumePayloadsGatingSequence.get() == this.waitingConsumePayloads.getCursor()) {
                // Nothing left to do: release the scheduled flag, then re-check the cursor because
                // producers publish without holding consumeLock. Stop if the ring buffer is still
                // drained, or if a producer has already re-claimed the flag (it will schedule us).
                this.consumerScheduled.set(false);
                if (this.waitingConsumePayloadsGatingSequence.get() == this.waitingConsumePayloads.getCursor() || !this.consumerScheduled.compareAndSet(false, true)) {
                    return;
                }
            }
            // More work remains (or we re-claimed the flag): run again on the event loop.
            this.eventLoop.execute(this.consumer);
        } finally {
            this.consumeLock.unlock();
        }
    }

    /**
     * Decides whether the calling producer must submit the consumer task.
     *
     * @return {@code true} only when the writer is not broken, no roll is waiting, and this
     *     caller is the one that flipped {@code consumerScheduled} from false to true
     */
    private boolean shouldScheduleConsumer() {
        return !this.writerBroken
            && !this.waitingRoll
            && this.consumerScheduled.compareAndSet(false, true);
    }

    /**
     * Publishes the edit to the ring buffer and schedules the consumer if no run is pending.
     *
     * @return the value returned by {@code stampSequenceIdAndPublishToRingBuffer}
     * @throws IOException if publishing the edit fails
     */
    @Override // org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL, org.apache.hadoop.hbase.wal.WAL
    public long append(RegionInfo regionInfo, WALKey wALKey, WALEdit wALEdit, boolean z) throws IOException {
        final long txid = stampSequenceIdAndPublishToRingBuffer(regionInfo, wALKey, wALEdit, z, this.waitingConsumePayloads);
        boolean mustSchedule = shouldScheduleConsumer();
        if (mustSchedule) {
            this.eventLoop.execute(this.consumer);
        }
        return txid;
    }

    /**
     * Publishes a sync request to the ring buffer and blocks until it has been completed.
     *
     * <p>The claimed ring-buffer sequence is published exactly once, in a {@code finally} that
     * covers only the loading of the slot. Publishing again after a later failure (for example
     * from {@code blockOnSync}) would re-publish a sequence whose slot may already have been
     * reused by another producer.
     *
     * @throws IOException if waiting for the sync fails
     */
    @Override // org.apache.hadoop.hbase.wal.WAL
    public void sync() throws IOException {
        TraceScope scope = Trace.startSpan("AsyncFSWAL.sync");
        try {
            long sequence = this.waitingConsumePayloads.next();
            SyncFuture future;
            try {
                future = getSyncFuture(sequence, scope.detach());
                this.waitingConsumePayloads.get(sequence).load(future);
            } finally {
                // A claimed sequence must always be published, otherwise the consumer stalls on it.
                this.waitingConsumePayloads.publish(sequence);
            }
            if (shouldScheduleConsumer()) {
                this.eventLoop.execute(this.consumer);
            }
            // blockOnSync hands back the span, which is continued here so it can be closed below.
            scope = Trace.continueSpan(blockOnSync(future));
        } finally {
            if (!$assertionsDisabled && scope != NullScope.INSTANCE && scope.isDetached()) {
                throw new AssertionError();
            }
            scope.close();
        }
    }

    /**
     * Blocks until the given txid has been synced; returns immediately if it already has.
     *
     * <p>The claimed ring-buffer sequence is published exactly once, in a {@code finally} that
     * covers only the loading of the slot. Publishing again after a later failure (for example
     * from {@code blockOnSync}) would re-publish a sequence whose slot may already have been
     * reused by another producer.
     *
     * @param j the txid that must be synced before this method returns
     * @throws IOException if waiting for the sync fails
     */
    @Override // org.apache.hadoop.hbase.wal.WAL
    public void sync(long j) throws IOException {
        if (this.highestSyncedTxid.get() >= j) {
            return;
        }
        TraceScope scope = Trace.startSpan("AsyncFSWAL.sync");
        try {
            long sequence = this.waitingConsumePayloads.next();
            SyncFuture future;
            try {
                // The future waits for the requested txid, not for the ring-buffer sequence.
                future = getSyncFuture(j, scope.detach());
                this.waitingConsumePayloads.get(sequence).load(future);
            } finally {
                // A claimed sequence must always be published, otherwise the consumer stalls on it.
                this.waitingConsumePayloads.publish(sequence);
            }
            if (shouldScheduleConsumer()) {
                this.eventLoop.execute(this.consumer);
            }
            // blockOnSync hands back the span, which is continued here so it can be closed below.
            scope = Trace.continueSpan(blockOnSync(future));
        } finally {
            if (!$assertionsDisabled && scope != NullScope.INSTANCE && scope.isDetached()) {
                throw new AssertionError();
            }
            scope.close();
        }
    }

    /**
     * Creates a writer for the given path, retrying up to {@code createMaxRetries} times.
     *
     * <p>{@link RemoteException} is handled before the generic {@link IOException} because it is
     * a subclass of it; in the opposite order its handler is unreachable. A remote failure that
     * {@code shouldRetryCreate} rejects is unwrapped and rethrown at once. If its message
     * reports a missing parent directory, all pending sync futures are first completed with
     * that error. Other I/O failures are retried after a back-off, overwriting the file left by
     * the failed attempt.
     *
     * <p>Declared {@code public} by the decompiler; the overridden method is {@code protected}.
     *
     * @param path location of the new WAL file
     * @return the newly created writer
     * @throws IOException if creation fails permanently or the retries are exhausted
     */
    @Override // org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL
    public WALProvider.AsyncWriter createWriterInstance(Path path) throws IOException {
        boolean overwrite = false;
        for (int retry = 0;; retry++) {
            try {
                return AsyncFSWALProvider.createAsyncWriter(this.conf, this.fs, path, overwrite, this.eventLoop, this.channelClass);
            } catch (RemoteException e) {
                LOG.warn("create wal log writer " + path + " failed, retry = " + retry, e);
                if (!FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate(e)) {
                    IOException unwrapped = e.unwrapRemoteException();
                    if (e.getMessage().contains("Parent directory doesn't exist:")) {
                        // Complete every pending sync with the error so callers blocked in
                        // sync() are released.
                        this.syncFutures.forEach(future -> future.done(future.getTxid(), unwrapped));
                    }
                    throw unwrapped;
                }
                if (retry >= this.createMaxRetries) {
                    break;
                }
            } catch (FanOutOneBlockAsyncDFSOutputHelper.NameNodeException e) {
                // Never retried.
                throw e;
            } catch (IOException e) {
                LOG.warn("create wal log writer " + path + " failed, retry = " + retry, e);
                if (retry >= this.createMaxRetries) {
                    break;
                }
                // The failed attempt may have left a file behind; overwrite it next time.
                overwrite = true;
                try {
                    Thread.sleep(ConnectionUtils.getPauseTime(100L, retry));
                } catch (InterruptedException ie) {
                    throw new InterruptedIOException();
                }
            }
        }
        throw new IOException("Failed to create wal log writer " + path + " after retrying " + this.createMaxRetries + " time(s)");
    }

    /**
     * Blocks until the consumer reports that the current writer can be replaced safely.
     *
     * <p>Returns immediately when the writer is already marked broken or when there is no writer
     * yet. Otherwise it raises {@code waitingRoll}, schedules the consumer, and waits
     * uninterruptibly on {@code readyForRollingCond} until {@code readyForRolling} is set.
     */
    private void waitForSafePoint() {
        this.consumeLock.lock();
        try {
            // writer is a reference, so the "no writer" test is against null.
            if (this.writerBroken || this.writer == null) {
                return;
            }
            // Claim the scheduled flag so producers do not submit the consumer concurrently.
            this.consumerScheduled.set(true);
            this.waitingRoll = true;
            this.readyForRolling = false;
            this.eventLoop.execute(this.consumer);
            // Loop to guard against spurious wake-ups.
            while (!this.readyForRolling) {
                this.readyForRollingCond.awaitUninterruptibly();
            }
        } finally {
            this.consumeLock.unlock();
        }
    }

    /**
     * Swaps in a new writer once the consumer has reached a safe point, restarts the consumer,
     * and closes the previous writer on {@code closeExecutor}.
     *
     * <p>{@code consumeLock} is released in a {@code finally} that covers only the locked
     * section, so a failure while handling the old writer cannot trigger a second unlock.
     *
     * <p>Declared {@code public} by the decompiler; the overridden method is {@code protected}.
     *
     * @param path not used by this implementation
     * @param path2 not used by this implementation
     * @param asyncWriter the writer to install; may be {@code null}
     * @return the length of the replaced writer, or 0 if there was none
     * @throws IOException declared by the overridden method
     */
    @Override // org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL
    public long doReplaceWriter(Path path, Path path2, WALProvider.AsyncWriter asyncWriter) throws IOException {
        waitForSafePoint();
        final WALProvider.AsyncWriter previousWriter = (WALProvider.AsyncWriter) this.writer;
        this.writer = asyncWriter;
        // instanceof is false for null, so no separate null check is needed.
        if (asyncWriter instanceof AsyncProtobufLogWriter) {
            this.fsOut = ((AsyncProtobufLogWriter) asyncWriter).getOutput();
        }
        this.fileLengthAtLastSync = 0L;
        this.highestProcessedAppendTxidAtLastSync = 0L;

        this.consumeLock.lock();
        try {
            this.consumerScheduled.set(true);
            this.waitingRoll = false;
            this.writerBroken = false;
            this.eventLoop.execute(this.consumer);
        } finally {
            this.consumeLock.unlock();
        }

        if (previousWriter == null) {
            return 0L;
        }
        long previousLength = previousWriter.getLength();
        // Close off the calling thread; a failure to close is only logged.
        this.closeExecutor.execute(() -> {
            try {
                previousWriter.close();
            } catch (IOException e) {
                LOG.warn("close old writer failed", e);
            }
        });
        return previousLength;
    }

    /**
     * Shuts the WAL down: waits for the safe point, closes and clears the current writer, stops
     * {@code closeExecutor}, and completes every pending sync future with an {@link IOException}.
     *
     * @throws IOException if closing the writer fails
     */
    @Override // org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL
    protected void doShutdown() throws IOException {
        waitForSafePoint();
        // writer is a reference, so the "has writer" test is against null.
        if (this.writer != null) {
            ((WALProvider.AsyncWriter) this.writer).close();
            this.writer = null;
        }
        this.closeExecutor.shutdown();
        IOException closedError = new IOException("WAL has been closed");
        this.syncFutures.forEach(future -> future.done(future.getTxid(), closedError));
    }

    /**
     * Appends the entry to the given writer by delegating to {@code asyncWriter.append}.
     *
     * <p>Declared {@code public} by the decompiler; the overridden method is {@code protected}.
     *
     * @param asyncWriter the writer to append to
     * @param fSWALEntry the entry to append
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL
    public void doAppend(WALProvider.AsyncWriter asyncWriter, FSWALEntry fSWALEntry) {
        asyncWriter.append(fSWALEntry);
    }

    /**
     * Returns the pipeline reported by the current output.
     *
     * @return the output's pipeline, or an empty array when no output has been recorded
     */
    @Override // org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL
    DatanodeInfo[] getPipeline() {
        // Read the volatile field once so the null check and the call see the same object.
        AsyncFSOutput output = this.fsOut;
        if (output == null) {
            return new DatanodeInfo[0];
        }
        return output.getPipeline();
    }

    /**
     * Returns the number of datanodes in the current pipeline.
     *
     * @return the length of the array returned by {@code getPipeline()}
     */
    @Override // org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL
    int getLogReplication() {
        DatanodeInfo[] pipeline = getPipeline();
        return pipeline.length;
    }

    // Static state: assertion flag, logger, and the ordering used for pending sync futures.
    static {
        $assertionsDisabled = !AsyncFSWAL.class.desiredAssertionStatus();
        LOG = LogFactory.getLog(AsyncFSWAL.class);
        // Order by txid; distinct futures with the same txid are separated by identity hash so
        // the sorted set does not treat them as duplicates.
        SEQ_COMPARATOR = (left, right) -> {
            int byTxid = Long.compare(left.getTxid(), right.getTxid());
            if (byTxid != 0) {
                return byTxid;
            }
            return Integer.compare(System.identityHashCode(left), System.identityHashCode(right));
        };
    }
}
