package kafka.restore.snapshot;

import io.confluent.kafka.storage.checksum.Algorithm;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import kafka.log.LocalLog;
import kafka.log.MergedLog;
import kafka.restore.RestoreConfig;
import kafka.restore.RestoreMetricsManager;
import kafka.restore.db.PartitionRestoreContext;
import kafka.tier.TopicIdPartition;
import kafka.tier.backupObjectLifecycle.RetryPolicy;
import kafka.tier.domain.AbstractTierMetadata;
import kafka.tier.state.FileTierPartitionState;
import kafka.tier.state.FileTierPartitionStateSnapshotObject;
import kafka.tier.state.OffsetAndEpoch;
import kafka.tier.state.TierPartitionStateCleanupConfig;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.objects.metadata.TierPartitionStateSnapshotMetadata;
import kafka.tier.store.objects.metadata.TierRecoveryUploadMetadata;
import kafka.tier.store.objects.metadata.TierTopicSnapshotMetadata;
import kafka.tier.topic.TierTopicPartitioner;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/restore/snapshot/PointInTimeTierPartitionStateBuilder.class */
public class PointInTimeTierPartitionStateBuilder {
    protected static final String TIER_TOPIC_NAME = "_confluent-tier-state";
    /** Empty key prefix; the same value the public constructor hands to {@link SnapshotObjectStoreUtils}. */
    protected static final String DEFAULT_KEY_PREFIX = "";

    // Local directories used by this builder; each one is created by the constructor.
    private final Path workingDirectory;
    private final Path ftpsSnapshotsDir;
    private final Path updatedFtpsStatesDir;
    private final Path ttpsSnapshotsDir;

    // Collaborators passed to every FileTierPartitionState this builder constructs.
    private final LogDirFailureChannel logDirFailureChannel;
    private final Time time;
    private final Scheduler scheduler;

    /** Retry count used by {@link #DEFAULT_RETRY_POLICY}. */
    private static final int MAX_RETRIES_OBJECT_STORE_CALLS = 10;

    // Both are assigned exactly once, in the constructor.
    private final SnapshotObjectStoreUtils snapshotUtils;
    private final RestoreMetricsManager metricsManager;

    private static final Logger log = LoggerFactory.getLogger(PointInTimeTierPartitionStateBuilder.class);
    protected static final String TIER_PARTITION_SNAPSHOT_PREFIX = TierObjectStore.DataTypePathPrefix.TIER_PARTITION_STATE_METADATA_SNAPSHOT.prefix() + TierRecoveryUploadMetadata.OBJECT_PATH_DELIMITER;
    /** File name of the tier state file for base offset 0. */
    protected static final String TIER_STATE_FILENAME = LocalLog.filenamePrefixFromOffset(0) + MergedLog.TierStateSuffix();
    /** {@link #TIER_STATE_FILENAME} with the ADLER checksum suffix appended. */
    protected static final String TIER_STATE_CHECKSUM_FILENAME = TIER_STATE_FILENAME + Algorithm.ADLER.suffix;
    private static final long RETRY_BACKOFF_MAX_MS = 30000;
    private static final long RETRY_BACKOFF_MIN_MS = 2000;
    // NOTE(review): arguments are passed as (retries, max backoff, min backoff); confirm this
    // matches RetryPolicy's parameter order.
    protected static final RetryPolicy DEFAULT_RETRY_POLICY = new RetryPolicy(MAX_RETRIES_OBJECT_STORE_CALLS, RETRY_BACKOFF_MAX_MS, RETRY_BACKOFF_MIN_MS);
    /** Maps a user TopicIdPartition to a tier topic partition id, sized from the restore Kafka config. */
    private static final TierTopicPartitioner TIER_TOPIC_PARTITIONER = new TierTopicPartitioner(RestoreConfig.kafkaConfig().confluentConfig().tierMetadataNumPartitions().shortValue());
    private static final long ONE_MONTH_IN_MS = Duration.ofDays(30).toMillis();

    /* loaded from: input_file:kafka/restore/snapshot/PointInTimeTierPartitionStateBuilder$FtpsSnapshotsMetadata.class */
    /**
     * Aggregated view over a set of located tier partition state (FTPS) snapshots.
     *
     * <p>Besides the per-partition snapshot metadata, it records the smallest snapshot timestamp
     * across all snapshots and, for every tier topic partition, the smallest last-materialized
     * event offset among the snapshots that map to that tier topic partition.
     */
    public static class FtpsSnapshotsMetadata {
        /** The snapshot metadata exactly as supplied to the constructor (not copied). */
        public final Map<TopicIdPartition, TierPartitionStateSnapshotMetadata> snapshotMap;
        /** Smallest {@code snapshotTimestampMs()} over all non-null snapshots; empty if there are none. */
        public OptionalLong minLastMaterializedEventTs;
        /** Tier topic partition id -> smallest last-materialized event offset seen for it. */
        public Map<Integer, Long> minLastMaterializedTierTopicPartitionOffsets = new HashMap<>();

        /**
         * Computes the minimum timestamp and per-tier-topic-partition minimum offsets.
         *
         * @param map snapshot metadata keyed by partition; entries with a {@code null} value are ignored
         * @throws NullPointerException if {@code map} is {@code null}
         */
        public FtpsSnapshotsMetadata(Map<TopicIdPartition, TierPartitionStateSnapshotMetadata> map) {
            this.snapshotMap = map;
            this.minLastMaterializedEventTs = OptionalLong.empty();
            for (Map.Entry<TopicIdPartition, TierPartitionStateSnapshotMetadata> entry : map.entrySet()) {
                TierPartitionStateSnapshotMetadata snapshotMetadata = entry.getValue();
                if (snapshotMetadata == null) {
                    continue;
                }
                FileTierPartitionStateSnapshotObject snapshotObject = snapshotMetadata.snapshotObject();

                long snapshotTs = snapshotObject.snapshotTimestampMs();
                if (!this.minLastMaterializedEventTs.isPresent() || snapshotTs < this.minLastMaterializedEventTs.getAsLong()) {
                    this.minLastMaterializedEventTs = OptionalLong.of(snapshotTs);
                }

                int tierTopicPartitionId = PointInTimeTierPartitionStateBuilder.TIER_TOPIC_PARTITIONER.partitionId(entry.getKey());
                long lastMaterializedOffset = snapshotObject.lastMaterializedEventOffsetAndEpoch().offset();
                // Keep the smallest offset per tier topic partition.
                this.minLastMaterializedTierTopicPartitionOffsets.merge(tierTopicPartitionId, lastMaterializedOffset, Long::min);
            }
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("FtpsSnapshotsMetadata: { ").append("minLastMaterializedEventTs: ").append(this.minLastMaterializedEventTs).append(", \n {snapshotMap: [");
            for (Map.Entry<TopicIdPartition, TierPartitionStateSnapshotMetadata> entry : this.snapshotMap.entrySet()) {
                sb.append(entry.getKey()).append(": ").append(entry.getValue()).append(", ");
            }
            sb.append("]}, \n {minLastMaterializedTierTopicPartitionOffsets: [");
            for (Map.Entry<Integer, Long> entry : this.minLastMaterializedTierTopicPartitionOffsets.entrySet()) {
                sb.append(entry.getKey()).append(": ").append(entry.getValue()).append(", ");
            }
            sb.append("]}}");
            return sb.toString();
        }
    }

    /**
     * Creates a builder that uses {@link #DEFAULT_RETRY_POLICY} and the empty default key prefix.
     *
     * @param tierObjectStore object store handed to the snapshot utilities
     * @param threadPoolExecutor executor handed to the snapshot utilities
     * @param restoreMetricsManager metrics sink used by this builder and the snapshot utilities
     */
    public PointInTimeTierPartitionStateBuilder(TierObjectStore tierObjectStore, ThreadPoolExecutor threadPoolExecutor, RestoreMetricsManager restoreMetricsManager) {
        this(tierObjectStore, DEFAULT_RETRY_POLICY, threadPoolExecutor, DEFAULT_KEY_PREFIX, restoreMetricsManager);
    }

    /**
     * Creates a builder rooted at {@code /mnt/restore-data/} and creates the working
     * sub-directories for FTPS snapshots, updated FTPS states and TTPS snapshots.
     *
     * <p>Directory creation is best-effort: a failure is logged as a warning and construction
     * continues.
     *
     * @param tierObjectStore object store handed to {@link SnapshotObjectStoreUtils}
     * @param retryPolicy retry policy handed to {@link SnapshotObjectStoreUtils}
     * @param threadPoolExecutor executor handed to {@link SnapshotObjectStoreUtils}
     * @param str forwarded to {@link SnapshotObjectStoreUtils}; presumably an object-store key
     *     prefix (the public constructor passes the empty string) - confirm against that class
     * @param restoreMetricsManager metrics sink used by this builder and the snapshot utilities
     */
    protected PointInTimeTierPartitionStateBuilder(TierObjectStore tierObjectStore, RetryPolicy retryPolicy, ThreadPoolExecutor threadPoolExecutor, String str, RestoreMetricsManager restoreMetricsManager) {
        this.workingDirectory = Paths.get("/mnt/restore-data/");
        this.ftpsSnapshotsDir = this.workingDirectory.resolve("ftps-snapshots");
        this.updatedFtpsStatesDir = this.workingDirectory.resolve("updated-ftps-states");
        this.ttpsSnapshotsDir = this.workingDirectory.resolve("ttps-snapshots");
        this.logDirFailureChannel = new LogDirFailureChannel(5);
        this.time = Time.SYSTEM;
        this.scheduler = new KafkaScheduler(1, false, "", false);
        ensureDirectoryExists(this.workingDirectory);
        ensureDirectoryExists(this.ftpsSnapshotsDir);
        ensureDirectoryExists(this.updatedFtpsStatesDir);
        ensureDirectoryExists(this.ttpsSnapshotsDir);
        this.snapshotUtils = new SnapshotObjectStoreUtils(tierObjectStore, threadPoolExecutor, retryPolicy, str, this.time, restoreMetricsManager);
        this.metricsManager = restoreMetricsManager;
    }

    /** Creates {@code dir} (and missing parents) if needed; logs a warning when it cannot be created. */
    private static void ensureDirectoryExists(Path dir) {
        File dirFile = dir.toFile();
        // mkdirs() also returns false when the directory already exists, so re-check before warning.
        if (!dirFile.mkdirs() && !dirFile.isDirectory()) {
            log.warn("Failed to create restore working directory {}", dir);
        }
    }

    /**
     * Exposes the snapshot utilities instance created by the constructor.
     *
     * @return the {@link SnapshotObjectStoreUtils} used by this builder
     */
    public SnapshotObjectStoreUtils getSnapshotUtils() {
        return snapshotUtils;
    }

    /**
     * Drains the iterator, applying each deserialized tier metadata event to the restore state of
     * the partition it belongs to. Events for partitions that are not keys of {@code map} are only
     * counted as skipped. A summary of both counters is logged at the end.
     *
     * @param tierRecordsIterator source of tier topic records
     * @param map restore states keyed by partition; each matching event is applied to its state
     */
    private void replayEvents(TierRecordsIterator tierRecordsIterator, Map<TopicIdPartition, FtpsStateForRestore> map) throws InterruptedException {
        long replayedCount = 0;
        long skippedCount = 0;
        while (tierRecordsIterator.hasNext()) {
            ConsumerRecord tierRecord = (ConsumerRecord) tierRecordsIterator.next();
            AbstractTierMetadata event = SnapshotObjectStoreUtils.deserializeRecord(tierRecord);
            OffsetAndEpoch eventPosition = new OffsetAndEpoch(tierRecord.offset(), tierRecord.leaderEpoch());
            TopicIdPartition eventPartition = event.topicIdPartition();
            if (!map.containsKey(eventPartition)) {
                // Not one of the partitions being restored.
                skippedCount++;
                continue;
            }
            map.get(eventPartition).applyEvent(event, eventPosition);
            replayedCount++;
        }
        String summary = String.format("ttps snapshot replayed event: %s, skipped events: %s", Long.valueOf(replayedCount), Long.valueOf(skippedCount));
        log.info(summary);
    }

    /**
     * Constructs a {@link FileTierPartitionState} in the directory that contains {@code file}.
     *
     * @param topicIdPartition partition whose topic-partition is passed to the state
     * @param file state file; only its parent directory is used here
     * @param z forwarded to the state constructor; the caller passes whether the snapshot path
     *     contains the ADLER checksum suffix
     * @return the newly constructed state
     */
    private FileTierPartitionState initTierPartitionStateFromSnapshot(TopicIdPartition topicIdPartition, File file, boolean z) throws IOException {
        File stateDir = file.getParentFile();
        // Both durations of the cleanup config are set to thirty days.
        TierPartitionStateCleanupConfig cleanupConfig = new TierPartitionStateCleanupConfig(true, ONE_MONTH_IN_MS, ONE_MONTH_IN_MS);
        return new FileTierPartitionState(
                stateDir,
                this.logDirFailureChannel,
                topicIdPartition.topicPartition(),
                true,
                this.scheduler,
                z,
                true,
                this.time,
                cleanupConfig,
                true,
                -1);
    }

    /**
     * Waits for each downloaded FTPS snapshot, copies it under {@code path} and wraps it in an
     * {@link FtpsStateForRestore}.
     *
     * <p>Failures are handled per partition: a failed download, a missing snapshot, a missing
     * restore context or any error while building the state is logged and that partition is left
     * out of the result. If the calling thread is interrupted while waiting for a download, the
     * interrupt flag is restored and the remaining partitions are not processed.
     *
     * @param map download futures keyed by partition; a future may yield {@code null} when no
     *     snapshot was found
     * @param path root directory under which one sub-directory per topic-partition is created
     * @param map2 restore contexts keyed by topic-partition
     * @return restore states for every partition that was initialised successfully
     */
    protected Map<TopicIdPartition, FtpsStateForRestore> initStateForRestoreMap(Map<TopicIdPartition, Future<Path>> map, Path path, Map<TopicPartition, PartitionRestoreContext> map2) throws IOException {
        Map<TopicIdPartition, FtpsStateForRestore> statesByPartition = new HashMap<>();
        for (Map.Entry<TopicIdPartition, Future<Path>> entry : map.entrySet()) {
            TopicIdPartition partition = entry.getKey();

            Path snapshotPath;
            try {
                snapshotPath = entry.getValue().get();
            } catch (InterruptedException e) {
                // Restore the flag so callers can observe the interruption, and stop waiting on
                // the remaining downloads.
                Thread.currentThread().interrupt();
                log.error("Failed to download Tier Partition Snapshot for TopicIdPartition " + partition, e);
                break;
            } catch (ExecutionException e) {
                log.error("Failed to download Tier Partition Snapshot for TopicIdPartition " + partition, e);
                continue;
            }
            if (snapshotPath == null) {
                log.info("Skipping TopicIdPartition " + partition + " for which Tier Partition Snapshot was not found");
                continue;
            }

            boolean hasChecksumSuffix = snapshotPath.toString().contains(Algorithm.ADLER.suffix);
            try {
                // Look the context up before touching the file system so that a partition without
                // one is skipped with a clear message instead of a NullPointerException later on.
                PartitionRestoreContext restoreContext = map2.get(partition.topicPartition());
                if (restoreContext == null) {
                    log.error(String.format("[%s]: no PartitionRestoreContext found, skipping partition", partition.topicPartition()));
                    continue;
                }

                String stateFileName = hasChecksumSuffix ? TIER_STATE_CHECKSUM_FILENAME : TIER_STATE_FILENAME;
                Path updatedStatePath = Paths.get(path.toString(), partition.topicPartition().toString(), stateFileName);
                updatedStatePath.toFile().getParentFile().mkdirs();
                log.debug(String.format("[%s]: the Updated Ftps State File path: %s", partition.topicPartition(), updatedStatePath));
                Files.copy(snapshotPath, updatedStatePath, StandardCopyOption.REPLACE_EXISTING);

                FileTierPartitionState updatedState = initTierPartitionStateFromSnapshot(partition, updatedStatePath.toFile(), hasChecksumSuffix);
                statesByPartition.put(partition, new FtpsStateForRestore(partition, snapshotPath, updatedState, restoreContext.timeStampRestoreFrom, restoreContext.revertCompactionSinceTimestamp, this.snapshotUtils, this.metricsManager));
            } catch (Exception e) {
                // Best-effort: one bad partition must not abort the others.
                log.error(String.format("[%s]: error when creating FtpsStateForRestore", partition.topicPartition()), e);
            }
        }
        return statesByPartition;
    }

    /**
     * Builds restore states for the requested partitions from snapshots in tier storage.
     *
     * <p>Steps: locate and download the FTPS snapshots, initialise a restore state per partition,
     * locate and download the tier topic (TTPS) snapshots starting from the smallest FTPS
     * snapshot timestamp, replay their events onto the restore states, and finally check that no
     * events are missing between the FTPS and TTPS snapshots.
     *
     * @param map restore contexts keyed by topic-partition; {@code null} or empty yields an empty result
     * @return restore states keyed by partition; empty when there is no input or no FTPS snapshot
     *     was found
     * @throws IllegalStateException if events are missing between the FTPS and TTPS snapshots
     */
    public Map<TopicIdPartition, FtpsStateForRestore> buildFtpsFromSnapshot(Map<TopicPartition, PartitionRestoreContext> map) throws IOException, InterruptedException {
        if (map == null || map.isEmpty()) {
            log.info("buildFtpsFromSnapshot called with empty input");
            return Collections.emptyMap();
        }

        FtpsSnapshotsMetadata ftpsSnapshots = this.snapshotUtils.locateFtpsSnapshotsByTimestamp(map);
        // Parameterized so the potentially large toString() is only built when debug is enabled.
        log.debug("found ftps snapshots: {}", ftpsSnapshots);
        if (!ftpsSnapshots.minLastMaterializedEventTs.isPresent()) {
            log.info("No tier partition snapshots found in tier storage");
            return Collections.emptyMap();
        }

        Map<TopicIdPartition, Future<Path>> ftpsDownloads = this.snapshotUtils.downloadFtpsSnapshotsInParallel(ftpsSnapshots.snapshotMap, this.ftpsSnapshotsDir);
        log.info("downloading ftps snapshots in parallel: {}", ftpsDownloads.size());
        Map<TopicIdPartition, FtpsStateForRestore> statesByPartition = initStateForRestoreMap(ftpsDownloads, this.updatedFtpsStatesDir, map);

        List<TierTopicSnapshotMetadata> ttpsSnapshots = this.snapshotUtils.locateTierTopicSnapshotsByTimestamp(ftpsSnapshots.minLastMaterializedEventTs.getAsLong(), OptionalLong.empty());
        log.debug("found ttps snapshots: {}", ttpsSnapshots.size());
        List<Future<Path>> ttpsDownloads = this.snapshotUtils.downloadTierTopicSnapshotsInParallel(ttpsSnapshots, this.ttpsSnapshotsDir);

        if (!ttpsDownloads.isEmpty()) {
            TierRecordsIterator tierRecordsIterator = new TierRecordsIterator(ttpsDownloads);
            log.info("start replaying ttps snapshots events.");
            long replayStartMs = this.time.hiResClockMs();
            replayEvents(tierRecordsIterator, statesByPartition);
            this.metricsManager.record(RestoreMetricsManager.RESTORE_TIME_TO_REPLAY_EVENTS_MS, this.time.hiResClockMs() - replayStartMs);
            log.info("replayed ttps snapshots events.");
            // The start offsets are read from the iterator only after it has been drained by the replay.
            if (!validateTierPartitionOffsets(ftpsSnapshots.minLastMaterializedTierTopicPartitionOffsets, tierRecordsIterator.tierPartitionStartOffsets())) {
                log.error("missing events between ftps and ttps snapshots");
                throw new IllegalStateException("missing events between ftps and ttps snapshots");
            }
        }

        log.info("built Ftps files for {} partitions", statesByPartition.size());
        return statesByPartition;
    }

    /**
     * Checks, per tier topic partition, that the offset in {@code map} is not more than one
     * behind the offset in {@code map2}. Every violation is logged; partitions that are absent
     * from {@code map2} are not checked.
     *
     * @param map tier topic partition id -> last materialized offset from the FTPS snapshots
     * @param map2 tier topic partition id -> first offset in the TTPS snapshots
     * @return {@code false} if either map is {@code null} or any violation was found, else {@code true}
     */
    private boolean validateTierPartitionOffsets(Map<Integer, Long> map, Map<Integer, Long> map2) {
        if (map == null || map2 == null) {
            return false;
        }
        boolean noGapFound = true;
        for (Map.Entry<Integer, Long> ftpsEntry : map.entrySet()) {
            Integer tierPartition = ftpsEntry.getKey();
            long lastMaterializedOffset = ftpsEntry.getValue().longValue();
            if (!map2.containsKey(tierPartition)) {
                continue;
            }
            long firstTtpsOffset = map2.get(tierPartition).longValue();
            if (lastMaterializedOffset < firstTtpsOffset - 1) {
                // Keep scanning so every affected partition is reported.
                log.error(String.format("error: missing events between ftps and ttps snapshots in tier partition %s, lastMaterializedOffsetInFtpsSnapshot: %s, firstOffsetInTtpsSnapshot: %s", tierPartition, Long.valueOf(lastMaterializedOffset), Long.valueOf(firstTtpsOffset)));
                noGapFound = false;
            }
        }
        return noGapFound;
    }

    /**
     * Computes, for every tier topic partition, the smallest last locally materialized source
     * offset among the restore states that map to it.
     *
     * @param map restore states keyed by partition
     * @return tier topic partition id -> minimum last materialized offset; empty if {@code map} is empty
     */
    public static Map<Integer, Long> calculateTierTopicLastMaterializedOffsets(Map<TopicIdPartition, FtpsStateForRestore> map) {
        Map<Integer, Long> minOffsetByTierPartition = new HashMap<>();
        for (Map.Entry<TopicIdPartition, FtpsStateForRestore> entry : map.entrySet()) {
            int tierTopicPartitionId = TIER_TOPIC_PARTITIONER.partitionId(entry.getKey());
            long lastMaterializedOffset = entry.getValue().updatedFtpsState.lastLocalMaterializedSrcOffsetAndEpoch().offset();
            // Keep the smallest offset per tier topic partition.
            minOffsetByTierPartition.merge(tierTopicPartitionId, lastMaterializedOffset, Long::min);
        }
        return minOffsetByTierPartition;
    }
}
