/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.persistence.wal.filehandle;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.AbstractFileHandle;
import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileWriteHandle;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;

/**
 * Write handle for a single WAL segment file.
 * <p>
 * Records are not written to the file as they arrive. {@link #addRecord(WALRecord)} links every new record to
 * the current {@link #head} with a compare-and-set, which builds a chain of records ending in a
 * {@link FakeRecord}. A flush atomically swaps the head for a fresh {@link FakeRecord}, serializes the detached
 * chain into a buffer and appends that buffer to the file while holding {@link #lock}. Concurrent flushers are
 * ordered by file position: a writer waits until {@link #written} reaches the offset its chain starts at.
 * <p>
 * {@link #close(boolean)} asserts that the handle is used with {@link WALMode#FSYNC}.
 */
class FsyncFileWriteHandle extends AbstractFileHandle implements FileWriteHandle {
    /**
     * Size of the buffer handed to {@code FileWriteAheadLogManager.prepareSerializerVersionBuffer}.
     * NOTE(review): presumably the serialized size of the segment header record - confirm against the serializer.
     */
    private static final int HEADER_BUFFER_SIZE = 29;

    /** Number of lock-free re-checks of {@link #written} done before blocking on {@link #writeComplete}. */
    private static final int SPIN_ITERATIONS = 64;

    /** Initial interval, in milliseconds, between "still waiting" warnings in {@link #writeBuffer(long, ByteBuffer)}. */
    private static final long INITIAL_LOG_BACKOFF_MS = 2000L;

    /** The warning interval stops doubling once it reaches this value, in milliseconds. */
    private static final long LOG_BACKOFF_DOUBLING_LIMIT_MS = 3600000L;

    /** Timeout, in seconds, of a single wait for a concurrent write in {@link #writeBuffer(long, ByteBuffer)}. */
    private static final long WRITE_WAIT_TIMEOUT_SEC = 2L;

    /** Serializer used to write regular records and the segment header. */
    private final RecordSerializer serializer;

    /** A record is rejected when it would reach or cross this position in the segment. */
    private final long maxSegmentSize;

    /** Serializer version used for the switch segment record written on close. */
    private final int serializerVersion = IgniteSystemProperties.getInteger("IGNITE_WAL_SERIALIZER_VERSION", 2);

    /** Head of the chain of records that have been added but not flushed yet. */
    final AtomicReference<WALRecord> head = new AtomicReference<>();

    /** File position up to which data has been written. Modified only while {@link #lock} is held. */
    private volatile long written;

    /** Value of {@link #written} at the moment of the last successful force of the file. */
    private volatile long lastFsyncPos;

    /** Guards against closing the handle more than once. */
    private final AtomicBoolean stop = new AtomicBoolean(false);

    /** Lock protecting file writes, fsync and the conditions below. */
    private final Lock lock = new ReentrantLock();

    /** Signalled after every attempt to write a buffer to the file. */
    private final Condition writeComplete = this.lock.newCondition();

    /** Signalled after a force of the file when delayed fsync is enabled. */
    private final Condition fsync = this.lock.newCondition();

    /** Signalled by {@link #signalNextAvailable()}. */
    private final Condition nextSegment = this.lock.newCondition();

    /** WAL mode. */
    private final WALMode mode;

    /** Capacity of the thread-local buffer; longer chains are flushed through a temporary buffer. */
    private final int tlbSize;

    /** Shared cache context. */
    protected final GridCacheSharedContext cctx;

    /** Data storage metrics. */
    private final DataStorageMetricsImpl metrics;

    /** Logger. */
    protected final IgniteLogger log;

    /** Delay, in nanoseconds, applied before an fsync requested for a specific pointer; {@code 0} disables it. */
    private final long fsyncDelay;

    /** Value of {@link #written} right after the switch segment record was written by {@link #close(boolean)}. */
    private int switchSegmentRecordOffset;

    /** Per-thread direct buffer of {@link #tlbSize} bytes used to serialize record chains. */
    private final ThreadLocal<ByteBuffer> tlb = new ThreadLocal<ByteBuffer>() {
        @Override
        protected ByteBuffer initialValue() {
            ByteBuffer buf = ByteBuffer.allocateDirect(tlbSize);
            buf.order(GridUnsafe.NATIVE_BYTE_ORDER);
            return buf;
        }
    };

    /**
     * Creates a handle positioned at {@code pos} of the given segment file.
     *
     * @param cctx Shared cache context.
     * @param fileIO I/O of the segment file.
     * @param metrics Data storage metrics.
     * @param serializer Record serializer, must not be {@code null}.
     * @param pos Initial file position; also used as the initial written and fsynced position.
     * @param mode WAL mode.
     * @param maxSegmentSize Maximum segment size.
     * @param size Capacity of the thread-local buffer.
     * @param fsyncDelay Fsync delay in nanoseconds.
     * @throws IOException If positioning the file failed.
     */
    FsyncFileWriteHandle(GridCacheSharedContext cctx, SegmentIO fileIO, DataStorageMetricsImpl metrics, RecordSerializer serializer, long pos, WALMode mode, long maxSegmentSize, int size, long fsyncDelay) throws IOException {
        super(fileIO);

        assert serializer != null;

        this.mode = mode;
        this.tlbSize = size;
        this.cctx = cctx;
        this.metrics = metrics;
        this.log = cctx.logger(FsyncFileWriteHandle.class);
        this.fsyncDelay = fsyncDelay;
        this.maxSegmentSize = maxSegmentSize;
        this.serializer = serializer;
        this.written = pos;
        this.lastFsyncPos = pos;

        // The chain always ends in a fake record that carries the position the next record starts at.
        this.head.set(new FakeRecord(new FileWALPointer(fileIO.getSegmentId(), (int)pos, 0), false));

        fileIO.position(pos);
    }

    /** {@inheritDoc} */
    @Override
    public int serializerVersion() {
        return serializer.version();
    }

    /** {@inheritDoc} */
    @Override
    public void finishResumeLogging() {
        // No-op.
    }

    /**
     * Writes the segment header at the beginning of the file and moves the written position, the fsynced
     * position and the head of the chain past it.
     *
     * @throws StorageException If the header could not be written.
     */
    @Override
    public void writeHeader() throws StorageException {
        try {
            assert fileIO.position() == 0L : "Serializer version can be written only at the begin of file " + fileIO.position();

            long updatedPosition = writeSerializerVersion(fileIO, getSegmentId(), serializer.version(), mode);

            written = updatedPosition;
            lastFsyncPos = updatedPosition;

            head.set(new FakeRecord(new FileWALPointer(getSegmentId(), (int)updatedPosition, 0), false));
        }
        catch (IOException e) {
            throw new StorageException("Unable to write serializer version for segment " + getSegmentId(), e);
        }
    }

    /**
     * Writes the serializer version header to the file, forcing it to storage in {@link WALMode#FSYNC} mode.
     *
     * @param io I/O to write to.
     * @param idx Segment index.
     * @param version Serializer version.
     * @param mode WAL mode.
     * @return File position after the write.
     * @throws IOException If the write failed.
     */
    private static long writeSerializerVersion(FileIO io, long idx, int version, WALMode mode) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_BUFFER_SIZE);
        buf.order(ByteOrder.nativeOrder());

        io.writeFully(FileWriteAheadLogManager.prepareSerializerVersionBuffer(idx, version, false, buf));

        if (mode == WALMode.FSYNC) {
            io.force();
        }

        return io.position();
    }

    /**
     * @return {@code true} if the current head is a stop marker.
     */
    private boolean stopped() {
        return stopped(head.get());
    }

    /**
     * @param record Record to check.
     * @return {@code true} if the record is a {@link FakeRecord} with the stop flag set.
     */
    private boolean stopped(WALRecord record) {
        return record instanceof FakeRecord && ((FakeRecord)record).stop;
    }

    /**
     * Appends a record to the chain of pending records.
     *
     * @param rec Record to add.
     * @return Pointer assigned to the record, or {@code null} if the record does not fit into the segment or
     *      the handle is stopped.
     * @throws StorageException If flushing the pending chain failed.
     */
    @Override
    @Nullable
    public WALPointer addRecord(WALRecord rec) throws StorageException {
        assert rec.size() > 0 || rec.getClass() == FakeRecord.class;

        boolean flushed = false;

        for (;;) {
            WALRecord h = head.get();
            long nextPos = nextPosition(h);

            if (nextPos + rec.size() >= maxSegmentSize || stopped(h)) {
                return null;
            }

            int newChainSize = h.chainSize() + rec.size();

            if (newChainSize > tlbSize && !flushed) {
                // The chain would outgrow the thread-local buffer: flush what is pending and retry.
                boolean res = h.previous() == null || flush(h, false);

                // A record larger than the buffer can never fit; once the chain is flushed it is linked anyway.
                if (rec.size() > tlbSize) {
                    flushed = res;
                }

                continue;
            }

            rec.chainSize(newChainSize);
            rec.previous(h);

            FileWALPointer ptr = new FileWALPointer(getSegmentId(), (int)nextPos, rec.size());

            rec.position(ptr);

            if (head.compareAndSet(h, rec)) {
                return ptr;
            }
        }
    }

    /**
     * Flushes the chain that is currently at the head.
     *
     * @throws IgniteCheckedException If the flush failed.
     */
    @Override
    public void flushAll() throws IgniteCheckedException {
        flush(head.get(), false);
    }

    /**
     * Flushes the chain that is currently at the head and replaces the head with a stop marker.
     *
     * @throws IgniteCheckedException If the flush failed.
     */
    public void flushAllOnStop() throws IgniteCheckedException {
        flush(head.get(), true);
    }

    /**
     * @param rec Record.
     * @return File offset right after the record.
     */
    private long nextPosition(WALRecord rec) {
        return recordOffset(rec) + rec.size();
    }

    /**
     * @param rec Record with an assigned position.
     * @return File offset of the record.
     */
    private static int recordOffset(WALRecord rec) {
        FileWALPointer ptr = (FileWALPointer)rec.position();

        assert ptr != null;

        return ptr.fileOffset();
    }

    /**
     * Flushes pending records up to the pointer. If this thread did not perform the flush itself, it waits
     * until the expected position has been written or the node becomes invalid.
     *
     * @param ptr Pointer to flush up to, or {@code null} to flush everything that is pending.
     * @param stop Whether the head must be replaced with a stop marker.
     * @throws StorageException If the flush failed.
     */
    private void flushOrWait(FileWALPointer ptr, boolean stop) throws StorageException {
        long expWritten;

        if (ptr != null) {
            // A pointer into another segment is not handled by this handle.
            if (ptr.index() != getSegmentId()) {
                return;
            }

            expWritten = ptr.fileOffset();
        }
        else {
            // The head offset is read before the flush attempt, which may replace the head.
            expWritten = recordOffset(head.get());
        }

        if (flush(ptr, stop)) {
            return;
        }

        if (stop) {
            FakeRecord fr = (FakeRecord)head.get();

            assert fr.stop : "Invalid fake record on top of the queue: " + fr;

            expWritten = recordOffset(fr);
        }

        // Re-check a few times without the lock before blocking.
        for (int i = 0; i < SPIN_ITERATIONS; i++) {
            if (written >= expWritten) {
                return;
            }
        }

        // This thread did not flush: wait for the concurrent write to complete.
        lock.lock();

        try {
            while (written < expWritten && !cctx.kernalContext().invalid()) {
                U.awaitQuiet(writeComplete);
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Repeatedly tries to flush the current head until this thread succeeds or there is nothing left for it
     * to flush.
     *
     * @param ptr Pointer to flush up to, or {@code null} for an unconditional flush.
     * @param stop Whether the head must be replaced with a stop marker.
     * @return {@code true} if this thread wrote a chain to the file.
     * @throws StorageException If the flush failed.
     */
    private boolean flush(FileWALPointer ptr, boolean stop) throws StorageException {
        if (ptr == null) {
            // Unconditional flush.
            for (;;) {
                WALRecord expHead = head.get();

                if (expHead.previous() == null) {
                    FakeRecord frHead = (FakeRecord)expHead;

                    // The chain is empty: only the stop flag may have to be raised.
                    if (frHead.stop == stop || frHead.stop
                        || head.compareAndSet(expHead, new FakeRecord(frHead.position(), stop))) {
                        return false;
                    }
                }

                if (flush(expHead, stop)) {
                    return true;
                }
            }
        }

        assert ptr.index() == getSegmentId();

        for (;;) {
            WALRecord h = head.get();

            // The pending chain starts after the requested offset, so this thread has nothing to flush for it.
            if (chainBeginPosition(h) > ptr.fileOffset()) {
                return false;
            }

            if (flush(h, stop)) {
                return true;
            }
        }
    }

    /**
     * @param h Head of a chain.
     * @return File offset at which the chain begins.
     */
    private long chainBeginPosition(WALRecord h) {
        return recordOffset(h) + h.size() - h.chainSize();
    }

    /**
     * @throws StorageException If the kernal context has been invalidated.
     */
    private void checkNode() throws StorageException {
        if (cctx.kernalContext().invalid()) {
            throw new StorageException("Failed to perform WAL operation (environment was invalidated by a previous error)");
        }
    }

    /**
     * Detaches the chain headed by {@code expHead} and writes it to the file.
     * <p>
     * Any failure while serializing or writing is reported to the failure processor as a critical error and
     * rethrown as a {@link StorageException}.
     *
     * @param expHead Expected head of the chain.
     * @param stop Whether the new head must be a stop marker.
     * @return {@code true} if the chain was written by this thread, {@code false} if the head has changed
     *      concurrently or there was nothing to write.
     * @throws StorageException If the node is invalid or the write failed.
     */
    private boolean flush(WALRecord expHead, boolean stop) throws StorageException {
        if (expHead.previous() == null) {
            FakeRecord frHead = (FakeRecord)expHead;

            // Nothing is pending, and the stop flag does not need to be raised.
            if (!stop || frHead.stop) {
                return false;
            }
        }

        checkNode();

        FakeRecord newHead = new FakeRecord(new FileWALPointer(getSegmentId(), (int)nextPosition(expHead), 0), stop);

        // Failing the CAS means another thread extended or flushed the chain.
        if (!head.compareAndSet(expHead, newHead)) {
            return false;
        }

        if (expHead.chainSize() == 0) {
            return false;
        }

        try {
            // Chains longer than the thread-local buffer go through a temporary buffer.
            boolean tmpBuf = expHead.chainSize() > tlbSize;
            ByteBuffer buf = tmpBuf ? GridUnsafe.allocateBuffer(expHead.chainSize()) : tlb.get();

            try {
                long pos = fillBuffer(buf, expHead);

                writeBuffer(pos, buf);
            }
            finally {
                if (tmpBuf) {
                    GridUnsafe.freeBuffer(buf);
                }
            }

            return true;
        }
        catch (Throwable e) {
            StorageException se = e instanceof StorageException
                ? (StorageException)e
                : new StorageException("Unable to write", new IOException(e));

            cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, se));

            // Wake up threads blocked in awaitNext().
            signalNextAvailable();

            throw se;
        }
    }

    /**
     * Serializes a chain of records into the buffer. The chain is walked from its head backwards and every
     * record is written at its own offset inside the chain.
     *
     * @param buf Buffer whose capacity is at least the chain size.
     * @param head Head of the chain.
     * @return File offset at which the buffer content must be written.
     * @throws IgniteCheckedException If serialization failed.
     */
    private long fillBuffer(ByteBuffer buf, WALRecord head) throws IgniteCheckedException {
        int limit = head.chainSize();

        assert limit <= buf.capacity();

        buf.rewind();
        buf.limit(limit);

        do {
            // Restrict the buffer window to exactly the slot of this record.
            buf.position(head.chainSize() - head.size());
            buf.limit(head.chainSize());

            try {
                serializer.writeRecord(head, buf);
            }
            catch (RuntimeException e) {
                throw new IllegalStateException("Failed to write record: " + head, e);
            }

            assert !buf.hasRemaining() : "Reported record size is greater than actual: " + head;

            head = head.previous();
        }
        while (head.previous() != null);

        assert head instanceof FakeRecord : head.getClass();

        buf.rewind();
        buf.limit(limit);

        // The terminating fake record carries the position the chain starts at.
        return recordOffset(head);
    }

    /**
     * @param ptr Pointer to check.
     * @return {@code true} if the pointer belongs to this segment and its offset has not been fsynced yet.
     */
    @Override
    public boolean needFsync(FileWALPointer ptr) {
        return getSegmentId() == ptr.index() && lastFsyncPos <= ptr.fileOffset();
    }

    /**
     * @return Zero-length pointer to the current written position of this segment.
     */
    @Override
    public FileWALPointer position() {
        lock.lock();

        try {
            return new FileWALPointer(getSegmentId(), (int)written, 0);
        }
        finally {
            lock.unlock();
        }
    }

    /** {@inheritDoc} */
    @Override
    public void fsync(FileWALPointer ptr) throws StorageException, IgniteCheckedException {
        fsync(ptr, false);
    }

    /** {@inheritDoc} */
    @Override
    public void closeBuffer() {
        // No-op.
    }

    /**
     * Flushes pending records up to the pointer and forces the file to storage if anything has been written
     * since the last fsync.
     *
     * @param ptr Pointer to sync up to, or {@code null} to sync everything.
     * @param stop Whether the head must be replaced with a stop marker.
     * @throws StorageException If the flush or the force failed.
     * @throws IgniteInterruptedCheckedException If the thread was interrupted during the fsync delay.
     */
    protected void fsync(FileWALPointer ptr, boolean stop) throws StorageException, IgniteInterruptedCheckedException {
        lock.lock();

        try {
            if (ptr != null) {
                if (!needFsync(ptr)) {
                    return;
                }

                if (fsyncDelay > 0L && !stopped()) {
                    // Wait for the configured delay; another thread may fsync this position meanwhile.
                    U.await(fsync, fsyncDelay, TimeUnit.NANOSECONDS);

                    if (!needFsync(ptr)) {
                        return;
                    }
                }
            }

            flushOrWait(ptr, stop);

            if (stopped()) {
                return;
            }

            if (lastFsyncPos != written) {
                assert lastFsyncPos < written;

                boolean metricsEnabled = metrics.metricsEnabled();
                long start = metricsEnabled ? System.nanoTime() : 0L;

                try {
                    fileIO.force();
                }
                catch (IOException e) {
                    throw new StorageException(e);
                }

                lastFsyncPos = written;

                if (fsyncDelay > 0L) {
                    fsync.signalAll();
                }

                long end = metricsEnabled ? System.nanoTime() : 0L;

                if (metricsEnabled) {
                    metrics.onFsync(end - start);
                }
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Closes the handle: flushes pending records, optionally writes a switch segment record, forces the file
     * to storage and closes it. Only the first call does the work.
     *
     * @param rollOver Whether a switch segment record should be written when it fits into the segment.
     * @return {@code true} if this call closed the handle, {@code false} if it had been closed already.
     * @throws StorageException If the flush, the write, the force or the close failed.
     */
    @Override
    public boolean close(boolean rollOver) throws StorageException {
        if (!this.stop.compareAndSet(false, true)) {
            return false;
        }

        lock.lock();

        try {
            flushOrWait(null, true);

            assert stopped() : "Segment is not closed after close flush: " + head.get();

            try {
                try {
                    RecordSerializer backwardSerializer = new RecordSerializerFactoryImpl(cctx).createSerializer(serializerVersion);
                    SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord();
                    int switchSegmentRecSize = backwardSerializer.size(segmentRecord);

                    if (rollOver && written + switchSegmentRecSize < maxSegmentSize) {
                        ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize);

                        segmentRecord.position(new FileWALPointer(getSegmentId(), (int)written, switchSegmentRecSize));
                        backwardSerializer.writeRecord(segmentRecord, buf);

                        buf.rewind();

                        written += fileIO.writeFully(buf, written);
                        switchSegmentRecordOffset = (int)written;
                    }
                }
                catch (IgniteCheckedException e) {
                    throw new IOException(e);
                }
                finally {
                    forceAndCloseFile();
                }
            }
            catch (IOException e) {
                throw new StorageException("Failed to close WAL write handle [idx=" + getSegmentId() + "]", e);
            }

            if (log.isDebugEnabled()) {
                log.debug("Closed WAL write handle [idx=" + getSegmentId() + "]");
            }

            return true;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Performs the final force of the file and closes it.
     * <p>
     * The file is closed even when the force fails, because the stop flag is already set and no later
     * {@link #close(boolean)} call would release the descriptor. The force failure is rethrown, with a close
     * failure attached as suppressed.
     *
     * @throws IOException If the force or the close failed.
     */
    private void forceAndCloseFile() throws IOException {
        try {
            assert mode == WALMode.FSYNC;

            fileIO.force();

            lastFsyncPos = written;
        }
        catch (IOException e) {
            try {
                fileIO.close();
            }
            catch (IOException closeErr) {
                e.addSuppressed(closeErr);
            }

            throw e;
        }

        fileIO.close();
    }

    /**
     * Wakes up threads blocked in {@link #awaitNext()}. On a valid node the reference to the file I/O is
     * dropped; on an invalid node the file is closed if it is still referenced.
     */
    @Override
    public void signalNextAvailable() {
        lock.lock();

        try {
            WALRecord rec = head.get();

            if (!cctx.kernalContext().invalid()) {
                assert rec instanceof FakeRecord : "Expected head FakeRecord, actual head " + (rec != null ? rec.getClass().getSimpleName() : "null");
                assert written == lastFsyncPos || mode != WALMode.FSYNC : "fsync [written=" + written + ", lastFsync=" + lastFsyncPos + ']';

                fileIO = null;
            }
            else {
                // The reference may already have been dropped by an earlier call made while the node was valid.
                SegmentIO io = fileIO;

                if (io != null) {
                    try {
                        io.close();
                    }
                    catch (IOException e) {
                        U.error(log, "Failed to close WAL file [idx=" + getSegmentId() + ", fileIO=" + io + "]", e);
                    }
                }
            }

            nextSegment.signalAll();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Blocks until the file I/O reference has been dropped by {@link #signalNextAvailable()} or the node
     * becomes invalid.
     */
    @Override
    public void awaitNext() {
        lock.lock();

        try {
            while (fileIO != null && !cctx.kernalContext().invalid()) {
                U.awaitQuiet(nextSegment);
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Appends the buffer to the file at the given position, first waiting until every preceding chain has
     * been written by its own thread.
     *
     * @param pos File position the buffer must be written at.
     * @param buf Buffer positioned at the data to write.
     * @throws StorageException If the node is invalid or the write failed.
     */
    private void writeBuffer(long pos, ByteBuffer buf) throws StorageException {
        boolean interrupted = false;

        lock.lock();

        try {
            assert fileIO != null : "Writing to a closed segment.";

            checkNode();

            long lastLogged = U.currentTimeMillis();
            long logBackoff = INITIAL_LOG_BACKOFF_MS;

            // Writes are strictly ordered: wait until the file has grown up to our position.
            while (written != pos) {
                assert written < pos : "written = " + written + ", pos = " + pos;

                long now = U.currentTimeMillis();

                if (now - lastLogged >= logBackoff) {
                    if (logBackoff < LOG_BACKOFF_DOUBLING_LIMIT_MS) {
                        logBackoff *= 2L;
                    }

                    U.warn(log, "Still waiting for a concurrent write to complete [written=" + written + ", pos=" + pos + ", lastFsyncPos=" + lastFsyncPos + ", stop=" + this.stop.get() + ", actualPos=" + safePosition() + ']');

                    lastLogged = now;
                }

                try {
                    writeComplete.await(WRITE_WAIT_TIMEOUT_SEC, TimeUnit.SECONDS);
                }
                catch (InterruptedException ignore) {
                    // Keep waiting; the interrupt status is restored before returning.
                    interrupted = true;
                }

                checkNode();
            }

            int size = buf.remaining();

            assert size > 0 : size;

            try {
                assert written == fileIO.position();

                fileIO.writeFully(buf);

                written += size;

                metrics.onWalBytesWritten(size);

                assert written == fileIO.position();
            }
            catch (IOException e) {
                StorageException se = new StorageException("Unable to write", e);

                cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, se));

                throw se;
            }
        }
        finally {
            // Wake up waiters even on failure so that they can re-check the node state.
            writeComplete.signalAll();

            lock.unlock();

            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * @return Current position of the file I/O as a string, {@code "null"} if the I/O reference has been
     *      dropped, or a description of the failure if the position could not be read.
     */
    public String safePosition() {
        SegmentIO io = fileIO;

        if (io == null) {
            return "null";
        }

        try {
            return String.valueOf(io.position());
        }
        catch (IOException e) {
            return "{Failed to read channel position: " + e.getMessage() + "}";
        }
    }

    /** {@inheritDoc} */
    @Override
    public int getSwitchSegmentRecordOffset() {
        return switchSegmentRecordOffset;
    }

    /**
     * Marker record that terminates a chain of pending records. It carries the file position the chain
     * starts at and a flag telling whether the handle has been stopped.
     */
    static final class FakeRecord extends WALRecord {
        /** Whether this marker stops the handle from accepting further records. */
        private final boolean stop;

        /**
         * @param pos Position the next chain starts at.
         * @param stop Stop flag.
         */
        FakeRecord(FileWALPointer pos, boolean stop) {
            this.position(pos);
            this.stop = stop;
        }

        /** {@inheritDoc} */
        @Override
        public WALRecord.RecordType type() {
            return null;
        }

        /** {@inheritDoc} */
        @Override
        public FileWALPointer position() {
            return (FileWALPointer)super.position();
        }

        /** {@inheritDoc} */
        @Override
        public String toString() {
            return S.toString(FakeRecord.class, this, "super", (Object)super.toString());
        }
    }
}

