/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.persistence.checkpoint;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BooleanSupplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointProgressImpl;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.util.GridConcurrentMultiPairQueue;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.CountDownFuture;
import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.jsr166.ConcurrentLinkedHashMap;

/**
 * Checkpoint task that takes (page memory, page id) pairs from a queue and hands each page to its
 * page memory for writing through a {@link PageStoreWriter}.
 * <p>
 * Pages that reach the writer with {@link #RETRY_TAG} are not written; they are collected and processed
 * again until none are left. The completion future is finished at the end, either normally or with the
 * error that interrupted the work.
 */
public class WriteCheckpointPages implements Runnable {
    /** Tag value for which the writer queues the page for another attempt instead of writing it. */
    private static final int RETRY_TAG = -1;

    /** Checkpoint metrics tracker; only consulted while {@link #persStoreMetrics} reports metrics as enabled. */
    private final CheckpointMetricsTracker tracker;

    /** Queue of pages to write, each paired with the page memory it is read from. */
    private final GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> writePageIds;

    /** Per-store counters, incremented once for every page written to the store. */
    private final ConcurrentLinkedHashMap<PageStore, LongAdder> updStores;

    /** Future completed when {@link #run()} finishes, normally or with the caught error. */
    private final CountDownFuture doneFut;

    /** Value passed to the constructor; not read anywhere in this class. */
    private final int totalPagesToWrite;

    /** Callback run for every pair taken from the queue before the page is processed. */
    private final Runnable beforePageWrite;

    /** Snapshot manager notified before the task starts writing and before each individual page. */
    private final IgniteCacheSnapshotManager snapshotMgr;

    /** Logger used for the retry warning. */
    private final IgniteLogger log;

    /** Data storage metrics; supplies the "metrics enabled" flag. */
    private final DataStorageMetricsImpl persStoreMetrics;

    /** Provider of the temporary buffer passed to the page memory for every page. */
    private final ThreadLocal<ByteBuffer> threadBuf;

    /** Throttling policy; anything other than {@code DISABLED} turns on the extra throttling loop. */
    private final PageMemoryImpl.ThrottlingPolicy throttlingPolicy;

    /** Resolves the page memory for a group id when a page has to be queued for retry. */
    private final IgniteThrowableFunction<Integer, PageMemoryEx> pageMemoryGroupResolver;

    /** Progress of the current checkpoint; its written pages counter is bumped per written page. */
    private final CheckpointProgressImpl curCpProgress;

    /** Page store manager performing the actual page write. */
    private final FilePageStoreManager storeMgr;

    /** When it returns {@code true} the write loop stops taking further pages. */
    private final BooleanSupplier shutdownNow;

    /**
     * Stores all collaborators; no work is done until {@link #run()} is called.
     *
     * @param tracker Checkpoint metrics tracker.
     * @param writePageIds Queue of pages to write.
     * @param updStores Per-store counters of written pages.
     * @param doneFut Future to complete when the task is finished.
     * @param totalPagesToWrite Total number of pages to write.
     * @param beforePageWrite Callback run before each page is processed.
     * @param snapshotManager Snapshot manager to notify about page writes.
     * @param log Logger.
     * @param dsMetrics Data storage metrics.
     * @param buf Provider of the temporary write buffer.
     * @param throttlingPolicy Throttling policy.
     * @param pageMemoryGroupResolver Resolver of page memory by group id.
     * @param progress Current checkpoint progress.
     * @param storeMgr Page store manager.
     * @param shutdownNow Supplier telling whether the write loop must stop.
     */
    WriteCheckpointPages(
        CheckpointMetricsTracker tracker,
        GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> writePageIds,
        ConcurrentLinkedHashMap<PageStore, LongAdder> updStores,
        CountDownFuture doneFut,
        int totalPagesToWrite,
        Runnable beforePageWrite,
        IgniteCacheSnapshotManager snapshotManager,
        IgniteLogger log,
        DataStorageMetricsImpl dsMetrics,
        ThreadLocal<ByteBuffer> buf,
        PageMemoryImpl.ThrottlingPolicy throttlingPolicy,
        IgniteThrowableFunction<Integer, PageMemoryEx> pageMemoryGroupResolver,
        CheckpointProgressImpl progress,
        FilePageStoreManager storeMgr,
        BooleanSupplier shutdownNow
    ) {
        this.tracker = tracker;
        this.writePageIds = writePageIds;
        this.updStores = updStores;
        this.doneFut = doneFut;
        this.totalPagesToWrite = totalPagesToWrite;
        this.beforePageWrite = beforePageWrite;
        this.snapshotMgr = snapshotManager;
        this.log = log;
        this.persStoreMetrics = dsMetrics;
        this.threadBuf = buf;
        this.throttlingPolicy = throttlingPolicy;
        this.pageMemoryGroupResolver = pageMemoryGroupResolver;
        this.curCpProgress = progress;
        this.storeMgr = storeMgr;
        this.shutdownNow = shutdownNow;
    }

    /**
     * Processes the queued pages, repeats the pass for pages that were postponed, and then completes
     * {@link #doneFut}. Any {@link Throwable} raised inside the write phase is passed to the future
     * instead of being rethrown.
     */
    @Override
    public void run() {
        // Deliberately outside the try block: a failure here is not reported through the future.
        snapshotMgr.beforeCheckpointPageWritten();

        try {
            GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> postponed = writePages(writePageIds);

            if (!postponed.isEmpty()) {
                LT.warn(
                    log,
                    postponed.initialSize() + " checkpoint pages were not written yet due to unsuccessful page write lock acquisition and will be retried"
                );

                // Every pass returns the pages it had to postpone again; stop once a pass leaves nothing behind.
                while (!postponed.isEmpty()) {
                    postponed = writePages(postponed);
                }
            }

            doneFut.onDone();
        }
        catch (Throwable err) {
            doneFut.onDone(err);
        }
    }

    /**
     * Runs one pass over the given queue.
     *
     * @param writePageIds Queue of pages to process in this pass.
     * @return Queue of the pages that were postponed during this pass, or
     *      {@link GridConcurrentMultiPairQueue#EMPTY} when there are none.
     * @throws IgniteCheckedException If processing a page failed.
     */
    private GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> writePages(GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> writePageIds) throws IgniteCheckedException {
        Map<PageMemoryEx, List<FullPageId>> postponed = new HashMap<>();

        // The tracker is handed to the page memory only while metrics are enabled.
        CheckpointMetricsTracker metricsTracker = persStoreMetrics.metricsEnabled() ? tracker : null;

        PageStoreWriter writer = createPageStoreWriter(postponed);
        ByteBuffer writeBuf = threadBuf.get();
        boolean throttlingEnabled = throttlingPolicy != PageMemoryImpl.ThrottlingPolicy.DISABLED;

        GridConcurrentMultiPairQueue.Result next = new GridConcurrentMultiPairQueue.Result();

        while (writePageIds.next(next)) {
            // Checked only after a pair has been taken from the queue, exactly as a combined loop condition would.
            if (shutdownNow.getAsBoolean()) {
                break;
            }

            beforePageWrite.run();

            FullPageId fullId = (FullPageId)next.getValue();
            PageMemoryEx pageMem = (PageMemoryEx)next.getKey();

            checkpointPage(pageMem, fullId, writeBuf, writer, metricsTracker);

            if (throttlingEnabled) {
                // While the page memory asks for throttling, also process pages pulled via
                // pullPageFromCpBuffer() until it answers with NULL_PAGE.
                while (pageMem.shouldThrottle()) {
                    FullPageId cpPageId = pageMem.pullPageFromCpBuffer();

                    if (cpPageId.equals(FullPageId.NULL_PAGE)) {
                        break;
                    }

                    checkpointPage(pageMem, cpPageId, writeBuf, writer, metricsTracker);
                }
            }
        }

        if (postponed.isEmpty()) {
            return GridConcurrentMultiPairQueue.EMPTY;
        }

        return new GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId>(postponed);
    }

    /**
     * Notifies the snapshot manager, rewinds the shared buffer and asks the page memory to write the page.
     *
     * @param pageMem Page memory to take the page from.
     * @param pageId Page to process.
     * @param buf Temporary buffer, rewound before use.
     * @param writer Writer the page memory hands the page to.
     * @param metricsTracker Metrics tracker, or {@code null} when metrics are disabled.
     * @throws IgniteCheckedException If processing the page failed.
     */
    private void checkpointPage(PageMemoryEx pageMem, FullPageId pageId, ByteBuffer buf, PageStoreWriter writer, CheckpointMetricsTracker metricsTracker) throws IgniteCheckedException {
        snapshotMgr.beforePageWrite(pageId);
        buf.rewind();
        pageMem.checkpointWritePage(pageId, buf, writer, metricsTracker);
    }

    /**
     * Creates the writer used for one pass.
     *
     * @param pagesToRetry Map that receives the pages reported with {@link #RETRY_TAG}, grouped by page memory.
     * @return Writer that either queues a page for retry or writes it to its page store.
     */
    private PageStoreWriter createPageStoreWriter(final Map<PageMemoryEx, List<FullPageId>> pagesToRetry) {
        return new PageStoreWriter() {
            @Override
            public void writePage(FullPageId fullPageId, ByteBuffer buf, int tag) throws IgniteCheckedException {
                if (tag == WriteCheckpointPages.RETRY_TAG) {
                    // Nothing is written for this tag: remember the page under its page memory for the next pass.
                    PageMemoryEx owner = (PageMemoryEx)WriteCheckpointPages.this.pageMemoryGroupResolver.apply(fullPageId.groupId());

                    pagesToRetry.computeIfAbsent(owner, mem -> new ArrayList<>()).add(fullPageId);
                }
                else {
                    int groupId = fullPageId.groupId();
                    long pageId = fullPageId.pageId();

                    assert PageIO.getType(buf) != 0 : "Invalid state. Type is 0! pageId = " + IgniteUtils.hexLong(pageId);
                    assert PageIO.getVersion(buf) != 0 : "Invalid state. Version is 0! pageId = " + IgniteUtils.hexLong(pageId);

                    // Data pages are counted only while metrics are enabled.
                    if (WriteCheckpointPages.this.persStoreMetrics.metricsEnabled() && PageIO.isDataPageType(PageIO.getType(buf))) {
                        WriteCheckpointPages.this.tracker.onDataPageWritten();
                    }

                    // The progress counter is bumped before the store write is issued.
                    WriteCheckpointPages.this.curCpProgress.updateWrittenPages(1);

                    PageStore store = WriteCheckpointPages.this.storeMgr.writeInternal(groupId, pageId, buf, tag, true);

                    WriteCheckpointPages.this.updStores.computeIfAbsent(store, k -> new LongAdder()).increment();
                }
            }
        };
    }
}

