package kafka.tier.tools;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import kafka.tier.TopicIdPartition;
import kafka.tier.state.ChecksumUtils;
import kafka.tier.state.Header;
import kafka.tier.state.PathAndHeader;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreFunctionUtils;
import kafka.tier.store.TierObjectStoreUtils;
import kafka.tier.store.objects.FragmentType;
import kafka.tier.store.objects.metadata.FileTierPartitionStateRecoveryUploadMetadata;
import kafka.tier.topic.recovery.SelectRemoteFileTierPartitionStateOutput;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/tier/tools/SelectRemoteFurthestFileTierPartitionStatePolicy.class */
/**
 * Selection policy that, for every requested topic partition, reads the header of each
 * uploaded FileTierPartitionState (FTPS) listed in the recovery metadata, picks the FTPS
 * that is "furthest" ahead, and writes the chosen object store paths to the output
 * directory.
 *
 * <p>"Furthest" is decided by {@link #tierStateHeaderIsFurther}: the header with the higher
 * {@code endOffset} wins, and ties are delegated to
 * {@code ChecksumUtils.compareOffsetAndEpoch} on the local materialized offset and epoch.
 */
public class SelectRemoteFurthestFileTierPartitionStatePolicy implements SelectFileTierPartitionStatePolicy {
    private static final Logger log = LoggerFactory.getLogger(SelectRemoteFurthestFileTierPartitionStatePolicy.class);
    private final String remoteRecoveryDir;
    private final String tierConfigFile;
    private final String adminConfigFile;
    private final String bootstrapServers;
    private final boolean skipReplicationFactorCheck;
    private final int replicationFactorOverride;
    private final String outputDir;

    /**
     * Creates the policy.
     *
     * @param remoteRecoveryDir          recovery directory from which the recovery identifier is derived
     * @param tierConfigFile             configuration file used to build the {@link TierObjectStore}
     * @param adminConfigFile            admin configuration file forwarded to the recovery metadata lookup
     * @param bootstrapServers           bootstrap servers forwarded to the recovery metadata lookup
     * @param skipReplicationFactorCheck forwarded to the FTPS object count validation
     * @param replicationFactorOverride  forwarded to the FTPS object count validation
     * @param outputDir                  directory that receives the selection output; created if missing
     */
    public SelectRemoteFurthestFileTierPartitionStatePolicy(String remoteRecoveryDir, String tierConfigFile, String adminConfigFile, String bootstrapServers, boolean skipReplicationFactorCheck, int replicationFactorOverride, String outputDir) {
        this.remoteRecoveryDir = remoteRecoveryDir;
        this.tierConfigFile = tierConfigFile;
        this.adminConfigFile = adminConfigFile;
        this.bootstrapServers = bootstrapServers;
        this.skipReplicationFactorCheck = skipReplicationFactorCheck;
        this.replicationFactorOverride = replicationFactorOverride;
        this.outputDir = outputDir;
    }

    /**
     * Reads the remote header of every uploaded FTPS of one topic partition and returns the
     * furthest one.
     *
     * <p>A warning is logged when more than one upload is flagged as leader; selection still
     * proceeds over all uploads.
     *
     * @param tierObjectStore  object store the FTPS uploads are fetched from
     * @param list             upload descriptors for the partition
     * @param topicIdPartition partition being processed (used for comparison and log context)
     * @return the furthest FTPS path and header, or {@code null} when {@code list} is empty
     * @throws KafkaStorageException if an uploaded FTPS has no readable header
     * @throws IOException           if reading or closing an object store stream fails
     * @throws InterruptedException  declared for the object store fetch
     */
    public static PathAndHeader getFurthestFTPSForTopicPartition(TierObjectStore tierObjectStore, List<PartitionUploadInfo> list, TopicIdPartition topicIdPartition) throws IOException, InterruptedException {
        List<PathAndHeader> candidates = new ArrayList<>();
        List<String> leaderPaths = new ArrayList<>();
        for (PartitionUploadInfo uploadInfo : list) {
            if (uploadInfo.isLeader) {
                leaderPaths.add(uploadInfo.objectStorePath);
            }
            FileTierPartitionStateRecoveryUploadMetadata uploadMetadata = FileTierPartitionStateRecoveryUploadMetadata.fromPath(uploadInfo.objectStorePath);
            // try-with-resources closes the stream on every path and, unlike a manual close in a
            // catch block, keeps the primary exception (a close failure is added as suppressed).
            try (InputStream inputStream = TierObjectStoreFunctionUtils.getObjectStoreFragment(() -> false, tierObjectStore, uploadMetadata, FragmentType.FILE_TIER_PARTITION_STATE_UPLOAD).getInputStream()) {
                Optional<Header> remoteHeader = ChecksumUtils.readRemoteHeader(uploadMetadata.uploadObject().checksumAlgorithm(), inputStream);
                if (!remoteHeader.isPresent()) {
                    throw new KafkaStorageException("Unexpectedly encountered FTPS with no header. Please validate the uploaded FTPSs for topicIdPartition=" + topicIdPartition);
                }
                candidates.add(new PathAndHeader(uploadInfo.objectStorePath, remoteHeader));
            }
        }
        if (leaderPaths.size() > 1) {
            log.warn("Found more than 1 leader for topicPartition {}. List of leader FTPSs object store paths {}", topicIdPartition, String.join(", ", leaderPaths));
        }
        return furthestTierStateHeader(candidates, topicIdPartition);
    }

    /**
     * Returns the furthest entry of {@code list} according to {@link #tierStateHeaderIsFurther}.
     *
     * <p>The first entry is the initial choice; a later entry replaces it only when it is
     * reported as further, so on ties the earlier entry is kept unless the tie-break says
     * otherwise.
     *
     * @param list             candidates to compare; every entry must carry a header
     * @param topicIdPartition partition the candidates belong to
     * @return the furthest candidate, or {@code null} when {@code list} is empty
     */
    public static PathAndHeader furthestTierStateHeader(List<PathAndHeader> list, TopicIdPartition topicIdPartition) {
        PathAndHeader furthest = null;
        for (PathAndHeader candidate : list) {
            if (furthest == null || tierStateHeaderIsFurther(furthest, candidate, topicIdPartition)) {
                furthest = candidate;
            }
        }
        return furthest;
    }

    /**
     * Decides whether {@code pathAndHeader2} is further ahead than {@code pathAndHeader}.
     *
     * <p>Before comparing, both orderings are checked for the suspicious combination of a
     * lower-or-equal local materialized offset with a higher end offset, which is only logged.
     *
     * @param pathAndHeader    current best candidate; its header must be present
     * @param pathAndHeader2   challenger; its header must be present
     * @param topicIdPartition partition both candidates belong to
     * @return {@code true} when the challenger has the higher {@code endOffset}; when the end
     *         offsets are equal, the result of {@code ChecksumUtils.compareOffsetAndEpoch}
     *         applied to (current, challenger) local materialized offset-and-epoch
     * @throws java.util.NoSuchElementException if either header is absent
     */
    public static boolean tierStateHeaderIsFurther(PathAndHeader pathAndHeader, PathAndHeader pathAndHeader2, TopicIdPartition topicIdPartition) {
        Header currentHeader = pathAndHeader.header().get();
        Header challengerHeader = pathAndHeader2.header().get();
        String currentPath = pathAndHeader.path();
        String challengerPath = pathAndHeader2.path();
        warnWhenFtpsHasLowerLocalMaterializedOffsetAndHigherEndOffset(currentHeader, challengerHeader, currentPath, challengerPath, topicIdPartition);
        warnWhenFtpsHasLowerLocalMaterializedOffsetAndHigherEndOffset(challengerHeader, currentHeader, challengerPath, currentPath, topicIdPartition);
        log.debug("Comparing FTPSs for topicPartition={}... \nFTPS 1: (header={}, path={}). \nFTPS 2: (header={}, path={}).", topicIdPartition, currentHeader, currentPath, challengerHeader, challengerPath);
        if (currentHeader.endOffset() == challengerHeader.endOffset()) {
            // Tie on endOffset: defer to the shared offset/epoch comparison.
            // NOTE(review): presumably true when the challenger's offset/epoch is ahead - confirm in ChecksumUtils.
            return ChecksumUtils.compareOffsetAndEpoch(currentHeader.localMaterializedOffsetAndEpoch(), challengerHeader.localMaterializedOffsetAndEpoch(), topicIdPartition.topicId());
        }
        return challengerHeader.endOffset() > currentHeader.endOffset();
    }

    /**
     * Logs a warning when {@code header} has a local materialized offset lower than or equal to
     * that of {@code header2} while having a strictly higher end offset. Nothing is thrown.
     *
     * @param header           header checked for the suspicious combination (logged as "FTPS 1")
     * @param header2          header it is compared against (logged as "FTPS 2")
     * @param str              object store path of {@code header}
     * @param str2             object store path of {@code header2}
     * @param topicIdPartition partition both headers belong to
     */
    private static void warnWhenFtpsHasLowerLocalMaterializedOffsetAndHigherEndOffset(Header header, Header header2, String str, String str2, TopicIdPartition topicIdPartition) {
        if (header.localMaterializedOffsetAndEpoch().offset() <= header2.localMaterializedOffsetAndEpoch().offset() && header.endOffset() > header2.endOffset()) {
            // The partition is passed as a placeholder argument rather than concatenated into the
            // format string, so a "{}" in its text cannot shift the remaining arguments.
            log.warn("Found unexpected FTPS with a lower or equal localMaterializedOffsetAndEpoch and higher endOffset for topicIdPartition= {}.\n\tFTPS 1: (topicId={}, path={}, endOffset={}, localMaterializedOffsetAndEpoch={}, header={}). FTPS 2: (topicId={}, path={}, endOffset={}, localMaterializedOffsetAndEpoch={}, header={}).",
                    topicIdPartition,
                    header.topicId(), str, header.endOffset(), header.localMaterializedOffsetAndEpoch(), header,
                    header2.topicId(), str2, header2.endOffset(), header2.localMaterializedOffsetAndEpoch(), header2);
        }
    }

    /**
     * Selects the furthest FTPS for each given partition and writes the result to the output
     * directory.
     *
     * <p>A failure for a single partition is logged and the partition is recorded as failed;
     * processing continues with the remaining partitions.
     *
     * @param set partitions to process; must not be empty
     * @throws IOException              if the output directory does not exist after the creation attempt
     * @throws IllegalArgumentException if {@code set} is empty
     * @throws UncheckedIOException     if an {@link IOException} occurs while creating the object
     *                                  store, loading recovery metadata or writing the output
     */
    @Override // kafka.tier.tools.SelectFileTierPartitionStatePolicy
    public void run(Set<TopicIdPartition> set) throws Exception {
        File outputDirectory = new File(this.outputDir);
        if (outputDirectory.mkdirs()) {
            log.info("Created output directory: {}", this.outputDir);
        }
        // mkdirs() also returns false when the directory already exists, so check existence separately.
        if (!outputDirectory.exists()) {
            throw new IOException("Failed to create output directory: " + this.outputDir);
        }
        if (set.isEmpty()) {
            throw new IllegalArgumentException("No affected topic partitions found. Exiting... Please check the input and try again");
        }
        log.info("Running SelectRemoteFurthestFileTierPartitionStatePolicy tool on the topicPartitions: {}", set);
        try {
            TierObjectStore objectStore = TierObjectStoreUtils.objectStore(this.tierConfigFile);
            Map<TopicIdPartition, String> selectedPaths = new HashMap<>();
            Map<TopicIdPartition, List<PartitionUploadInfo>> recoveryMetadata = SelectFileTierPartitionStatePolicyUtils.getRecoveryMetadata(objectStore, TierMetadataRecoveryUtils.getIdentifierFromRecoveryDir(this.remoteRecoveryDir), this.adminConfigFile, this.bootstrapServers);
            int totalPartitions = set.size();
            List<TopicIdPartition> failedPartitions = new ArrayList<>();
            for (TopicIdPartition topicIdPartition : set) {
                try {
                    List<PartitionUploadInfo> uploads = recoveryMetadata.getOrDefault(topicIdPartition, Collections.emptyList());
                    SelectFileTierPartitionStatePolicyUtils.validateNumOfFTPSObjects(this.skipReplicationFactorCheck, this.replicationFactorOverride, uploads);
                    selectedPaths.put(topicIdPartition, getFurthestFTPSForTopicPartition(objectStore, uploads, topicIdPartition).path());
                } catch (Exception e) {
                    // Best effort per partition: record the failure and keep going.
                    // NOTE(review): this also catches InterruptedException without restoring the
                    // interrupt flag - confirm whether the tool should stop when interrupted.
                    failedPartitions.add(topicIdPartition);
                    log.error("Failed to get the furthest FTPS for topicPartition {}", topicIdPartition, e);
                }
            }
            SelectFileTierPartitionStatePolicyUtils.writeSelectRemoteFTPSOutputToFile(new SelectRemoteFileTierPartitionStateOutput((short) 1, selectedPaths), totalPartitions, failedPartitions, this.outputDir);
        } catch (IOException e) {
            // Pass the exception as the trailing argument so the stack trace is logged.
            log.error("Failed to create object store instance with config: {}", this.tierConfigFile, e);
            throw new UncheckedIOException(e);
        }
    }
}
