/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db.commitlog;

import java.io.File;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogArchiver;
import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.commitlog.CommitLogReadHandler;
import org.apache.cassandra.db.commitlog.CommitLogReader;
import org.apache.cassandra.db.commitlog.IntervalSet;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.commons.lang3.StringUtils;
import org.cassandraunit.shaded.com.google.common.annotations.VisibleForTesting;
import org.cassandraunit.shaded.com.google.common.base.Predicate;
import org.cassandraunit.shaded.com.google.common.collect.HashMultimap;
import org.cassandraunit.shaded.com.google.common.collect.Iterables;
import org.cassandraunit.shaded.com.google.common.collect.Multimap;
import org.cassandraunit.shaded.com.google.common.collect.Ordering;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommitLogReplayer
implements CommitLogReadHandler {
    /**
     * Ceiling on the summed size of mutations that have been submitted but whose futures have not
     * yet been collected; {@link #handleMutation} waits on the oldest outstanding future while the
     * running total is above it. Read from the system property
     * {@code cassandra.commitlog_max_outstanding_replay_bytes}, defaulting to 0x4000000 (64 MiB).
     * Left non-final, presumably so tests can override it.
     */
    @VisibleForTesting
    public static long MAX_OUTSTANDING_REPLAY_BYTES = Long.getLong("cassandra.commitlog_max_outstanding_replay_bytes", 0x4000000L);

    /** Strategy used to submit each mutation for replay; replaceable (non-final) for tests. */
    @VisibleForTesting
    public static MutationInitiator mutationInitiator = new MutationInitiator();

    /** System property that, when {@code true}, makes non-permissible replay errors log-and-continue. */
    static final String IGNORE_REPLAY_ERRORS_PROPERTY = "cassandra.commitlog.ignorereplayerrors";

    private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);

    /**
     * Ceiling on the number of outstanding replay futures before {@link #handleMutation} starts
     * waiting on the oldest one. Read from {@code cassandra.commitlog_max_outstanding_replay_count},
     * defaulting to 1024.
     */
    private static final int MAX_OUTSTANDING_REPLAY_COUNT = Integer.getInteger("cassandra.commitlog_max_outstanding_replay_count", 1024);

    /** Keyspaces that had at least one mutation applied; they are flushed by {@link #blockForWrites()}. */
    private final Set<Keyspace> keyspacesReplayed = new NonBlockingHashSet<>();

    /** Outstanding replay tasks in submission order; each future yields the size passed at submission. */
    private final Queue<Future<Integer>> futures = new ArrayDeque<>();

    /** Number of partition updates added to replayed mutations so far. */
    private final AtomicInteger replayedCount = new AtomicInteger();

    /** Per table id, the commit log intervals treated as already persisted; positions inside are not replayed. */
    private final Map<UUID, IntervalSet<CommitLogPosition>> cfPersisted;

    /** Position handed to the reader on every read call; computed in {@link #construct(CommitLog)}. */
    private final CommitLogPosition globalPosition;

    /** Running total of the sizes of mutations whose futures are still queued in {@link #futures}. */
    private long pendingMutationBytes = 0L;

    /** Selects which partition updates (and tables) take part in the replay. */
    private final ReplayFilter replayFilter;

    /** Source of the point-in-time restore target and timestamp precision. */
    private final CommitLogArchiver archiver;

    /** Reader that walks the segment files and calls back into this handler. */
    @VisibleForTesting
    protected CommitLogReader commitLogReader;

    /**
     * Creates a replayer from already-computed replay state.
     *
     * @param commitLog      commit log whose {@code archiver} is used for point-in-time checks
     * @param globalPosition position passed to the reader on every read call
     * @param cfPersisted    per table id, the intervals regarded as already persisted
     * @param replayFilter   filter deciding which partition updates are replayed
     */
    CommitLogReplayer(CommitLog commitLog, CommitLogPosition globalPosition, Map<UUID, IntervalSet<CommitLogPosition>> cfPersisted, ReplayFilter replayFilter) {
        // Plain state handed in by the caller.
        this.globalPosition = globalPosition;
        this.cfPersisted = cfPersisted;
        this.replayFilter = replayFilter;

        // Collaborators: the archiver comes from the commit log, the reader is always a fresh instance.
        this.archiver = commitLog.archiver;
        this.commitLogReader = new CommitLogReader();
    }

    /**
     * Builds a replayer for the given commit log by inspecting every column family store.
     *
     * <p>For each store the persisted intervals are derived from its live sstables plus its
     * truncation position. A truncation record is removed (and ignored) when the truncation time
     * is later than the archiver's restore point and the table is included by the replay filter.
     * The global position is then the first position not covered across all tables.
     *
     * @param commitLog the commit log supplying the archiver configuration
     * @return a replayer ready to read segments
     */
    public static CommitLogReplayer construct(CommitLog commitLog) {
        Map<UUID, IntervalSet<CommitLogPosition>> persistedByTable = new HashMap<>();
        ReplayFilter filter = ReplayFilter.create();

        for (ColumnFamilyStore store : ColumnFamilyStore.all()) {
            UUID tableId = store.metadata.cfId;
            CommitLogPosition truncationPoint = SystemKeyspace.getTruncatedPosition(tableId);

            if (truncationPoint != null) {
                long restorePoint = commitLog.archiver.restorePointInTime;
                boolean truncatedAfterRestorePoint = SystemKeyspace.getTruncatedAt(tableId) > restorePoint;
                if (truncatedAfterRestorePoint && filter.includes(store.metadata)) {
                    logger.info("Restore point in time is before latest truncation of table {}.{}. Clearing truncation record.", store.metadata.ksName, store.metadata.cfName);
                    SystemKeyspace.removeTruncationRecord(tableId);
                    // The truncation no longer limits what gets replayed for this table.
                    truncationPoint = null;
                }
            }

            persistedByTable.put(tableId, CommitLogReplayer.persistedIntervals(store.getLiveSSTables(), truncationPoint));
        }

        CommitLogPosition replayStart = CommitLogReplayer.firstNotCovered(persistedByTable.values());
        logger.debug("Global replay position is {} from columnfamilies {}", replayStart, FBUtilities.toString(persistedByTable));
        return new CommitLogReplayer(commitLog, replayStart, persistedByTable, filter);
    }

    /**
     * Replays a single commit log segment file through this handler.
     *
     * @param file               the segment file to read
     * @param tolerateTruncation forwarded unchanged to the reader
     * @throws IOException if the reader fails or a replay error is escalated
     */
    public void replayPath(File file, boolean tolerateTruncation) throws IOException {
        // NOTE(review): -1 is presumably the reader's "no mutation limit" marker - confirm against CommitLogReader.
        commitLogReader.readCommitLogSegment(
                this,
                file,
                globalPosition,
                -1,
                tolerateTruncation);
    }

    /**
     * Replays a set of commit log files through this handler, using the global replay position.
     *
     * @param clogs the commit log files to read
     * @throws IOException if the reader fails or a replay error is escalated
     */
    public void replayFiles(File[] clogs) throws IOException {
        final CommitLogReader reader = commitLogReader;
        reader.readAllFiles(this, clogs, globalPosition);
    }

    /**
     * Waits for every submitted replay task to finish, then flushes what was replayed.
     *
     * <p>Steps, in order: log the per-table counts of mutations the reader reported as invalid,
     * wait on and clear the outstanding replay futures, flush every keyspace that had a mutation
     * applied, and additionally flush {@code system.batches} when the {@code system} keyspace was
     * not among them. Blocks until all of those flushes complete.
     *
     * @return the number of partition updates that were replayed
     */
    public int blockForWrites() {
        for (Map.Entry<UUID, AtomicInteger> skipped : this.commitLogReader.getInvalidMutations()) {
            logger.warn("Skipped {} mutations from unknown (probably removed) CF with id {}", skipped.getValue(), skipped.getKey());
        }

        // Drain the replay tasks themselves before flushing anything.
        FBUtilities.waitOnFutures(this.futures);
        logger.trace("Finished waiting on mutations from recovery");
        this.futures.clear();

        // Named distinctly from the replay-future field, and typed with a wildcard so that flush
        // futures of any result type can be collected.
        boolean flushingSystem = false;
        Collection<Future<?>> flushFutures = new ArrayList<>();
        for (Keyspace keyspace : this.keyspacesReplayed) {
            if ("system".equals(keyspace.getName())) {
                flushingSystem = true;
            }
            flushFutures.addAll(keyspace.flush());
        }

        // The batches table is flushed even when nothing was replayed into the system keyspace.
        if (!flushingSystem) {
            flushFutures.add(Keyspace.open("system").getColumnFamilyStore("batches").forceFlush());
        }

        FBUtilities.waitOnFutures(flushFutures);
        return this.replayedCount.get();
    }

    /**
     * Collects the commit log intervals that are regarded as already persisted for one table.
     *
     * @param onDisk      sstables whose recorded {@code commitLogIntervals} are all included
     * @param truncatedAt truncation position, or {@code null}; when present, the interval from
     *                    {@link CommitLogPosition#NONE} up to it is included as well
     * @return the combined interval set
     */
    public static IntervalSet<CommitLogPosition> persistedIntervals(Iterable<SSTableReader> onDisk, CommitLogPosition truncatedAt) {
        IntervalSet.Builder<CommitLogPosition> covered = new IntervalSet.Builder<>();
        onDisk.forEach(sstable -> covered.addAll(sstable.getSSTableMetadata().commitLogIntervals));

        if (truncatedAt == null) {
            return covered.build();
        }
        covered.add(CommitLogPosition.NONE, truncatedAt);
        return covered.build();
    }

    /**
     * Finds the earliest position at which some table stops being covered by its persisted
     * intervals.
     *
     * <p>For each interval set the first element of {@code ends()} is taken, or
     * {@link CommitLogPosition#NONE} when the set has no ends; the minimum of those is returned.
     *
     * @param ranges one interval set per table
     * @return the smallest first-end across {@code ranges}; {@link CommitLogPosition#NONE} when
     *         {@code ranges} is empty, rather than failing on an empty {@code Optional}
     */
    public static CommitLogPosition firstNotCovered(Collection<IntervalSet<CommitLogPosition>> ranges) {
        return ranges.stream()
                .map(intervals -> Iterables.getFirst(intervals.ends(), CommitLogPosition.NONE))
                .min(Ordering.natural())
                // With no tables there is nothing known to be covered, so use the same default
                // that applies to a single table without persisted intervals.
                .orElse(CommitLogPosition.NONE);
    }

    /**
     * Tells whether an update for the given table at the given position still needs replaying.
     *
     * @param cfId     table id; must be a key of the persisted-interval map
     * @param position commit log position of the mutation
     * @return {@code true} unless the table's persisted intervals contain {@code position}
     */
    private boolean shouldReplay(UUID cfId, CommitLogPosition position) {
        IntervalSet<CommitLogPosition> alreadyPersisted = this.cfPersisted.get(cfId);
        boolean coveredOnDisk = alreadyPersisted.contains(position);
        return !coveredOnDisk;
    }

    /**
     * Checks whether a mutation lies beyond the archiver's restore point in time.
     *
     * @param fm the mutation to inspect
     * @return {@code true} as soon as one partition update's max timestamp, converted to
     *         milliseconds with the archiver's precision, is greater than the restore target;
     *         {@code false} if none is
     */
    protected boolean pointInTimeExceeded(Mutation fm) {
        final long cutoffMillis = this.archiver.restorePointInTime;
        for (PartitionUpdate update : fm.getPartitionUpdates()) {
            long newestWriteMillis = this.archiver.precision.toMillis(update.maxTimestamp());
            if (newestWriteMillis > cutoffMillis) {
                return true;
            }
        }
        return false;
    }

    /**
     * Submits one mutation read from the commit log for replay and applies back-pressure.
     *
     * <p>The mutation's size is added to the pending byte total and its future is queued. Then,
     * for as long as the backlog is over either limit or the oldest queued future is already
     * done, the oldest future is removed, waited on, and its result subtracted from the pending
     * byte total.
     *
     * @param m             the mutation to replay
     * @param size          size of the mutation, counted against the byte limit
     * @param entryLocation location of the entry, forwarded to the initiator
     * @param desc          descriptor of the segment; its {@code id} is forwarded to the initiator
     */
    @Override
    public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc) {
        this.pendingMutationBytes += size;

        Future<Integer> submitted = mutationInitiator.initiateMutation(m, desc.id, size, entryLocation, this);
        this.futures.offer(submitted);

        while (this.oldestReplayShouldBeCollected()) {
            Future<Integer> oldest = this.futures.poll();
            int completedBytes = FBUtilities.waitOnFuture(oldest);
            this.pendingMutationBytes -= completedBytes;
        }
    }

    /** True while a limit is exceeded or the head of the queue has already completed. */
    private boolean oldestReplayShouldBeCollected() {
        if (this.futures.size() > MAX_OUTSTANDING_REPLAY_COUNT) {
            return true;
        }
        if (this.pendingMutationBytes > MAX_OUTSTANDING_REPLAY_BYTES) {
            return true;
        }
        // ArrayDeque holds no null elements, so a null head means the queue is empty.
        Future<Integer> oldest = this.futures.peek();
        return oldest != null && oldest.isDone();
    }

    /**
     * Decides how to react to an error raised while reading a commit log segment.
     *
     * <p>Permissible errors, and any error when the ignore-replay-errors system property is set,
     * are logged and tolerated. Otherwise the error is passed to
     * {@code CommitLog.handleCommitError}; if that returns {@code false} the replay is aborted.
     *
     * @param exception the read error
     * @return always {@code false} when the method returns normally
     * @throws IOException a {@link CommitLogReplayException} wrapping {@code exception} when the
     *                     error is neither tolerated nor handled
     */
    @Override
    public boolean shouldSkipSegmentOnError(CommitLogReadHandler.CommitLogReadException exception) throws IOException {
        if (exception.permissible) {
            logger.error("Ignoring commit log replay error likely due to incomplete flush to disk", (Throwable)exception);
            return false;
        }

        if (Boolean.getBoolean(IGNORE_REPLAY_ERRORS_PROPERTY)) {
            logger.error("Ignoring commit log replay error", (Throwable)exception);
            return false;
        }

        boolean handled = CommitLog.handleCommitError("Failed commit log replay", exception);
        if (handled) {
            return false;
        }

        logger.error("Replay stopped. If you wish to override this error and continue starting the node ignoring commit log replay problems, specify -Dcassandra.commitlog.ignorereplayerrors=true on the command line");
        throw new CommitLogReplayException(exception.getMessage(), exception);
    }

    /**
     * Handles an unrecoverable read error by applying the same policy as
     * {@link #shouldSkipSegmentOnError}; the boolean outcome of that call is not used.
     *
     * @param exception the read error
     * @throws IOException if the policy escalates the error
     */
    @Override
    public void handleUnrecoverableError(CommitLogReadHandler.CommitLogReadException exception) throws IOException {
        // Only the side effects (logging / throwing) matter here.
        shouldSkipSegmentOnError(exception);
    }

    /** Signals that commit log replay failed and was not allowed to continue. */
    public static class CommitLogReplayException
    extends IOException {
        /**
         * @param message description of the failure
         */
        public CommitLogReplayException(String message) {
            super(message);
        }

        /**
         * @param message description of the failure
         * @param cause   the underlying error that stopped the replay
         */
        public CommitLogReplayException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Replay filter restricted to an explicit set of tables, keyed by keyspace name with table
     * names as values.
     */
    private static class CustomReplayFilter
    extends ReplayFilter {
        /** Keyspace name to the table names that should be replayed; never reassigned. */
        private final Multimap<String, String> toReplay;

        /**
         * @param toReplay keyspace name to table names selected for replay
         */
        public CustomReplayFilter(Multimap<String, String> toReplay) {
            this.toReplay = toReplay;
        }

        /**
         * Returns the partition updates of {@code mutation} whose table name is selected for the
         * mutation's keyspace. A keyspace with no selected tables yields no updates.
         */
        @Override
        public Iterable<PartitionUpdate> filter(Mutation mutation) {
            // Multimap.get returns an empty collection (never null) for an absent key, so no
            // null handling is needed: an empty collection simply matches nothing.
            Collection<String> selectedTables = this.toReplay.get(mutation.getKeyspaceName());
            return Iterables.filter(mutation.getPartitionUpdates(), update -> selectedTables.contains(update.metadata().cfName));
        }

        /** Returns whether the given table's keyspace/table name pair was selected for replay. */
        @Override
        public boolean includes(CFMetaData metadata) {
            return this.toReplay.containsEntry(metadata.ksName, metadata.cfName);
        }
    }

    /** Replay filter that lets everything through: every table is included, no update is dropped. */
    private static class AlwaysReplayFilter
    extends ReplayFilter {
        private AlwaysReplayFilter() {
        }

        /** Every table is part of the replay. */
        @Override
        public boolean includes(CFMetaData metadata) {
            return true;
        }

        /** Returns the mutation's partition updates unchanged. */
        @Override
        public Iterable<PartitionUpdate> filter(Mutation mutation) {
            return mutation.getPartitionUpdates();
        }
    }

    /**
     * Decides which tables and partition updates take part in commit log replay. Instances are
     * obtained through {@link #create()}.
     */
    abstract static class ReplayFilter {
        ReplayFilter() {
        }

        /**
         * @param mutation a mutation read from the commit log
         * @return the partition updates of {@code mutation} that should be replayed
         */
        public abstract Iterable<PartitionUpdate> filter(Mutation mutation);

        /**
         * @param metadata metadata of a table
         * @return whether that table is part of the replay
         */
        public abstract boolean includes(CFMetaData metadata);

        /**
         * Builds the filter configured through the {@code cassandra.replayList} system property.
         *
         * <p>When the property is unset, everything is replayed. Otherwise the value is a
         * comma-separated list of {@code keyspace.table} entries (surrounding whitespace is
         * trimmed), each of which must name an existing keyspace and table.
         *
         * @return a filter that replays everything, or one restricted to the listed tables
         * @throws IllegalArgumentException if an entry is not of the form {@code keyspace.table},
         *                                  or names an unknown keyspace or table
         */
        public static ReplayFilter create() {
            // Read the property once so the null check and the parsing see the same value.
            String replayList = System.getProperty("cassandra.replayList");
            if (replayList == null) {
                return new AlwaysReplayFilter();
            }

            Multimap<String, String> toReplay = HashMultimap.create();
            for (String rawPair : replayList.split(",")) {
                String[] pair = StringUtils.split(rawPair.trim(), '.');
                if (pair.length != 2) {
                    throw new IllegalArgumentException("Each table to be replayed must be fully qualified with keyspace name, e.g., 'system.peers'");
                }
                Keyspace ks = Schema.instance.getKeyspaceInstance(pair[0]);
                if (ks == null) {
                    throw new IllegalArgumentException("Unknown keyspace " + pair[0]);
                }
                ColumnFamilyStore cfs = ks.getColumnFamilyStore(pair[1]);
                if (cfs == null) {
                    throw new IllegalArgumentException(String.format("Unknown table %s.%s", pair[0], pair[1]));
                }
                toReplay.put(pair[0], pair[1]);
            }
            return new CustomReplayFilter(toReplay);
        }
    }

    /** Submits replay work for individual mutations; a separate class so it can be substituted. */
    @VisibleForTesting
    public static class MutationInitiator {
        /**
         * Schedules the replay of one mutation on the {@code MUTATION} stage.
         *
         * <p>The task does nothing when the mutation's keyspace has no metadata in the schema or
         * when the mutation exceeds the restore point in time. Otherwise it rebuilds a mutation
         * from the filtered partition updates whose table still exists in the schema and whose
         * position is not already persisted, applies it, and records the keyspace as replayed.
         *
         * @param mutation          the mutation read from the commit log
         * @param segmentId         id of the segment the mutation came from
         * @param serializedSize    value the returned future resolves to once the task has run
         * @param entryLocation     location of the entry within the segment
         * @param commitLogReplayer replayer whose filter, persisted intervals and counters are used
         * @return future resolving to {@code serializedSize}
         */
        protected Future<Integer> initiateMutation(final Mutation mutation, final long segmentId, int serializedSize, final int entryLocation, final CommitLogReplayer commitLogReplayer) {
            WrappedRunnable replayTask = new WrappedRunnable(){

                @Override
                public void runMayThrow() {
                    boolean keyspaceKnown = Schema.instance.getKSMetaData(mutation.getKeyspaceName()) != null;
                    if (!keyspaceKnown || commitLogReplayer.pointInTimeExceeded(mutation)) {
                        return;
                    }

                    Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());

                    // Built lazily: stays null when no partition update survives the checks below.
                    Mutation rebuilt = null;
                    for (PartitionUpdate update : commitLogReplayer.replayFilter.filter(mutation)) {
                        if (Schema.instance.getCF(update.metadata().cfId) == null) {
                            continue;
                        }
                        if (!commitLogReplayer.shouldReplay(update.metadata().cfId, new CommitLogPosition(segmentId, entryLocation))) {
                            continue;
                        }
                        if (rebuilt == null) {
                            rebuilt = new Mutation(mutation.getKeyspaceName(), mutation.key());
                        }
                        rebuilt.add(update);
                        commitLogReplayer.replayedCount.incrementAndGet();
                    }

                    if (rebuilt == null) {
                        return;
                    }
                    assert (!rebuilt.isEmpty());
                    Keyspace.open(rebuilt.getKeyspaceName()).apply(rebuilt, false, true, false);
                    commitLogReplayer.keyspacesReplayed.add(keyspace);
                }
            };
            return StageManager.getStage(Stage.MUTATION).submit(replayTask, serializedSize);
        }
    }
}

