/*
 * Decompiled with CFR 0.152.
 */
package kafka.restore.snapshot;

import io.confluent.kafka.storage.checksum.Algorithm;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import kafka.log.LocalLog;
import kafka.log.MergedLog;
import kafka.restore.RestoreConfig;
import kafka.restore.RestoreMetricsManager;
import kafka.restore.db.PartitionRestoreContext;
import kafka.restore.snapshot.FtpsStateForRestore;
import kafka.restore.snapshot.SnapshotObjectStoreUtils;
import kafka.restore.snapshot.TierRecordsIterator;
import kafka.tier.TopicIdPartition;
import kafka.tier.backupObjectLifecycle.RetryPolicy;
import kafka.tier.domain.AbstractTierMetadata;
import kafka.tier.state.FileTierPartitionState;
import kafka.tier.state.FileTierPartitionStateSnapshotObject;
import kafka.tier.state.OffsetAndEpoch;
import kafka.tier.state.TierPartitionStateCleanupConfig;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.objects.metadata.TierPartitionStateSnapshotMetadata;
import kafka.tier.store.objects.metadata.TierTopicSnapshotMetadata;
import kafka.tier.topic.TierTopicPartitioner;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PointInTimeTierPartitionStateBuilder {
    private static final Logger log = LoggerFactory.getLogger(PointInTimeTierPartitionStateBuilder.class);
    protected static final String TIER_TOPIC_NAME = "_confluent-tier-state";
    protected static final String TIER_PARTITION_SNAPSHOT_PREFIX = TierObjectStore.DataTypePathPrefix.TIER_PARTITION_STATE_METADATA_SNAPSHOT.prefix() + "/";
    protected static final String DEFAULT_KEY_PREFIX = "";
    protected static final String TIER_STATE_FILENAME = LocalLog.filenamePrefixFromOffset(0L) + MergedLog.TierStateSuffix();
    protected static final String TIER_STATE_CHECKSUM_FILENAME = TIER_STATE_FILENAME + Algorithm.ADLER.suffix;
    private final Path workingDirectory = Paths.get("/mnt/restore-data/", new String[0]);
    private final Path ftpsSnapshotsDir = Paths.get(this.workingDirectory.toString(), "ftps-snapshots");
    private final Path updatedFtpsStatesDir = Paths.get(this.workingDirectory.toString(), "updated-ftps-states");
    private final Path ttpsSnapshotsDir = Paths.get(this.workingDirectory.toString(), "ttps-snapshots");
    private final LogDirFailureChannel logDirFailureChannel = new LogDirFailureChannel(5);
    private final Time time = Time.SYSTEM;
    private final Scheduler scheduler = new KafkaScheduler(1, false, "", false);
    private static final int MAX_RETRIES_OBJECT_STORE_CALLS = 10;
    private static final long RETRY_BACKOFF_MAX_MS = 30000L;
    private static final long RETRY_BACKOFF_MIN_MS = 2000L;
    protected static final RetryPolicy DEFAULT_RETRY_POLICY = new RetryPolicy(10, 30000L, 2000L);
    private static final TierTopicPartitioner TIER_TOPIC_PARTITIONER = new TierTopicPartitioner(RestoreConfig.kafkaConfig().confluentConfig().tierMetadataNumPartitions().shortValue());
    private static final long ONE_MONTH_IN_MS = Duration.ofDays(30L).toMillis();
    private SnapshotObjectStoreUtils snapshotUtils;
    private RestoreMetricsManager metricsManager;

    public PointInTimeTierPartitionStateBuilder(TierObjectStore store, ThreadPoolExecutor threadPool, RestoreMetricsManager metricsManager) {
        this(store, DEFAULT_RETRY_POLICY, threadPool, DEFAULT_KEY_PREFIX, metricsManager);
    }

    protected PointInTimeTierPartitionStateBuilder(TierObjectStore store, RetryPolicy retryPolicy, ThreadPoolExecutor threadPool, String keyPrefix, RestoreMetricsManager metricsManager) {
        this.workingDirectory.toFile().mkdirs();
        this.ftpsSnapshotsDir.toFile().mkdirs();
        this.updatedFtpsStatesDir.toFile().mkdirs();
        this.ttpsSnapshotsDir.toFile().mkdirs();
        this.snapshotUtils = new SnapshotObjectStoreUtils(store, threadPool, retryPolicy, keyPrefix, this.time, metricsManager);
        this.metricsManager = metricsManager;
    }

    public SnapshotObjectStoreUtils getSnapshotUtils() {
        return this.snapshotUtils;
    }

    private void replayEvents(TierRecordsIterator tierRecordsIterator, Map<TopicIdPartition, FtpsStateForRestore> results) throws InterruptedException {
        long skippedEvents = 0L;
        long replayedEvents = 0L;
        while (tierRecordsIterator.hasNext()) {
            ConsumerRecord record = (ConsumerRecord)tierRecordsIterator.next();
            AbstractTierMetadata event = SnapshotObjectStoreUtils.deserializeRecord((ConsumerRecord<byte[], byte[]>)record);
            OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(record.offset(), record.leaderEpoch());
            TopicIdPartition tpid = event.topicIdPartition();
            if (results.containsKey(tpid)) {
                results.get(tpid).applyEvent(event, offsetAndEpoch);
                ++replayedEvents;
                continue;
            }
            ++skippedEvents;
        }
        log.info(String.format("ttps snapshot replayed event: %s, skipped events: %s", replayedEvents, skippedEvents));
    }

    private FileTierPartitionState initTierPartitionStateFromSnapshot(TopicIdPartition tpid, File snapshot, boolean checksumEnabled) throws IOException {
        boolean tieringEnabled = true;
        boolean compactFeatureFlag = true;
        boolean cleanupEnabled = true;
        TierPartitionStateCleanupConfig cleanupConfig = new TierPartitionStateCleanupConfig(cleanupEnabled, ONE_MONTH_IN_MS, ONE_MONTH_IN_MS);
        boolean tierPartitionStateSnapshotFeatureFlag = true;
        int brokerId = -1;
        return new FileTierPartitionState(snapshot.getParentFile(), this.logDirFailureChannel, tpid.topicPartition(), tieringEnabled, this.scheduler, checksumEnabled, compactFeatureFlag, this.time, cleanupConfig, tierPartitionStateSnapshotFeatureFlag, brokerId);
    }

    protected Map<TopicIdPartition, FtpsStateForRestore> initStateForRestoreMap(Map<TopicIdPartition, Future<Path>> tierPartitionSnapshotFileFutures, Path tierPartitionStatesDir, Map<TopicPartition, PartitionRestoreContext> partitionRestoreContextMap) throws IOException {
        HashMap<TopicIdPartition, FtpsStateForRestore> results = new HashMap<TopicIdPartition, FtpsStateForRestore>();
        for (Map.Entry<TopicIdPartition, Future<Path>> entry : tierPartitionSnapshotFileFutures.entrySet()) {
            Path snapshotLocalPath;
            TopicIdPartition tpid = entry.getKey();
            try {
                snapshotLocalPath = entry.getValue().get();
            }
            catch (InterruptedException | ExecutionException e) {
                log.error("Failed to download Tier Partition Snapshot for TopicIdPartition " + tpid, (Throwable)e);
                continue;
            }
            if (snapshotLocalPath == null) {
                log.info("Skipping TopicIdPartition " + tpid + " for which Tier Partition Snapshot was not found");
                continue;
            }
            boolean checksumEnabled = snapshotLocalPath.toString().contains(Algorithm.ADLER.suffix);
            String newTierStateFilename = checksumEnabled ? TIER_STATE_CHECKSUM_FILENAME : TIER_STATE_FILENAME;
            try {
                Path updatedFtpsStateFilePath = Paths.get(tierPartitionStatesDir.toString(), tpid.topicPartition().toString(), newTierStateFilename);
                updatedFtpsStateFilePath.toFile().getParentFile().mkdirs();
                log.debug(String.format("[%s]: the Updated Ftps State File path: %s", tpid.topicPartition(), updatedFtpsStateFilePath));
                Files.copy(snapshotLocalPath, updatedFtpsStateFilePath, StandardCopyOption.REPLACE_EXISTING);
                FileTierPartitionState ftps = this.initTierPartitionStateFromSnapshot(tpid, updatedFtpsStateFilePath.toFile(), checksumEnabled);
                PartitionRestoreContext pCtx = partitionRestoreContextMap.get(tpid.topicPartition());
                results.put(tpid, new FtpsStateForRestore(tpid, snapshotLocalPath, ftps, pCtx.timeStampRestoreFrom, pCtx.revertCompactionSinceTimestamp, this.snapshotUtils, this.metricsManager));
            }
            catch (Exception ex) {
                log.error(String.format("[%s]: error when creating FtpsStateForRestore", tpid.topicPartition()), (Throwable)ex);
            }
        }
        return results;
    }

    public Map<TopicIdPartition, FtpsStateForRestore> buildFtpsFromSnapshot(Map<TopicPartition, PartitionRestoreContext> partitionRestoreContextMap) throws IOException, InterruptedException {
        if (partitionRestoreContextMap == null || partitionRestoreContextMap.isEmpty()) {
            log.info("buildFtpsFromSnapshot called with empty input");
            return Collections.emptyMap();
        }
        FtpsSnapshotsMetadata ftpsSnapshotsMetadata = this.snapshotUtils.locateFtpsSnapshotsByTimestamp(partitionRestoreContextMap);
        log.debug("found ftps snapshots: " + ftpsSnapshotsMetadata);
        if (!ftpsSnapshotsMetadata.minLastMaterializedEventTs.isPresent()) {
            log.info("No tier partition snapshots found in tier storage");
            return Collections.emptyMap();
        }
        Map<TopicIdPartition, Future<Path>> ftpsSnapshotFileFutures = this.snapshotUtils.downloadFtpsSnapshotsInParallel(ftpsSnapshotsMetadata.snapshotMap, this.ftpsSnapshotsDir);
        log.info("downloading ftps snapshots in parallel: " + ftpsSnapshotFileFutures.size());
        Map<TopicIdPartition, FtpsStateForRestore> stateToRestoreMap = this.initStateForRestoreMap(ftpsSnapshotFileFutures, this.updatedFtpsStatesDir, partitionRestoreContextMap);
        List<TierTopicSnapshotMetadata> allTierTopicSnapshotMetadata = this.snapshotUtils.locateTierTopicSnapshotsByTimestamp(ftpsSnapshotsMetadata.minLastMaterializedEventTs.getAsLong(), OptionalLong.empty());
        log.debug("found ttps snapshots: " + allTierTopicSnapshotMetadata.size());
        List<Future<Path>> tierTopicSnapshots = this.snapshotUtils.downloadTierTopicSnapshotsInParallel(allTierTopicSnapshotMetadata, this.ttpsSnapshotsDir);
        if (tierTopicSnapshots.size() > 0) {
            TierRecordsIterator tierRecordsIterator = new TierRecordsIterator(tierTopicSnapshots);
            log.info("start replaying ttps snapshots events.");
            long startMs = this.time.hiResClockMs();
            this.replayEvents(tierRecordsIterator, stateToRestoreMap);
            long timeToReplaySnapshotEventsMs = this.time.hiResClockMs() - startMs;
            this.metricsManager.record("RestoreTimeToReplayEvents", timeToReplaySnapshotEventsMs);
            log.info("replayed ttps snapshots events.");
            if (!this.validateTierPartitionOffsets(ftpsSnapshotsMetadata.minLastMaterializedTierTopicPartitionOffsets, tierRecordsIterator.tierPartitionStartOffsets())) {
                String errorStr = "missing events between ftps and ttps snapshots";
                log.error(errorStr);
                throw new IllegalStateException(errorStr);
            }
        }
        log.info("built Ftps files for " + stateToRestoreMap.size() + " partitions");
        return stateToRestoreMap;
    }

    private boolean validateTierPartitionOffsets(Map<Integer, Long> ftpsSnapshotOffsets, Map<Integer, Long> ttpsSnapshotOffsets) {
        if (ftpsSnapshotOffsets == null || ttpsSnapshotOffsets == null) {
            return false;
        }
        boolean isValid = true;
        for (Integer tierTopicPartition : ftpsSnapshotOffsets.keySet()) {
            long firstOffsetInTtpsSnapshot;
            long lastMaterializedOffsetInFtpsSnapshot = ftpsSnapshotOffsets.get(tierTopicPartition);
            if (!ttpsSnapshotOffsets.containsKey(tierTopicPartition) || lastMaterializedOffsetInFtpsSnapshot >= (firstOffsetInTtpsSnapshot = ttpsSnapshotOffsets.get(tierTopicPartition).longValue()) - 1L) continue;
            log.error(String.format("error: missing events between ftps and ttps snapshots in tier partition %s, lastMaterializedOffsetInFtpsSnapshot: %s, firstOffsetInTtpsSnapshot: %s", tierTopicPartition, lastMaterializedOffsetInFtpsSnapshot, firstOffsetInTtpsSnapshot));
            isValid = false;
        }
        return isValid;
    }

    public static Map<Integer, Long> calculateTierTopicLastMaterializedOffsets(Map<TopicIdPartition, FtpsStateForRestore> stateToRestoreMap) {
        HashMap<Integer, Long> minTierTopicLastMaterializedOffsets = new HashMap<Integer, Long>();
        for (TopicIdPartition topicIdPartition : stateToRestoreMap.keySet()) {
            int partition = TIER_TOPIC_PARTITIONER.partitionId(topicIdPartition);
            long offset = stateToRestoreMap.get((Object)topicIdPartition).updatedFtpsState.lastLocalMaterializedSrcOffsetAndEpoch().offset();
            if (minTierTopicLastMaterializedOffsets.containsKey(partition) && (Long)minTierTopicLastMaterializedOffsets.get(partition) <= offset) continue;
            minTierTopicLastMaterializedOffsets.put(partition, offset);
        }
        return minTierTopicLastMaterializedOffsets;
    }

    public static class FtpsSnapshotsMetadata {
        public final Map<TopicIdPartition, TierPartitionStateSnapshotMetadata> snapshotMap;
        public OptionalLong minLastMaterializedEventTs;
        public Map<Integer, Long> minLastMaterializedTierTopicPartitionOffsets;

        public FtpsSnapshotsMetadata(Map<TopicIdPartition, TierPartitionStateSnapshotMetadata> snapshotMap) {
            this.snapshotMap = snapshotMap;
            this.minLastMaterializedEventTs = OptionalLong.empty();
            this.minLastMaterializedTierTopicPartitionOffsets = new HashMap<Integer, Long>();
            for (TopicIdPartition topicIdPartition : snapshotMap.keySet()) {
                TierPartitionStateSnapshotMetadata metadata = snapshotMap.get(topicIdPartition);
                if (metadata == null) continue;
                FileTierPartitionStateSnapshotObject snapshotObject = metadata.snapshotObject();
                if (!this.minLastMaterializedEventTs.isPresent() || snapshotObject.snapshotTimestampMs() < this.minLastMaterializedEventTs.getAsLong()) {
                    this.minLastMaterializedEventTs = OptionalLong.of(snapshotObject.snapshotTimestampMs());
                }
                int tierTopicPartition = TIER_TOPIC_PARTITIONER.partitionId(topicIdPartition);
                long lastOffset = snapshotObject.lastMaterializedEventOffsetAndEpoch().offset();
                if (this.minLastMaterializedTierTopicPartitionOffsets.containsKey(tierTopicPartition) && lastOffset >= this.minLastMaterializedTierTopicPartitionOffsets.get(tierTopicPartition)) continue;
                this.minLastMaterializedTierTopicPartitionOffsets.put(tierTopicPartition, lastOffset);
            }
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("FtpsSnapshotsMetadata: { ").append("minLastMaterializedEventTs: ").append(this.minLastMaterializedEventTs).append(", \n {snapshotMap: [");
            for (TopicIdPartition topicIdPartition : this.snapshotMap.keySet()) {
                sb.append(topicIdPartition).append(": ").append(this.snapshotMap.get(topicIdPartition)).append(", ");
            }
            sb.append("]}, \n {minLastMaterializedTierTopicPartitionOffsets: [");
            for (Integer tierTopicPartition : this.minLastMaterializedTierTopicPartitionOffsets.keySet()) {
                sb.append(tierTopicPartition).append(": ").append(this.minLastMaterializedTierTopicPartitionOffsets.get(tierTopicPartition)).append(", ");
            }
            sb.append("]}}");
            return sb.toString();
        }
    }
}

