/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.dr.store.fs;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.IgniteUuidCache;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.thread.IgniteThread;
import org.gridgain.grid.dr.store.DrAbstractSenderStore;
import org.gridgain.grid.dr.store.DrSenderStoreCorruptedException;
import org.gridgain.grid.dr.store.DrSenderStoreCursor;
import org.gridgain.grid.dr.store.DrSenderStoreEntry;
import org.gridgain.grid.dr.store.DrSenderStoreOverflowMode;
import org.gridgain.grid.dr.store.DurableStore;
import org.jetbrains.annotations.Nullable;

@DurableStore
public class DrSenderFsStore
extends DrAbstractSenderStore
implements LifecycleAware {
    /** Default maximum number of log files (initial value of {@code maxFilesNum}). */
    public static final int DFLT_MAX_FILES_CNT = 10;
    /** Default maximum size of a single log file: 0x6400000 bytes (100 MiB). */
    public static final int DFLT_MAX_FILE_SIZE = 0x6400000;
    /** Default read buffer size: 524288 bytes (512 KiB). */
    public static final int DFLT_READ_BUF_SIZE = 524288;
    /** Default checkpoint frequency (presumably milliseconds; confirm against the checkpoint worker). */
    public static final long DFLT_CHECKPOINT_FREQ = 500L;
    /** Default checksum flag. */
    public static final boolean DFLT_CHECKSUM_ENABLED = true;
    /** Default synchronous-writes flag. */
    public static final boolean DFLT_SYNCHRONOUS_MODE = false;
    /** Magic value written at the start of a checkpoint file and after every stream record. */
    private static final short CHECK_PNT_MAGIC = -4422;
    /** Extension of checkpoint files. */
    private static final String CHECK_PNT_FILE_EXTENSION = ".chk";
    /** Default parent directory name. */
    public static final String DFLT_PARENT_DIR_NAME = "dr_fs_store";
    /** Deprecated default directory name; not referenced in the visible part of this class. */
    @Deprecated
    public static final String DFLT_GLOBAL_STORE_DIR_NAME = "global";
    /** Maximum number of checkpoint files kept on disk; older ones are removed after each checkpoint. */
    private static final int MAX_CHECKPOINT_FILES = 5;
    /** Data center streams indexed by unsigned data center id ({@code id & 0xFF}). */
    private final DataCenterStream[] streamById = new DataCenterStream[256];
    /** Size threshold in bytes after which a log file stops accepting new entries. */
    private long maxFileSize = 0x6400000L;
    /** Capacity in bytes of the direct buffers handed out by {@code buffer()}. */
    private int readBufSize = 524288;
    /** Log file count at which the oldest file is removed before switching to a new one. */
    private int maxFilesNum = 10;
    /** Whether entry headers carry {@code Arrays.hashCode(data)} (otherwise 0 is written). */
    private boolean checksum = true;
    /** Checkpoint frequency (presumably milliseconds; confirm against the checkpoint worker). */
    private long checkPntFreq = 500L;
    /** Lock holder for the store directory; acquired on start and closed on stop. */
    private GridCacheDatabaseSharedManager.FileLockHolder dirLock;
    /** Timeout for acquiring the directory lock. */
    private static final int WAIT_LOCK_TIMEOUT_MS = 2000;
    /** Configured directory path: absolute, or relative to the Ignite home. */
    private String dirPath;
    /** Resolved store directory. */
    private Path dir;
    /** When set, every store operation is followed by a checkpoint and no checkpoint worker is started. */
    private boolean syncMode = false;
    /** Known log files ordered by id. */
    private final GridConcurrentSkipListSet<LogFile> files = new GridConcurrentSkipListSet();
    /** Cursors consulted by {@code getActiveEntriesCount()}. */
    private final Set<Cursor> cursors = new GridConcurrentHashSet();
    /** Injected logger. */
    @LoggerResource
    private IgniteLogger log;
    /** Injected Ignite instance. */
    @IgniteInstanceResource
    private IgniteEx ignite;
    /** Log file currently receiving writes. */
    private volatile LogFile head;
    /** Background checkpoint worker; created only when synchronous mode is off. */
    private CheckPointWorker checkPntWorker;
    /** Pool of reusable direct read buffers. */
    private ConcurrentLinkedQueue<ByteBuffer> pool = new ConcurrentLinkedQueue();
    /** Set after a successful write; cleared when a checkpoint starts. */
    private volatile boolean updatedAfterCheckPnt;
    /** Cleared when a checkpoint starts; presumably set on entry acknowledgement (setter is outside this view). */
    private volatile boolean ackAfterCheckPnt;
    /** Guards head switching, old file removal, size calculation and synchronous checkpoints. */
    private final Lock lock = new ReentrantLock();
    /** Checkpoint files currently on disk, oldest first. */
    private final ArrayDeque<Path> checkPoints = new ArrayDeque();
    /** Generator of checkpoint file indexes, seeded with the current wall-clock time. */
    private final AtomicLong fileIdxGen = new AtomicLong(System.currentTimeMillis());

    /**
     * Gets the maximum log file size.
     *
     * @return Size threshold in bytes after which a log file stops accepting writes.
     */
    public long getMaxFileSize() {
        return this.maxFileSize;
    }

    /**
     * Sets the maximum log file size. Startup validation requires a value greater than 128.
     *
     * @param maxFileSize Size threshold in bytes.
     * @return {@code this} for chaining.
     */
    public DrSenderFsStore setMaxFileSize(long maxFileSize) {
        this.maxFileSize = maxFileSize;
        return this;
    }

    /**
     * Gets the maximum number of log files.
     *
     * @return File count at which the oldest log file is removed.
     */
    public int getMaxFilesCount() {
        return this.maxFilesNum;
    }

    /**
     * Sets the maximum number of log files. Startup validation requires a value greater than 2.
     *
     * @param maxFilesNum Maximum number of log files.
     * @return {@code this} for chaining.
     */
    public DrSenderFsStore setMaxFilesCount(int maxFilesNum) {
        this.maxFilesNum = maxFilesNum;
        return this;
    }

    /**
     * Gets the synchronous-writes flag. The method name keeps its historical spelling
     * because it is part of the public interface.
     *
     * @return {@code true} if a checkpoint is written after every store operation.
     */
    public boolean isSyncronousWrites() {
        return this.syncMode;
    }

    /**
     * Sets the synchronous-writes flag. When enabled, no background checkpoint worker
     * is started and every store operation triggers a checkpoint.
     *
     * @param syncMode Synchronous mode flag.
     * @return {@code this} for chaining.
     */
    public DrSenderFsStore setSynchronousWrites(boolean syncMode) {
        this.syncMode = syncMode;
        return this;
    }

    /**
     * Gets the checkpoint frequency.
     *
     * @return Checkpoint frequency (presumably milliseconds; confirm against the checkpoint worker).
     */
    public long getCheckpointFrequency() {
        return this.checkPntFreq;
    }

    /**
     * Sets the checkpoint frequency. Startup validation requires a value greater than 50.
     *
     * @param checkPntFreq Checkpoint frequency (presumably milliseconds; confirm against the checkpoint worker).
     * @return {@code this} for chaining.
     */
    public DrSenderFsStore setCheckpointFrequency(long checkPntFreq) {
        this.checkPntFreq = checkPntFreq;
        return this;
    }

    /**
     * Gets the configured store directory path.
     *
     * @return Directory path as configured (not yet resolved).
     */
    public String getDirectoryPath() {
        return this.dirPath;
    }

    /**
     * Sets the store directory path. On start, an absolute path is used as is and a
     * relative path is resolved against the Ignite home directory.
     *
     * @param dirPath Directory path; must not be {@code null} at start.
     * @return {@code this} for chaining.
     */
    public DrSenderFsStore setDirectoryPath(String dirPath) {
        this.dirPath = dirPath;
        return this;
    }

    /**
     * Sets the overflow mode by delegating to the superclass; the return type is
     * narrowed so that calls can be chained on this class.
     *
     * @param overflowMode Overflow mode; must not be {@code null} at start.
     * @return {@code this} for chaining.
     */
    @Override
    public DrSenderFsStore setOverflowMode(DrSenderStoreOverflowMode overflowMode) {
        super.setOverflowMode(overflowMode);
        return this;
    }

    /**
     * Gets the checksum flag.
     *
     * @return {@code true} if a hash of the entry data is written into each entry header.
     */
    public boolean isChecksumEnabled() {
        return this.checksum;
    }

    /**
     * Sets the checksum flag. When disabled, zero is written in place of the data hash.
     *
     * @param checksum Checksum flag.
     * @return {@code this} for chaining.
     */
    public DrSenderFsStore setChecksumEnabled(boolean checksum) {
        this.checksum = checksum;
        return this;
    }

    /**
     * Gets the read buffer size.
     *
     * @return Capacity in bytes of each direct read buffer.
     */
    public int getReadBufferSize() {
        return this.readBufSize;
    }

    /**
     * Sets the read buffer size. Startup validation requires a value greater than 64.
     *
     * @param readBufSize Buffer capacity in bytes.
     * @return {@code this} for chaining.
     */
    public DrSenderFsStore setReadBufferSize(int readBufSize) {
        this.readBufSize = readBufSize;
        return this;
    }

    /**
     * Starts the store and releases the directory lock if startup fails.
     * <p>
     * The lock is released on any unchecked failure, not only on {@link IgniteException}:
     * startup can also fail with other runtime exceptions after the lock is acquired
     * (for example a malformed file name in the store directory), and the lock must not
     * stay held in that case.
     *
     * @throws IgniteException If the store could not be started.
     */
    public void start() throws IgniteException {
        try {
            this.start0();
        }
        catch (RuntimeException | Error e) {
            if (this.dirLock != null) {
                U.closeQuiet((AutoCloseable)this.dirLock);
            }
            throw e;
        }
    }

    /**
     * Performs the actual startup: validates the configuration, resolves and locks the
     * store directory, restores existing log files and the latest readable checkpoint
     * (only when the directory already existed), opens a fresh head log file and starts
     * the checkpoint worker unless synchronous mode is enabled.
     *
     * @throws IgniteException If the path cannot be resolved, the directory lock cannot
     *      be acquired, or an I/O error occurs.
     */
    private void start0() {
        A.ensure(this.readBufSize > 64, "readBufSize > 64");
        A.notNull(this.overflowMode, "overflowMode");
        A.ensure(this.maxFileSize > 128L, "maxFileSize > 128");
        A.ensure(this.maxFilesNum > 2, "maxFilesNum > 2");
        A.ensure(this.checkPntFreq > 50L, "checkPntFreq > 50");
        A.notNull(this.dirPath, "dirPath");

        // A relative path can only be resolved against the Ignite home directory.
        File configured = new File(this.dirPath);
        if (!configured.isAbsolute() && U.getIgniteHome() == null) {
            throw new IgniteException("Cannot resolve path: " + this.dirPath);
        }
        this.dir = configured.isAbsolute()
            ? configured.toPath()
            : new File(U.getIgniteHome(), this.dirPath).toPath();

        Collection<Path> restored = Collections.emptyList();
        try {
            boolean existedBefore = Files.exists(this.dir);
            if (!existedBefore) {
                Files.createDirectories(this.dir);
            }

            this.dirLock = new GridCacheDatabaseSharedManager.FileLockHolder(this.dir.toString(), this.ignite.context(), this.log);
            try {
                this.dirLock.tryLock(2000L);
            }
            catch (IgniteCheckedException e) {
                throw new IgniteException("Failed to acquire lock in directory: [dir=" + this.dir + ", localNodeId=" + this.ignite.context().localNodeId() + "]. Check no other sender store on this host is configured to use that dir. If there are no such sender and sender was terminated forcibly, then you may want to delete 'lock' file in that dir.", e);
            }

            // Only a pre-existing directory can contain state worth restoring.
            if (existedBefore) {
                this.initLogFiles();
                restored = this.initLastCheckPoint();
            }
        }
        catch (IOException e) {
            throw new IgniteException(e);
        }

        // New writes always go to a new file whose id follows the last restored one.
        long firstId = this.files.isEmpty() ? 0L : this.files.last().id + 1L;
        try {
            this.switchToNextFile(firstId);
        }
        catch (IgniteCheckedException e) {
            throw U.convertException(e);
        }

        this.checkPoints.addAll(restored);

        if (!this.syncMode) {
            this.checkPntWorker = new CheckPointWorker(this.ignite.name(), this.log);
            new IgniteThread((GridWorker)this.checkPntWorker).start();
        }

        if (this.log.isInfoEnabled()) {
            this.log.info("DR sender FS store is started in directory " + this.dir);
        }
    }

    /**
     * Calculates the number of bytes between the earliest stream position that still
     * has an entry to read and the end of the log.
     * <p>
     * A temporary cursor is opened for every stream; the smallest start position among
     * cursors that yield an entry is taken, and the sizes of all files from that one
     * onward are summed, minus the offset inside the first file.
     *
     * @return Size in bytes, or {@code 0} when no stream has a pending entry.
     * @throws IgniteException If the calculation fails.
     */
    @Override
    public long sizeBytes() {
        Collection<Cursor> opened = new ArrayList<>();
        this.lock.lock();
        try {
            for (DataCenterStream s : this.streams()) {
                opened.add(new Cursor(s));
            }

            LogPos earliest = null;
            for (Cursor c : opened) {
                // Capture the position before next() is called on the cursor.
                LogPos startPos = c.pos;
                if (c.next() != null && (earliest == null || startPos.compareTo(earliest) < 0)) {
                    earliest = startPos;
                }
            }

            if (earliest == null) {
                return 0L;
            }
            return this.files.tailSet(earliest.file, true).stream().mapToLong(LogFile::size).sum() - earliest.off;
        }
        catch (IgniteCheckedException e) {
            throw new IgniteException("Failed to calculate store size", e);
        }
        finally {
            opened.forEach(IgniteUtils::closeQuiet);
            this.lock.unlock();
        }
    }

    /**
     * Lists files in the store directory whose path ends with the given extension.
     *
     * @param fileExt File extension including the leading dot.
     * @return Matching files ordered by the id parsed from their names.
     * @throws IOException If the directory cannot be listed.
     */
    private TreeSet<FileWithId> files(final String fileExt) throws IOException {
        DirectoryStream.Filter<Path> byExtension = entry -> entry.toString().endsWith(fileExt);
        try (DirectoryStream<Path> listing = Files.newDirectoryStream(this.dir, byExtension)) {
            TreeSet<FileWithId> found = new TreeSet<>();
            for (Path p : listing) {
                found.add(new FileWithId(p));
            }
            return found;
        }
    }

    /**
     * Finds the registered log file with the given id.
     *
     * @param id Non-negative file id.
     * @return The registered file, or a new uninitialized placeholder with that id
     *      when no such file is registered.
     */
    private LogFile fileById(long id) {
        assert id >= 0L : id;
        LogFile placeholder = new LogFile(id);
        LogFile candidate = this.files.ceiling(placeholder);
        // ceiling() may return a file with a greater id; only an exact match counts.
        return candidate != null && candidate.id == id ? candidate : placeholder;
    }

    /**
     * Restores stream positions from the most recent readable checkpoint file.
     * <p>
     * Checkpoint files are visited from the highest id down. The first file that is
     * read and applied without an {@link IOException} ends the search. A file that
     * fails is logged, removed from the result and deleted from disk, and the next
     * older file is tried.
     * <p>
     * NOTE(review): the control flow below is decompiler output (the
     * {@code synchronized} block containing {@code break} statements is an artifact);
     * the original source most likely registered the streams inside that block.
     * <p>
     * NOTE(review): streams are written into {@code streamById} one by one, so a
     * duplicate id found midway leaves the earlier streams of that file registered
     * while the file is treated as corrupted.
     *
     * @return Paths of the checkpoint files that remain on disk, in ascending id order;
     *      empty if none remain.
     * @throws IOException If the directory cannot be listed or a corrupted file cannot be deleted.
     */
    private synchronized Collection<Path> initLastCheckPoint() throws IOException {
        TreeSet<FileWithId> checkPoints = this.files(CHECK_PNT_FILE_EXTENSION);
        // Newest checkpoint first.
        Iterator<FileWithId> iter = checkPoints.descendingIterator();
        while (iter.hasNext()) {
            FileWithId f = iter.next();
            try (ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(Files.newInputStream(f.file, new OpenOption[0])));){
                DrSenderFsStore.checkMagic(in);
                int streamsNum = in.readInt();
                DataCenterStream[] arr = new DataCenterStream[streamsNum];
                // Record layout: id (byte), file id (long), offset (long), redo log size (long), magic (short).
                for (int i = 0; i < streamsNum; ++i) {
                    byte id = in.readByte();
                    long fileId = in.readLong();
                    LogFile logFile = this.fileById(fileId);
                    long filePos = in.readLong();
                    LogPos p = new LogPos(logFile, filePos);
                    long redoLogSize = in.readLong();
                    DrSenderFsStore.checkMagic(in);
                    DataCenterStream stream = new DataCenterStream(id, p);
                    stream.position(p);
                    stream.redoLogSize.set(redoLogSize);
                    arr[i] = stream;
                }
                for (DataCenterStream stream : arr) {
                    if (this.streamById[stream.id & 0xFF] != null) {
                        throw new IOException("Duplicate data center id.");
                    }
                    this.streamById[((DataCenterStream)stream).id & 0xFF] = stream;
                }
                DataCenterStream[] dataCenterStreamArray = this.streamById;
                synchronized (this.streamById) {
                    // ** MonitorExit[var8_11] (shouldn't be in output)
                    // Both paths leave the loop: the checkpoint was applied successfully.
                    if (!this.log.isInfoEnabled()) break;
                    this.log.info("DR Store initialized from: " + f.file);
                    break;
                }
            }
            catch (IOException ex) {
                if (this.log.isInfoEnabled()) {
                    this.log.info("Checkpoint file is corrupted:[id=" + f.id + ", msg=" + ex.getMessage() + "]");
                }
                // Drop the unreadable checkpoint and fall back to the previous one.
                iter.remove();
                Files.delete(f.file);
            }
        }
        {
            if (checkPoints.isEmpty()) {
                return Collections.emptyList();
            }
            ArrayList<Path> res = new ArrayList<Path>(checkPoints.size());
            for (FileWithId f : checkPoints) {
                res.add(f.file);
            }
            return res;
        }
    }

    /**
     * Reads a short from the stream and verifies that it equals the checkpoint magic value.
     *
     * @param in Stream positioned at a magic marker.
     * @throws IOException If the value read differs from the expected magic or reading fails.
     */
    private static void checkMagic(ObjectInputStream in) throws IOException {
        short actual = in.readShort();
        if (actual == -4422) {
            return;
        }
        throw new IOException("Wrong magic.");
    }

    /**
     * Registers every {@code .blg} file found in the store directory as a read-only log file.
     *
     * @throws IOException If the directory cannot be listed or a file cannot be opened.
     */
    private void initLogFiles() throws IOException {
        // An empty listing simply leaves the registry untouched.
        for (FileWithId existing : this.files(".blg")) {
            this.files.add(new LogFile(existing.id, existing.file));
        }
    }

    /**
     * Takes a snapshot of all registered data center streams.
     *
     * @return New collection with the non-null entries of {@code streamById}, in index order.
     */
    private Collection<DataCenterStream> streams() {
        synchronized (this.streamById) {
            Collection<DataCenterStream> registered = new ArrayList<>();
            for (int i = 0; i < this.streamById.length; i++) {
                DataCenterStream s = this.streamById[i];
                if (s != null) {
                    registered.add(s);
                }
            }
            return registered;
        }
    }

    /**
     * Cancels the checkpoint worker, if one exists and is not yet cancelled, and waits
     * for it to finish. Cancellation is done under the store lock; the join is done
     * after the lock is released.
     */
    private void stopCheckpointWorker() {
        if (this.checkPntWorker == null || this.checkPntWorker.isCancelled()) {
            return;
        }

        this.lock.lock();
        try {
            U.cancel((GridWorker)this.checkPntWorker);
        }
        finally {
            this.lock.unlock();
        }

        U.join((GridWorker)this.checkPntWorker, (IgniteLogger)this.log);
    }

    /**
     * Stops the store gracefully: stops the checkpoint worker, writes a final
     * checkpoint, closes all log file channels and finally releases the directory lock.
     * The lock is released even if an earlier step fails.
     */
    public void stop() {
        try {
            this.stopCheckpointWorker();
            this.flush();
            // flush() clears the flag before writing, so it must be unset here.
            assert !this.updatedAfterCheckPnt;
            this.terminate();
        }
        finally {
            GridCacheDatabaseSharedManager.FileLockHolder held = this.dirLock;
            if (held != null) {
                U.closeQuiet((AutoCloseable)held);
            }
        }
    }

    /**
     * Stops the checkpoint worker and closes the read and write channels of every
     * log file without writing a checkpoint. A failure to stop writes on one file is
     * logged and does not prevent the remaining files from being processed.
     */
    public void terminate() {
        this.stopCheckpointWorker();

        Iterator<LogFile> it = this.files.iterator();
        while (it.hasNext()) {
            LogFile logFile = it.next();
            logFile.stopReads();
            try {
                logFile.stopWrites();
            }
            catch (IgniteCheckedException e) {
                U.error((IgniteLogger)this.log, (Object)("Failed to stop file operations: " + logFile), (Throwable)e);
            }
        }
    }

    /**
     * Clears the store by switching to a new head file and deleting every log file
     * with a smaller id. If the switch does not produce a new file, the currently
     * last file is used as the boundary instead.
     *
     * @throws IgniteCheckedException If switching or deleting a file fails.
     */
    @Override
    public void clear0() throws IgniteCheckedException {
        LogFile boundary = this.switchToNextFile(this.head.id + 1L);
        if (boundary == null) {
            boundary = this.files.last();
        }

        // Files are iterated in ascending id order, so stop at the first one to keep.
        for (LogFile candidate : this.files) {
            if (candidate.id >= boundary.id) {
                break;
            }
            this.delete(candidate);
        }
    }

    /**
     * Looks up the stream of a data center, optionally creating it at the current
     * head position.
     * <p>
     * NOTE(review): creation synchronizes on {@code this}, whereas {@code streams()}
     * synchronizes on the {@code streamById} array; confirm this is intended.
     *
     * @param id Data center id.
     * @param create Whether to create the stream when it does not exist.
     * @return The stream, or {@code null} if it does not exist and {@code create} is {@code false}.
     */
    private DataCenterStream stream(byte id, boolean create) {
        int idx = id & 0xFF;
        DataCenterStream existing = this.streamById[idx];
        if (existing != null) {
            return existing;
        }

        synchronized (this) {
            // Re-read under the monitor: another thread may have created it meanwhile.
            DataCenterStream ds = this.streamById[idx];
            if (ds == null && create) {
                ds = new DataCenterStream(id, this.headPosition());
                this.streamById[idx] = ds;
            }
            return ds;
        }
    }

    /**
     * Builds a position pointing at the current end of the head log file.
     *
     * @return Position made of the head file and its current size.
     */
    private LogPos headPosition() {
        // Read the volatile field once so file and size refer to the same head.
        LogFile current = this.head;
        long end = current.size();
        return new LogPos(current, end);
    }

    /**
     * Unregisters the given log file and, if it was registered, deletes it from disk.
     *
     * @param file File to delete.
     * @return {@code true} if the file was registered and has been deleted.
     * @throws IgniteCheckedException If the file cannot be deleted from disk.
     */
    private boolean delete(LogFile file) throws IgniteCheckedException {
        boolean wasRegistered = this.files.remove(file);
        if (wasRegistered) {
            file.delete();
        }
        return wasRegistered;
    }

    /**
     * Creates a log file with the given id and makes it the head, under the store lock.
     * <p>
     * Nothing is switched when a file with that id is already registered, or when the
     * id is smaller than the id of the current head.
     * <p>
     * If initialization fails, the file is unregistered and a best-effort removal is
     * attempted. A failure of that removal is attached to the original I/O error as a
     * suppressed exception instead of being silently dropped.
     *
     * @param id Id of the file to create.
     * @return The new head file, or {@code null} if no switch happened.
     * @throws IgniteCheckedException If the new file cannot be initialized.
     */
    private LogFile switchToNextFile(long id) throws IgniteCheckedException {
        this.lock.lock();
        try {
            LogFile newFile = new LogFile(id);
            if (!this.files.add(newFile)) {
                // Someone else has already registered a file with this id.
                return null;
            }
            if (this.head != null && newFile.id < this.head.id) {
                // Stale request: the head has already moved past this id.
                this.files.remove(newFile);
                return null;
            }
            try {
                newFile.init();
            }
            catch (IOException ex) {
                U.error((IgniteLogger)this.log, (Object)("Failed to init file: " + newFile), (Throwable)ex);
                this.files.remove(newFile);
                try {
                    newFile.delete();
                }
                catch (IgniteCheckedException cleanupEx) {
                    // Best-effort cleanup; keep the failure visible without masking the root cause.
                    ex.addSuppressed(cleanupEx);
                }
                throw new IgniteCheckedException((Throwable)ex);
            }
            this.head = newFile;
            return newFile;
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Appends an entry to the log for the given data centers.
     * <p>
     * Streams for all target data centers are created first. The entry is then written
     * to the head file; whenever the head refuses the write, the oldest file is removed
     * if the file limit is reached and the head is switched to the next file. If another
     * thread performed the switch, this thread waits for the last file to be initialized
     * before retrying. In synchronous mode a checkpoint is written under the store lock
     * after the entry is stored.
     *
     * @param dataCenters Target data center ids; must not be empty.
     * @param data Entry payload.
     * @param cnt Entry count; not used by this implementation.
     * @param fstId State transfer id, or {@code null}.
     * @throws IgniteCheckedException If writing fails.
     */
    @Override
    public void store0(byte[] dataCenters, byte[] data, int cnt, @Nullable IgniteUuid fstId) throws IgniteCheckedException {
        assert !F.isEmpty(dataCenters);

        EntryIn entry = fstId != null
            ? new EntryInWithStateTransfer(dataCenters, data, fstId)
            : new EntryIn(dataCenters, data);
        int entrySize = entry.size();

        for (byte dcId : dataCenters) {
            this.stream(dcId, true);
        }

        for (;;) {
            LogFile current = this.head;
            if (current.write(entry)) {
                break;
            }
            if (this.files.size() >= this.maxFilesNum) {
                this.deleteOldFiles();
            }
            if (this.switchToNextFile(current.id + 1L) == null) {
                // Another thread is switching; wait until its file is ready.
                this.files.last().awaitInitialized();
            }
        }

        for (byte dcId : dataCenters) {
            this.stream(dcId, false).incrementSize(entrySize);
        }

        if (this.syncMode) {
            this.lock.lock();
            try {
                this.flush();
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    /**
     * Removes the oldest log file when the number of files has reached the limit.
     * <p>
     * Before removal, {@code onOverflow()} is invoked once for every stream whose
     * position lies inside or before the file being removed.
     *
     * @throws IgniteCheckedException If overflow handling or file deletion fails.
     */
    public void deleteOldFiles() throws IgniteCheckedException {
        this.lock.lock();
        try {
            if (this.files.size() < this.maxFilesNum) {
                return;
            }

            LogFile oldest = this.files.firstx();
            // A position past any possible offset in the oldest file.
            LogPos endOfOldest = new LogPos(oldest, Long.MAX_VALUE);
            for (DataCenterStream s : this.streams()) {
                if (endOfOldest.compareTo(s.position()) > 0) {
                    this.onOverflow();
                }
            }
            this.delete(oldest);
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Gets the number of registered log files.
     *
     * @return Current number of log files.
     */
    public int filesNum() {
        int registered = this.files.size();
        return registered;
    }

    /**
     * Opens a cursor over the stream of the given data center, creating the stream
     * if it does not exist yet.
     *
     * @param dataCenterId Data center id.
     * @return Cursor for that data center.
     * @throws IgniteCheckedException If the cursor cannot be created.
     */
    @Override
    public DrSenderStoreCursor cursor0(byte dataCenterId) throws IgniteCheckedException {
        DataCenterStream target = this.stream(dataCenterId, true);
        return target.cursor();
    }

    /**
     * Gets the size reported by the stream of the given data center.
     *
     * @param dataCenterId Data center id.
     * @return Value of the stream's {@code size()}.
     * @throws IgniteCheckedException If no stream exists for that data center.
     */
    public long redoBytesSize(byte dataCenterId) throws IgniteCheckedException {
        DataCenterStream found = this.stream(dataCenterId, false);
        if (found != null) {
            return found.size();
        }
        throw new IgniteCheckedException("Failed to find data center with id: " + dataCenterId);
    }

    /**
     * Sums the sizes of all registered log files.
     *
     * @return Total size in bytes.
     */
    public long totalBytes() {
        return this.files.stream().mapToLong(LogFile::size).sum();
    }

    /**
     * Takes a read buffer from the pool, allocating a new direct buffer of
     * {@code readBufSize} bytes when the pool is empty.
     *
     * @return Buffer with position and limit set to zero when freshly allocated;
     *      pooled buffers are returned as they were released.
     */
    private ByteBuffer buffer() {
        ByteBuffer pooled = this.pool.poll();
        if (pooled == null) {
            pooled = ByteBuffer.allocateDirect(this.readBufSize);
            DrSenderFsStore.reset(pooled);
        }
        return pooled;
    }

    /**
     * Resets the buffer and returns it to the pool for reuse.
     *
     * @param buf Buffer to release.
     */
    private void release(ByteBuffer buf) {
        reset(buf);
        pool.offer(buf);
    }

    /**
     * Marks the buffer as containing no readable data by setting both its position
     * and its limit to zero.
     *
     * @param buf Buffer to reset.
     */
    private static void reset(ByteBuffer buf) {
        buf.position(0).limit(0);
    }

    /**
     * Waits for an asynchronous operation and converts its failures to checked exceptions.
     *
     * @param fut Future of the operation.
     * @return Result of the future.
     * @throws NoDataException If the operation failed because the channel was closed.
     * @throws IgniteInterruptedCheckedException If the waiting thread was interrupted;
     *      the interrupt flag is restored.
     * @throws IgniteCheckedException If the operation failed for any other reason.
     */
    private static int get(Future<Integer> fut) throws IgniteCheckedException {
        try {
            return fut.get();
        }
        catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof ClosedChannelException) {
                throw new NoDataException(e);
            }
            throw new IgniteCheckedException((Throwable)e);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IgniteInterruptedCheckedException(e);
        }
    }

    /**
     * Builds a string representation via {@code S.toString}, passing the superclass
     * representation as the parent part.
     *
     * @return String representation of this store.
     */
    public String toString() {
        return S.toString(DrSenderFsStore.class, (Object)this, (String)super.toString());
    }

    /**
     * Writes a checkpoint if anything changed since the previous one.
     * <p>
     * The head file is synced to disk, then a new checkpoint file is written with the
     * position and size of every stream. Log files that lie entirely before the
     * smallest stream position are deleted, and checkpoint files beyond the retention
     * limit are removed, oldest first. Any failure is logged and not propagated.
     * <p>
     * Changes against the previous revision: a failure to delete a log file now logs
     * that log file rather than the checkpoint path, and log file pruning is skipped
     * when there are no streams instead of failing with a {@code NullPointerException}
     * that was misreported as a checkpoint failure and prevented checkpoint trimming.
     */
    private void flush() {
        Path file = null;
        try {
            if (!this.updatedAfterCheckPnt && !this.ackAfterCheckPnt) {
                return;
            }
            // Clear the flags first so that changes made during the checkpoint are not lost.
            this.updatedAfterCheckPnt = false;
            this.ackAfterCheckPnt = false;
            long idx = this.fileIdxGen.incrementAndGet();
            file = this.dir.resolve(idx + CHECK_PNT_FILE_EXTENSION);
            // Log data must be durable before a checkpoint refers to it.
            this.head.fsync();
            LogPos min = null;
            try (ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(Files.newOutputStream(file, StandardOpenOption.CREATE_NEW, StandardOpenOption.DSYNC)))) {
                Collection<DataCenterStream> streams = this.streams();
                out.writeShort(-4422);
                out.writeInt(streams.size());
                for (DataCenterStream s : streams) {
                    out.writeByte(s.id);
                    LogPos p = s.position();
                    if (min == null || min.compareTo(p) > 0) {
                        min = p;
                    }
                    out.writeLong(p.file.id);
                    out.writeLong(p.off);
                    out.writeLong(s.size());
                    out.writeShort(-4422);
                }
            }
            this.checkPoints.addLast(file);
            // Without streams there is no position to prune against.
            if (min != null) {
                for (LogFile logFile : this.files.headSet(min.file, false)) {
                    try {
                        this.delete(logFile);
                    }
                    catch (IgniteCheckedException e) {
                        U.error((IgniteLogger)this.log, (Object)("Failed to delete file: " + logFile), (Throwable)e);
                    }
                }
            }
            while (this.checkPoints.size() > 5) {
                Files.deleteIfExists(this.checkPoints.pollFirst());
            }
        }
        catch (Exception e) {
            U.error((IgniteLogger)this.log, (Object)("Failed to create checkpoint: " + file), (Throwable)e);
        }
    }

    /**
     * Sums the sizes of the {@code active} collections of all tracked cursors.
     *
     * @return Total number of active entries across cursors.
     */
    public long getActiveEntriesCount() {
        return this.cursors.stream().mapToLong(c -> c.active.size()).sum();
    }

    /**
     * Signals that no data could be obtained, for example because the channel being
     * read was closed.
     */
    private static class NoDataException
    extends IgniteCheckedException {
        /** Serialization version. */
        private static final long serialVersionUID = 0L;

        /**
         * Creates the exception wrapping the underlying failure.
         *
         * @param cause Underlying failure.
         */
        private NoDataException(Throwable cause) {
            super(cause);
        }

        /** Creates the exception with the fixed message {@code "No data."}. */
        private NoDataException() {
            super("No data.");
        }
    }

    /**
     * Entry read from the log and handed to the consumer. Entries are ordered and
     * compared by their log position.
     * <p>
     * NOTE(review): {@code equals} is overridden without {@code hashCode}; confirm
     * that instances are never placed into hash-based collections.
     */
    private static class EntryOut
    implements DrSenderStoreEntry,
    Comparable<EntryOut> {
        /** Entry payload. */
        private byte[] data;
        /** Cursor that produced this entry and receives its acknowledgement. */
        private final Cursor cursor;
        /** Position of the entry in the log. */
        private final LogPos pos;
        /** Entry length as passed by the cursor. */
        private final int len;
        /** State transfer id, or {@code null}. */
        private final IgniteUuid fstId;
        /** Whether the entry has been acknowledged. */
        private volatile boolean acked;

        /**
         * @param cursor Owning cursor.
         * @param bufs Buffers holding the payload; copied into a byte array.
         * @param pos Position of the entry in the log.
         * @param len Entry length.
         * @param fstId State transfer id, or {@code null}.
         */
        EntryOut(Cursor cursor, ByteBuffer[] bufs, LogPos pos, int len, IgniteUuid fstId) {
            this.cursor = cursor;
            this.data = U.readByteArray((ByteBuffer[])bufs);
            this.pos = pos;
            this.len = len;
            this.fstId = fstId;
        }

        /** @return Entry payload. */
        @Override
        public byte[] data() {
            return data;
        }

        /**
         * Marks the entry as acknowledged and notifies the owning cursor. Must be
         * called at most once per entry.
         *
         * @param dcId Data center id; not used by this implementation.
         */
        @Override
        public void acknowledge(byte dcId) {
            assert !acked;
            acked = true;
            cursor.acknowledge(this);
        }

        /** @return State transfer id, or {@code null}. */
        @Override
        public IgniteUuid stateTransferId() {
            return fstId;
        }

        /** Orders entries by log position. */
        @Override
        public int compareTo(EntryOut o) {
            return pos.compareTo(o.pos);
        }

        /** Two entries are equal when they have the same log position. */
        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof EntryOut)) {
                return false;
            }
            return compareTo((EntryOut)obj) == 0;
        }
    }

    /**
     * Entry to be written that additionally carries a state transfer id. The id is
     * serialized between the header and the stream ids and adds 24 bytes to the entry size.
     */
    private class EntryInWithStateTransfer
    extends EntryIn {
        /** Magic value identifying an entry with a state transfer id. */
        static final short MAGIC = 2991;
        /** State transfer id; never {@code null}. */
        private final IgniteUuid fstId;

        /**
         * @param streams Target data center ids.
         * @param data Entry payload.
         * @param fstId State transfer id; must not be {@code null}.
         */
        private EntryInWithStateTransfer(byte[] streams, byte[] data, IgniteUuid fstId) {
            super(streams, data);
            assert fstId != null;
            this.fstId = fstId;
        }

        /** @return Header, state transfer id, stream ids and payload, in that order. */
        @Override
        ByteBuffer[] toBytes() {
            ByteBuffer header = ByteBuffer.wrap(this.hdr);
            ByteBuffer transferId = ByteBuffer.wrap(U.igniteUuidToBytes((IgniteUuid)this.fstId));
            ByteBuffer streamIds = ByteBuffer.wrap(this.streams);
            ByteBuffer payload = ByteBuffer.wrap(this.data);
            return new ByteBuffer[]{header, transferId, streamIds, payload};
        }

        /** @return Base entry size plus 24 bytes for the state transfer id. */
        @Override
        int size() {
            int baseSize = super.size();
            return baseSize + 24;
        }

        /** @return Magic value of this entry kind. */
        @Override
        protected short magic() {
            return MAGIC;
        }
    }

    /**
     * Entry to be written to a log file.
     * <p>
     * The 11-byte header holds: magic (short), number of target streams (byte),
     * checksum of the payload (int, zero when checksums are disabled) and payload
     * length (int). Stream ids and the payload follow the header.
     */
    private class EntryIn {
        /** Header size in bytes. */
        static final int HEADER_SIZE = 11;
        /** Magic value identifying a plain entry. */
        static final short MAGIC = 2990;
        /** Serialized header. */
        protected final byte[] hdr;
        /** Entry payload. */
        protected final byte[] data;
        /** Target data center ids. */
        protected final byte[] streams;
        /** Total serialized size: header, stream ids and payload. */
        private final int size;

        /**
         * @param streams Target data center ids.
         * @param data Entry payload.
         */
        private EntryIn(byte[] streams, byte[] data) {
            this.data = data;
            this.streams = streams;
            this.hdr = new byte[HEADER_SIZE];
            this.size = this.hdr.length + streams.length + data.length;

            int checksum = DrSenderFsStore.this.checksum ? Arrays.hashCode(data) : 0;
            // magic() is overridable, so subclasses stamp their own magic into the header.
            ByteBuffer.wrap(this.hdr)
                .putShort(this.magic())
                .put((byte)streams.length)
                .putInt(checksum)
                .putInt(data.length);
        }

        /** @return Magic value of this entry kind. */
        protected short magic() {
            return MAGIC;
        }

        /** @return Header, stream ids and payload, in that order. */
        ByteBuffer[] toBytes() {
            ByteBuffer header = ByteBuffer.wrap(this.hdr);
            ByteBuffer streamIds = ByteBuffer.wrap(this.streams);
            ByteBuffer payload = ByteBuffer.wrap(this.data);
            return new ByteBuffer[]{header, streamIds, payload};
        }

        /** @return Total serialized size in bytes. */
        int size() {
            return size;
        }

        /** @return Number of target streams. */
        int streamsNumber() {
            return streams.length;
        }
    }

    private class LogFile
    implements Comparable<LogFile> {
        static final String EXTENSION = ".blg";
        private final long id;
        private final AtomicLong acquiredSize;
        private volatile long size;
        private Path file;
        @GridToStringExclude
        private FileChannel writeCh;
        @GridToStringExclude
        private AsynchronousFileChannel readCh;
        private final CountDownLatch init;

        LogFile(long id) {
            this.id = id;
            this.acquiredSize = new AtomicLong();
            this.init = new CountDownLatch(1);
        }

        LogFile(long id, Path file) throws IOException {
            this.id = id;
            this.file = file;
            this.init = null;
            this.acquiredSize = null;
            this.readCh = AsynchronousFileChannel.open(file, StandardOpenOption.READ);
            this.size = this.readCh.size();
        }

        public void init() throws IOException {
            try {
                this.file = DrSenderFsStore.this.dir.resolve(this.id + EXTENSION);
                this.writeCh = FileChannel.open(this.file, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
                this.readCh = AsynchronousFileChannel.open(this.file, StandardOpenOption.READ);
            }
            finally {
                assert (this.init.getCount() == 1L);
                this.init.countDown();
            }
        }

        public String toString() {
            return S.toString(LogFile.class, (Object)this);
        }

        @Override
        public int compareTo(LogFile o) {
            return Long.compare(this.id, o.id);
        }

        public boolean equals(Object obj) {
            return obj instanceof LogFile && this.compareTo((LogFile)obj) == 0;
        }

        long size() {
            return this.size;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        boolean write(EntryIn e) throws IgniteCheckedException {
            long curSize;
            long s;
            int entrySize = e.size();
            do {
                if ((s = this.acquiredSize.get()) < DrSenderFsStore.this.maxFileSize) continue;
                return false;
            } while (!this.acquiredSize.compareAndSet(s, s + (long)entrySize));
            ByteBuffer[] bufs = e.toBytes();
            LogFile logFile = this;
            synchronized (logFile) {
                if (!this.isWritable()) {
                    return false;
                }
                try {
                    curSize = this.size;
                    assert (this.writeCh.position() == curSize);
                    long written = 0L;
                    while ((written += this.writeCh.write(bufs)) < (long)entrySize) {
                    }
                }
                catch (IOException ex) {
                    U.close((AutoCloseable)this.writeCh, (IgniteLogger)DrSenderFsStore.this.log);
                    throw new IgniteCheckedException((Throwable)ex);
                }
                this.size = curSize + (long)entrySize;
            }
            DrSenderFsStore.this.updatedAfterCheckPnt = true;
            if (curSize + (long)entrySize >= DrSenderFsStore.this.maxFileSize) {
                this.stopWrites();
            }
            return true;
        }

        /**
         * Flushes the write channel (content only, {@code force(false)}) and closes it.
         * Does nothing when the file is not writable. A failed flush is logged as a warning and
         * does not prevent the channel from being closed.
         *
         * @throws IgniteCheckedException Declared by the signature; not thrown by the code in this method.
         */
        public synchronized void stopWrites() throws IgniteCheckedException {
            if (isWritable()) {
                try {
                    writeCh.force(false);
                }
                catch (IOException ex) {
                    U.warn(DrSenderFsStore.this.log, "Failed to fsync channel: " + id, ex);
                }
                U.close(writeCh, DrSenderFsStore.this.log);
            }
        }

        /**
         * Closes the read and write channels and removes the underlying file if it exists.
         *
         * @throws IgniteCheckedException If the file could not be deleted.
         */
        public synchronized void delete() throws IgniteCheckedException {
            // Channels are closed first, then the file itself is removed.
            U.close(readCh, DrSenderFsStore.this.log);
            U.close(writeCh, DrSenderFsStore.this.log);

            try {
                Files.deleteIfExists(file);
            }
            catch (IOException ex) {
                throw new IgniteCheckedException(ex);
            }
        }

        /**
         * Waits on the {@code init} latch of this file, which must have been set.
         *
         * @throws IgniteInterruptedCheckedException If thrown by {@code U.await} (presumably on interruption).
         */
        public void awaitInitialized() throws IgniteInterruptedCheckedException {
            assert (init != null);

            U.await(init);
        }

        /**
         * Closes the read channel of this file; afterwards {@link #exists()} reports {@code false}.
         */
        public synchronized void stopReads() {
            U.close(readCh, DrSenderFsStore.this.log);
        }

        /**
         * Starts a read from the read channel at the given file offset.
         *
         * @param off Offset in the file to read from.
         * @param buf Buffer to read into.
         * @return Future holding the result reported by the read channel.
         */
        public Future<Integer> read(long off, ByteBuffer buf) {
            Future<Integer> pending = readCh.read(buf, off);

            return pending;
        }

        /**
         * @return {@code true} if this file has a write channel and that channel is open.
         */
        public boolean isWritable() {
            if (writeCh == null) {
                return false;
            }
            return writeCh.isOpen();
        }

        /**
         * @return {@code true} if this file has a read channel and that channel is open.
         */
        public boolean exists() {
            if (readCh == null) {
                return false;
            }
            return readCh.isOpen();
        }

        /**
         * Forces the content and metadata of the write channel to storage ({@code force(true)}).
         * Does nothing when the file is not writable.
         *
         * @throws IOException If forcing the channel failed.
         */
        public synchronized void fsync() throws IOException {
            if (!isWritable()) {
                return;
            }
            writeCh.force(true);
        }
    }

    /**
     * A file path paired with the numeric id parsed from its name.
     * <p>
     * The id is the part of the file name before the last {@code '.'}, parsed as a {@code long}.
     * Instances are ordered and compared for equality by id only.
     */
    private static class FileWithId
    implements Comparable<FileWithId> {
        /** Id parsed from the file name. */
        private final long id;
        /** Path of the file. */
        private final Path file;

        /**
         * @param file Path whose file name has the form {@code <id>.<extension>}.
         * @throws NumberFormatException If the part of the name before the last dot is not a valid {@code long}.
         */
        private FileWithId(Path file) {
            this.file = file;
            String name = file.toFile().getName();
            int dot = name.lastIndexOf('.');
            assert (dot != -1);
            String idStr = name.substring(0, dot);
            this.id = Long.parseLong(idStr);
        }

        @Override
        public int compareTo(FileWithId o) {
            return Long.compare(this.id, o.id);
        }

        @Override
        public boolean equals(Object obj) {
            return obj instanceof FileWithId && this.compareTo((FileWithId)obj) == 0;
        }

        /**
         * Hash code derived from the id only, so that it agrees with {@link #equals(Object)}.
         */
        @Override
        public int hashCode() {
            return (int)(this.id ^ (this.id >>> 32));
        }
    }

    /**
     * Position inside the log: a log file plus a byte offset within it.
     * <p>
     * Positions are ordered by file first and by offset second. A position without a file sorts before
     * any position that has one, and two positions without a file compare as equal regardless of offset.
     */
    private static class LogPos
    implements Comparable<LogPos> {
        /** Log file, possibly {@code null}. */
        private final LogFile file;
        /** Byte offset within the file. */
        private final long off;

        /**
         * @param file Log file, may be {@code null}.
         * @param off Byte offset within the file.
         */
        private LogPos(@Nullable LogFile file, long off) {
            this.file = file;
            this.off = off;
        }

        @Override
        public int compareTo(LogPos o) {
            if (this.file == null) {
                return o.file == null ? 0 : -1;
            }
            if (o.file == null) {
                return 1;
            }
            int res = this.file.compareTo(o.file);
            if (res != 0) {
                return res;
            }
            return Long.compare(this.off, o.off);
        }

        @Override
        public boolean equals(Object obj) {
            return obj instanceof LogPos && this.compareTo((LogPos)obj) == 0;
        }

        /**
         * Hash code that agrees with {@link #equals(Object)}.
         * <p>
         * Only the offset is hashed because equality of the file component is defined through
         * {@code LogFile.compareTo}. All positions without a file are equal to each other and
         * therefore share one hash value.
         */
        @Override
        public int hashCode() {
            return this.file == null ? 0 : (int)(this.off ^ (this.off >>> 32));
        }

        /**
         * Reads from the file at this position plus {@code off} into {@code buf}.
         *
         * @param buf Buffer to read into.
         * @param off Additional offset relative to this position.
         * @return Result of the read as returned by {@code DrSenderFsStore.get}.
         * @throws IgniteCheckedException If the read failed; {@code NoDataException} if the file's read
         *      channel is no longer open.
         */
        int read(ByteBuffer buf, int off) throws IgniteCheckedException {
            if (!this.file.exists()) {
                throw new NoDataException();
            }
            return DrSenderFsStore.get(this.file.read(this.off + (long)off, buf));
        }

        /**
         * Compacts {@code buf}, keeps reading into it until the relative offset {@code off} has reached
         * {@code end}, then flips the buffer for reading.
         *
         * @param buf Buffer to fill; its unread content is preserved at the start.
         * @param off Offset relative to this position at which reading starts.
         * @param end Offset relative to this position that must be reached.
         * @throws IgniteCheckedException If a read failed; {@code NoDataException} if the file's read
         *      channel is no longer open.
         * @throws IllegalStateException If a read returned {@code -1} while the file is still readable.
         */
        void readMore(ByteBuffer buf, int off, int end) throws IgniteCheckedException {
            buf.compact();
            while (off < end) {
                int res0 = this.read(buf, off);
                if (res0 == -1) {
                    if (!this.file.exists()) {
                        throw new NoDataException();
                    }
                    throw new IllegalStateException("Unexpected end of file [pos=" + this.off + ", off=" + off + ", end=" + end + ']');
                }
                off += res0;
            }
            buf.flip();
        }

        @Override
        public String toString() {
            return S.toString(LogPos.class, (Object)this);
        }
    }

    /**
     * Cursor that sequentially reads the log entries of one {@link DataCenterStream} from the log files.
     * <p>
     * On-disk entry layout as parsed by {@link #next()}: an 11-byte header (2-byte magic, 1-byte number of
     * stream ids, 4-byte hash, 4-byte data size), then a 24-byte id when the magic is {@code 2991}, then one
     * byte per stream id, then the data.
     */
    class Cursor
    implements DrSenderStoreCursor {
        /** Stream whose entries this cursor returns. */
        private final DataCenterStream stream;
        /** Entries returned by {@link #next()} that have not yet been removed by {@link #acknowledge(EntryOut)}. */
        private final Collection<EntryOut> active = new GridConcurrentSkipListSet<EntryOut>();
        /** Greatest end position (entry position plus length) among the entries returned so far. */
        private final AtomicReference<LogPos> activeEof = new AtomicReference<LogPos>();
        /** Read buffers held by this cursor; the last one is the buffer currently being parsed. */
        private final ArrayDeque<ByteBuffer> bufs = new ArrayDeque<ByteBuffer>();
        /** Position of the next entry to parse. */
        private LogPos pos;

        /**
         * Creates a cursor that starts at the stream's current position and obtains its first read buffer.
         *
         * @param stream Stream to read entries for.
         */
        Cursor(DataCenterStream stream) {
            this.stream = stream;
            this.pos = stream.position();
            this.nextBuffer();
        }

        /**
         * Moves the cursor to the start of the file that follows the current one, provided such a file
         * exists and its id does not exceed the id of the store's head file.
         *
         * @param buf Current read buffer; reset when the switch happens.
         * @return {@code true} if the cursor was moved to the next file.
         */
        private boolean switchToNextFile(ByteBuffer buf) {
            LogFile next = (LogFile)DrSenderFsStore.this.files.higher((Object)this.pos.file);
            if (next == null || next.id > DrSenderFsStore.this.head.id) {
                return false;
            }
            this.pos = new LogPos(next, 0L);
            DrSenderFsStore.reset(buf);
            return true;
        }

        /**
         * Returns the next entry that lists this cursor's stream id, skipping entries of other streams.
         * <p>
         * On any {@link IgniteCheckedException} raised while parsing, the buffer is reset and both the
         * cursor and the stream are moved to the store's head position; a {@code NoDataException} is then
         * handled by retrying, any other exception is rethrown.
         *
         * @return Next entry, or {@code null} if no complete entry is currently available.
         * @throws IgniteCheckedException If the cursor is closed, or if reading or validating an entry failed.
         */
        @Override
        @Nullable
        public DrSenderStoreEntry next() throws IgniteCheckedException {
            // Buffers of previously returned entries are given back; only the current one is kept.
            while (this.bufs.size() > 1) {
                DrSenderFsStore.this.release(this.bufs.pollFirst());
            }
            ByteBuffer buf = this.bufs.peekLast();
            if (buf == null) {
                throw new IgniteCheckedException("Cursor is closed.");
            }
            try {
                ByteBuffer[] bufs;
                IgniteUuid fstId;
                int sizeOnDisk;
                int hash;
                LogPos curPos;
                // NOTE(review): the labeled blocks below are decompiler output. block24 is left when the
                // data is already fully buffered, block25 when it has to be read into extra buffers.
                block26: {
                    int dataSize;
                    block24: {
                        int fstIdSize;
                        int streamsNum;
                        block25: {
                            while (true) {
                                short magic;
                                curPos = this.pos;
                                // Fixed-size header: magic (2) + streams number (1) + hash (4) + data size (4).
                                if (buf.remaining() < 11) {
                                    if (curPos.file.size() < curPos.off + 11L) {
                                        if (curPos.file.isWritable()) {
                                            return null;
                                        }
                                        // Size is re-checked after the file was seen as not writable.
                                        if (curPos.file.size() < curPos.off + 11L) {
                                            if (this.switchToNextFile(buf)) continue;
                                            return null;
                                        }
                                    }
                                    curPos.readMore(buf, buf.remaining(), 11);
                                }
                                if ((magic = buf.getShort()) != 2990 && magic != 2991) {
                                    throw new DrSenderStoreCorruptedException("Magic mismatch. Actual=" + Integer.toHexString(magic & 0xFFFF));
                                }
                                // Magic 2991 marks an entry that carries a 24-byte id after the header.
                                boolean fst = magic == 2991;
                                streamsNum = buf.get() & 0xFF;
                                hash = buf.getInt();
                                dataSize = buf.getInt();
                                fstIdSize = fst ? 24 : 0;
                                int fullHdrSize = 11 + streamsNum + fstIdSize;
                                sizeOnDisk = fullHdrSize + dataSize;
                                // Both the optional id and the stream ids are read from the buffer below,
                                // so both must be available before continuing.
                                if (buf.remaining() < streamsNum + fstIdSize) {
                                    if (curPos.file.size() < curPos.off + (long)fullHdrSize) {
                                        if (curPos.file.isWritable()) {
                                            DrSenderFsStore.reset(buf);
                                            return null;
                                        }
                                        if (curPos.file.size() < curPos.off + (long)fullHdrSize) {
                                            if (this.switchToNextFile(buf)) continue;
                                            return null;
                                        }
                                    }
                                    curPos.readMore(buf, 11 + buf.remaining(), fullHdrSize);
                                }
                                fstId = null;
                                if (fst) {
                                    fstId = new IgniteUuid(IgniteUuidCache.onIgniteUuidRead((UUID)new UUID(buf.getLong(), buf.getLong())), buf.getLong());
                                }
                                // Look for this cursor's stream id among the entry's stream ids.
                                boolean found = false;
                                for (int i = 1; i <= streamsNum; ++i) {
                                    if (this.stream.id != buf.get()) continue;
                                    found = true;
                                    // Skip the remaining stream ids so the buffer points at the data.
                                    buf.position(buf.position() - i + streamsNum);
                                    break;
                                }
                                if (!found) {
                                    // Entry belongs to other streams: skip its data, either inside the
                                    // buffer or by dropping the buffer content.
                                    if (buf.remaining() >= dataSize + 11) {
                                        buf.position(buf.position() + dataSize);
                                    } else {
                                        DrSenderFsStore.reset(buf);
                                    }
                                    this.pos = new LogPos(this.pos.file, this.pos.off + (long)sizeOnDisk);
                                    continue;
                                }
                                if (buf.remaining() >= dataSize) break block24;
                                if (curPos.file.size() >= curPos.off + (long)sizeOnDisk) break block25;
                                if (curPos.file.isWritable()) {
                                    DrSenderFsStore.reset(buf);
                                    return null;
                                }
                                if (curPos.file.size() >= curPos.off + (long)sizeOnDisk) break block25;
                                if (!this.switchToNextFile(buf)) break;
                            }
                            return null;
                        }
                        // Data does not fit into what is buffered: read the rest into additional buffers.
                        int bufsNum = (dataSize - buf.remaining()) / DrSenderFsStore.this.readBufSize + 2;
                        bufs = new ByteBuffer[bufsNum];
                        bufs[0] = buf.asReadOnlyBuffer();
                        int off = 11 + fstIdSize + streamsNum + buf.remaining();
                        int last = bufsNum - 1;
                        for (int i = 1; i <= last; ++i) {
                            ByteBuffer next = this.nextBuffer();
                            int remaining = i == last ? dataSize - DrSenderFsStore.this.readBufSize * (bufsNum - 2) - buf.remaining() : DrSenderFsStore.this.readBufSize;
                            curPos.readMore(next, off, off + remaining);
                            off += remaining;
                            bufs[i] = next.asReadOnlyBuffer();
                            if (i != last) continue;
                            // The last buffer may hold bytes of the following entry; expose only this
                            // entry's tail and leave the buffer positioned right after it.
                            next.position(remaining);
                            bufs[i].limit(remaining);
                        }
                        break block26;
                    }
                    // Data is fully buffered: expose it as a read-only view and move past it.
                    int end = buf.position() + dataSize;
                    ByteBuffer buf0 = buf.asReadOnlyBuffer();
                    buf0.limit(end);
                    buf.position(end);
                    bufs = new ByteBuffer[]{buf0};
                }
                if (DrSenderFsStore.this.checksum && U.hashCode((ByteBuffer[])bufs) != hash) {
                    throw new DrSenderStoreCorruptedException("Checksum mismatch.");
                }
                this.pos = new LogPos(this.pos.file, this.pos.off + (long)sizeOnDisk);
                EntryOut e = new EntryOut(this, bufs, curPos, sizeOnDisk, fstId);
                this.active.add(e);
                LogPos newMaxActivePos = new LogPos(e.pos.file, e.pos.off + (long)e.len);
                this.setMaxActivePosIfGreater(newMaxActivePos);
                return e;
            }
            catch (IgniteCheckedException e) {
                DrSenderFsStore.reset(buf);
                this.pos = DrSenderFsStore.this.headPosition();
                this.stream.position(this.pos);
                if (e instanceof NoDataException) {
                    if (DrSenderFsStore.this.log.isDebugEnabled()) {
                        DrSenderFsStore.this.log.debug("Store was cleared, oldest file was removed.");
                    }
                    return this.next();
                }
                throw e;
            }
        }

        /**
         * Raises {@code activeEof} to the given position unless it already holds an equal or greater one.
         *
         * @param newMaxActivePos Candidate position.
         */
        private void setMaxActivePosIfGreater(LogPos newMaxActivePos) {
            LogPos prevMaxActivePos;
            while (!((prevMaxActivePos = this.activeEof.get()) != null && newMaxActivePos.compareTo(prevMaxActivePos) <= 0 || this.activeEof.compareAndSet(prevMaxActivePos, newMaxActivePos))) {
            }
        }

        /**
         * Obtains a buffer from the store and appends it to the buffers held by this cursor.
         *
         * @return The new buffer.
         */
        private ByteBuffer nextBuffer() {
            ByteBuffer buf = DrSenderFsStore.this.buffer();
            this.bufs.add(buf);
            return buf;
        }

        /**
         * Handles acknowledgement of an entry: removes already acknowledged entries from the active set
         * and advances the stream position, either to the first entry found not acknowledged or, when the
         * iteration runs to the end over a non-empty set, to the {@code activeEof} value read on entry.
         * The given entry is always removed from the active set before returning.
         * <p>
         * NOTE(review): the decompiler reported that it removed a self-catching {@code try} from this
         * method, so the control flow may differ from the original source.
         *
         * @param entry Entry being acknowledged.
         */
        void acknowledge(EntryOut entry) {
            LogPos activeEof0 = this.activeEof.get();
            EntryOut e0 = null;
            boolean entryRmvd = false;
            try {
                Iterator<EntryOut> iterator = this.active.iterator();
                while (iterator.hasNext()) {
                    EntryOut e;
                    e0 = e = iterator.next();
                    if (e.acked) {
                        if (!this.active.remove(e)) continue;
                        if (e == entry) {
                            entryRmvd = true;
                            continue;
                        }
                        return;
                    }
                    this.stream.position(e.pos);
                    return;
                }
            }
            finally {
                DrSenderFsStore.this.ackAfterCheckPnt = true;
                if (!entryRmvd) {
                    this.active.remove(entry);
                }
                assert (!this.active.contains(entry));
            }
            if (e0 != null) {
                this.stream.position(activeEof0);
            }
        }

        /**
         * Closes the cursor: forgets active entries, gives all buffers back to the store and unregisters
         * the cursor.
         * <p>
         * The buffers are removed from the deque as they are released, so a repeated call does not
         * release them again and a later {@link #next()} fails with "Cursor is closed." instead of
         * parsing a released buffer.
         */
        @Override
        public void close() {
            this.active.clear();
            ByteBuffer buf;
            while ((buf = this.bufs.pollFirst()) != null) {
                DrSenderFsStore.this.release(buf);
            }
            DrSenderFsStore.this.cursors.remove(this);
        }
    }

    /**
     * Per-stream state: the stream id, a size counter and the stream's current log position, which can
     * only move forward.
     */
    private class DataCenterStream {
        /** Stream id. */
        private final byte id;
        /** Size counter, increased through {@link #incrementSize(int)}. */
        private final AtomicLong redoLogSize = new AtomicLong();
        /** Current position of the stream. */
        private final AtomicReference<LogPos> pos;

        /**
         * @param id Stream id.
         * @param p Initial position, must not be {@code null}.
         */
        private DataCenterStream(byte id, LogPos p) {
            assert (p != null);
            this.id = id;
            this.pos = new AtomicReference<LogPos>(p);
        }

        /**
         * @return Current value of the size counter.
         */
        long size() {
            return redoLogSize.get();
        }

        /**
         * Adds the given amount to the size counter.
         *
         * @param size Amount to add.
         */
        public void incrementSize(int size) {
            redoLogSize.addAndGet(size);
        }

        /**
         * Moves the stream position to {@code p} if {@code p} is greater than the current position;
         * otherwise leaves the position untouched.
         *
         * @param p Candidate position.
         */
        private void position(LogPos p) {
            for (;;) {
                LogPos cur = pos.get();
                // Done when the candidate is not ahead of the current value or the swap succeeded.
                if (p.compareTo(cur) <= 0 || pos.compareAndSet(cur, p)) {
                    return;
                }
            }
        }

        /**
         * @return Current position of the stream.
         */
        public LogPos position() {
            return pos.get();
        }

        /**
         * Creates a cursor for this stream and registers it with the store.
         *
         * @return New cursor.
         */
        public DrSenderStoreCursor cursor() {
            Cursor created = new Cursor(this);
            boolean registered = DrSenderFsStore.this.cursors.add(created);
            assert (registered);
            return created;
        }
    }

    private class CheckPointWorker
    extends GridWorker {
        protected CheckPointWorker(String igniteInstanceName, IgniteLogger log) {
            super(igniteInstanceName, "dr-store-checkpoint", log);
        }

        protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
            while (true) {
                U.sleep((long)DrSenderFsStore.this.checkPntFreq);
                DrSenderFsStore.this.lock.lock();
                try {
                    if (this.isCancelled()) break;
                    DrSenderFsStore.this.flush();
                    continue;
                }
                finally {
                    DrSenderFsStore.this.lock.unlock();
                    continue;
                }
                break;
            }
        }
    }
}

