/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.tools;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.stream.Collectors;
import kafka.cluster.Partition;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.log.MergedLog;
import kafka.server.KafkaConfig;
import kafka.server.ReplicaManager;
import kafka.tier.TopicIdPartition;
import kafka.tier.state.FileTierPartitionState;
import kafka.tier.state.OffsetAndEpoch;
import kafka.tier.state.TierPartitionState;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreResponse;
import kafka.tier.store.objects.FragmentLocation;
import kafka.tier.store.objects.FragmentType;
import kafka.tier.store.objects.ObjectType;
import kafka.tier.store.objects.metadata.FileTierPartitionStateRecoveryUploadMetadata;
import kafka.tier.store.objects.metadata.ObjectStoreMetadata;
import kafka.tier.tools.TierRecoveryDataUploadCoordinator;
import kafka.tier.tools.TierRecoveryDataUploadJobStatus;
import kafka.tier.tools.TierRecoveryDataUploadResult;
import kafka.tier.tools.TierRecoveryUploadMetadataJson;
import kafka.utils.TestUtils;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import scala.Option;
import scala.collection.JavaConverters;

public class TierRecoveryDataUploadCoordinatorTest {
    private final int numThreads = 3;
    private final int broker = 5;
    private final String identifier = "rcca-1234";
    private final Properties props = TestUtils.createBrokerConfig(5, TestUtils.MockZkConnect(), true, true, TestUtils.MockZkPort(), (Option<SecurityProtocol>)Option.empty(), (Option<File>)Option.empty(), (Option<Properties>)Option.empty(), true, false, TestUtils.RandomPort(), false, TestUtils.RandomPort(), false, TestUtils.RandomPort(), (Option<String>)Option.empty(), 1, false, 1, (short)1, false);
    private KafkaConfig config;
    private final ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
    private final LogManager logManager = (LogManager)Mockito.mock(LogManager.class);
    private final TierObjectStore objectStore = (TierObjectStore)Mockito.mock(TierObjectStore.class);
    private final List<TopicIdPartition> topicIdPartitions = new ArrayList<TopicIdPartition>(Arrays.asList(new TopicIdPartition("topic0", UUID.randomUUID(), 0), new TopicIdPartition("topic0", UUID.randomUUID(), 1), new TopicIdPartition("topic0", UUID.randomUUID(), 2), new TopicIdPartition("topic1", UUID.randomUUID(), 0), new TopicIdPartition("topic1", UUID.randomUUID(), 1), new TopicIdPartition("topic1", UUID.randomUUID(), 2), new TopicIdPartition("topic2", UUID.randomUUID(), 0), new TopicIdPartition("topic2", UUID.randomUUID(), 1), new TopicIdPartition("topic2", UUID.randomUUID(), 2)));
    private final List<TopicIdPartition> leaderTopicIdPartitions = Arrays.asList(this.topicIdPartitions.get(0), this.topicIdPartitions.get(4), this.topicIdPartitions.get(8));
    private final Map<String, ByteBuffer> tierOffsets = new HashMap<String, ByteBuffer>();
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final List<String> uploadJobsMetadata = new ArrayList<String>();
    private final ConcurrentLinkedDeque<Map<Integer, OffsetAndEpoch>> uploadJobsTierOffsets = new ConcurrentLinkedDeque();
    private final Map<TopicIdPartition, String> topicIdPartitionToUploadPath = new ConcurrentHashMap<TopicIdPartition, String>();
    private TierObjectStoreResponse storedMetadata = null;

    @BeforeEach
    public void setup() throws IOException {
        this.setupLeaderPartitions();
        Mockito.when((Object)this.replicaManager.logManager()).thenReturn((Object)this.logManager);
        this.setupLogManager();
        this.setupTierOffsets();
        ((TierObjectStore)Mockito.doAnswer(invocation -> {
            ByteBuffer buffer = (ByteBuffer)invocation.getArgument(1);
            ObjectType objectType = (ObjectType)invocation.getArgument(2);
            if (ObjectType.TIER_RECOVERY_METADATA_UPLOAD.equals((Object)objectType)) {
                this.uploadJobsMetadata.add(new String(buffer.array()));
            } else if (ObjectType.TIER_OFFSETS_UPLOAD.equals((Object)objectType)) {
                this.uploadJobsTierOffsets.add(this.byteBufToTierOffsets(buffer));
            } else {
                Assertions.fail((String)("Received unexpected object type: " + objectType));
            }
            return null;
        }).when((Object)this.objectStore)).putBuffer((ObjectStoreMetadata)ArgumentMatchers.any(), (ByteBuffer)ArgumentMatchers.any(), (ObjectType)ArgumentMatchers.any());
        ((TierObjectStore)Mockito.doAnswer(invocation -> {
            ObjectStoreMetadata metadata = (ObjectStoreMetadata)invocation.getArgument(0);
            ObjectType objectType = (ObjectType)invocation.getArgument(2);
            Assertions.assertEquals((Object)ObjectType.FILE_TIER_PARTITION_STATE_UPLOAD, (Object)objectType);
            String uploadPath = ((FragmentLocation)metadata.toFragmentLocation("", FragmentType.FILE_TIER_PARTITION_STATE_UPLOAD).get()).objectPath();
            this.topicIdPartitionToUploadPath.put(((FileTierPartitionStateRecoveryUploadMetadata)metadata).topicIdPartition(), uploadPath);
            return uploadPath;
        }).when((Object)this.objectStore)).putObject((ObjectStoreMetadata)ArgumentMatchers.any(), (File)ArgumentMatchers.any(), (ObjectType)ArgumentMatchers.any());
        ((TierObjectStore)Mockito.doAnswer(invocation -> {
            HashMap res = new HashMap();
            if (this.uploadJobsMetadata.isEmpty()) {
                return res;
            }
            res.put("", new ArrayList());
            return res;
        }).when((Object)this.objectStore)).listObject(ArgumentMatchers.anyString(), ArgumentMatchers.anyBoolean());
        ((TierObjectStore)Mockito.doAnswer(invocation -> {
            FragmentType objectType = (FragmentType)invocation.getArgument(1);
            Assertions.assertEquals((Object)FragmentType.TIER_RECOVERY_METADATA_UPLOAD, (Object)objectType);
            this.storedMetadata = new TierObjectStoreResponse(){

                public InputStream getInputStream() {
                    if (TierRecoveryDataUploadCoordinatorTest.this.uploadJobsMetadata.isEmpty()) {
                        return new ByteArrayInputStream("".getBytes());
                    }
                    return new ByteArrayInputStream(((String)TierRecoveryDataUploadCoordinatorTest.this.uploadJobsMetadata.get(TierRecoveryDataUploadCoordinatorTest.this.uploadJobsMetadata.size() - 1)).getBytes());
                }

                public void close() throws IOException {
                }
            };
            return this.storedMetadata;
        }).when((Object)this.objectStore)).getObjectStoreFragment((ObjectStoreMetadata)ArgumentMatchers.any(), (FragmentType)ArgumentMatchers.any());
    }

    private void setupLeaderPartitions() {
        List<Partition> leaderPartitions = Arrays.asList((Partition)Mockito.mock(Partition.class), (Partition)Mockito.mock(Partition.class), (Partition)Mockito.mock(Partition.class));
        for (int i = 0; i < leaderPartitions.size(); ++i) {
            Partition partition = leaderPartitions.get(i);
            TopicIdPartition topicIdPartition = this.leaderTopicIdPartitions.get(i);
            Mockito.when((Object)partition.topicPartition()).thenReturn((Object)topicIdPartition.topicPartition());
            Mockito.when((Object)partition.topicId()).thenReturn((Object)Option.apply((Object)topicIdPartition.kafkaTopicId()));
        }
        Mockito.when((Object)this.replicaManager.leaderPartitionsIterator()).thenReturn((Object)JavaConverters.asScalaIterator(leaderPartitions.iterator()));
    }

    private void setupLogManager() throws IOException {
        this.props.put("confluent.checksum.enabled.files", "all");
        this.config = KafkaConfig.fromProps((Properties)this.props);
        ArrayList logs = new ArrayList();
        this.topicIdPartitions.forEach(tp -> logs.add(Mockito.mock(MergedLog.class)));
        ArrayList tierPartitionStates = new ArrayList();
        this.topicIdPartitions.forEach(tp -> tierPartitionStates.add(Mockito.mock(FileTierPartitionState.class)));
        Mockito.when((Object)this.logManager.allLogs()).thenReturn((Object)JavaConverters.collectionAsScalaIterable(logs));
        for (int i = 0; i < logs.size(); ++i) {
            AbstractLog log = (AbstractLog)logs.get(i);
            TierPartitionState tierPartitionState = (TierPartitionState)tierPartitionStates.get(i);
            TopicIdPartition topicIdPartition = this.topicIdPartitions.get(i);
            String copyPathStr = String.format("recovery-uploads/%s_%d_00000000000000000000.tierstate.recoveryupload.adler", topicIdPartition.topic(), topicIdPartition.partition());
            Path copyPath = Paths.get(copyPathStr, new String[0]).toAbsolutePath();
            Mockito.when((Object)log.topicIdPartition()).thenReturn((Object)Option.apply((Object)topicIdPartition));
            Mockito.when((Object)log.tierPartitionState()).thenReturn((Object)tierPartitionState);
            Mockito.when((Object)tierPartitionState.backupStateForRecovery()).thenReturn((Object)copyPath);
        }
        Mockito.when((Object)this.logManager.readTierOffsets()).thenReturn(this.tierOffsets);
    }

    private void setupTierOffsets() {
        Random random = new Random();
        HashMap<Integer, OffsetAndEpoch> logDirTierOffsets = new HashMap<Integer, OffsetAndEpoch>();
        for (int i = 0; i < 50; ++i) {
            int offset = random.nextInt();
            int epoch = random.nextInt();
            logDirTierOffsets.put(i, new OffsetAndEpoch((long)offset, Optional.of(epoch)));
        }
        this.tierOffsets.put("logDir", this.tierOffsetsToByteBuf(logDirTierOffsets));
    }

    private ByteBuffer tierOffsetsToByteBuf(Map<Integer, OffsetAndEpoch> offsets) {
        StringBuilder tierOffsetsFile = new StringBuilder();
        tierOffsetsFile.append("1");
        offsets.forEach((partition, offsetAndEpoch) -> tierOffsetsFile.append(String.format("%d %d %d\n", partition, offsetAndEpoch.offset(), offsetAndEpoch.epoch().orElse(-1))));
        return ByteBuffer.wrap(tierOffsetsFile.toString().getBytes());
    }

    private Map<Integer, OffsetAndEpoch> byteBufToTierOffsets(ByteBuffer buf) {
        HashMap<Integer, OffsetAndEpoch> logDirTierOffsets = new HashMap<Integer, OffsetAndEpoch>();
        BufferedReader reader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(buf.array())));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] splitLine = line.split(" ");
                if (splitLine.length == 1) continue;
                Assertions.assertEquals((int)3, (int)splitLine.length);
                logDirTierOffsets.put(Integer.parseInt(splitLine[0]), new OffsetAndEpoch((long)Integer.parseInt(splitLine[1]), Optional.of(Integer.parseInt(splitLine[2]))));
            }
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        return logDirTierOffsets;
    }

    @AfterEach
    public void cleanup() {
        this.uploadJobsMetadata.clear();
        this.topicIdPartitionToUploadPath.clear();
        this.storedMetadata = null;
    }

    @Test
    public void testUploadInitiate() throws InterruptedException, IOException {
        TierRecoveryDataUploadCoordinator coordinator = new TierRecoveryDataUploadCoordinator(this.config, this.replicaManager, this.objectStore);
        UUID jobId = this.maybeInitiateAndCompleteUpload(coordinator);
        this.verifyJobResult(coordinator, jobId, this.uploadJobsMetadata.get(0));
    }

    @Test
    public void testGetJobStatus() throws InterruptedException, IOException {
        TierRecoveryDataUploadCoordinator coordinator = new TierRecoveryDataUploadCoordinator(this.config, this.replicaManager, this.objectStore);
        UUID firstJobId = this.maybeInitiateAndCompleteUpload(coordinator);
        UUID randomJobId = UUID.randomUUID();
        while (randomJobId == firstJobId) {
            randomJobId = UUID.randomUUID();
        }
        Assertions.assertEquals((Object)TierRecoveryDataUploadJobStatus.UNKNOWN, (Object)coordinator.getJobResult(randomJobId).status());
        this.verifyJobResult(coordinator, firstJobId, this.uploadJobsMetadata.get(0));
        UUID secondJobId = this.maybeInitiateAndCompleteUpload(coordinator);
        Assertions.assertEquals((Object)TierRecoveryDataUploadJobStatus.COMPLETED, (Object)coordinator.getJobResult(firstJobId).status());
        Assertions.assertEquals((Object)TierRecoveryDataUploadJobStatus.COMPLETED, (Object)coordinator.getJobResult(secondJobId).status());
    }

    @Test
    public void testGetJobResults() throws InterruptedException, IOException {
        TierRecoveryDataUploadCoordinator coordinator = new TierRecoveryDataUploadCoordinator(this.config, this.replicaManager, this.objectStore);
        ArrayList<UUID> jobIds = new ArrayList<UUID>();
        for (int i = 0; i < 10; ++i) {
            this.setupTierOffsets();
            UUID jobId = this.maybeInitiateAndCompleteUpload(coordinator);
            jobIds.add(jobId);
            this.verifyJobResult(coordinator, jobId, this.uploadJobsMetadata.get(i));
        }
        UUID finalJobId = this.maybeInitiateAndCompleteUpload(coordinator);
        this.verifyJobResult(coordinator, finalJobId, this.uploadJobsMetadata.get(10));
        Assertions.assertEquals((Object)TierRecoveryDataUploadResult.makeDummyJobResult(), (Object)coordinator.getJobResult((UUID)jobIds.get(0)));
    }

    @Test
    public void testRerunJobWithAdditionalPartitions() throws IOException, InterruptedException {
        TierRecoveryDataUploadCoordinator coordinator = new TierRecoveryDataUploadCoordinator(this.config, this.replicaManager, this.objectStore);
        UUID jobId = this.maybeInitiateAndCompleteUpload(coordinator);
        this.verifyJobResult(coordinator, jobId, this.uploadJobsMetadata.get(0));
        this.topicIdPartitions.add(new TopicIdPartition("topic3", UUID.randomUUID(), 0));
        this.setupLogManager();
        UUID newJobId = this.maybeInitiateAndCompleteUpload(coordinator);
        this.verifyJobResult(coordinator, newJobId, this.uploadJobsMetadata.get(1));
    }

    private UUID maybeInitiateAndCompleteUpload(TierRecoveryDataUploadCoordinator coordinator) throws InterruptedException, IOException {
        this.setupLeaderPartitions();
        UUID jobId = coordinator.initiateTierRecoveryDataUpload(new HashSet<TopicIdPartition>(this.topicIdPartitions), "rcca-1234", 3);
        while (coordinator.getJobResult(jobId).status() != TierRecoveryDataUploadJobStatus.COMPLETED) {
            Assertions.assertThrows(IllegalStateException.class, () -> coordinator.initiateTierRecoveryDataUpload(new HashSet<TopicIdPartition>(this.topicIdPartitions), "rcca-1234", 3));
            Thread.sleep(100L);
        }
        Assertions.assertEquals((Object)TierRecoveryDataUploadJobStatus.COMPLETED, (Object)coordinator.getJobResult(jobId).status());
        return jobId;
    }

    private void verifyJobResult(TierRecoveryDataUploadCoordinator coordinator, UUID jobId, String uploadMetadata) throws IOException, InterruptedException {
        TierRecoveryDataUploadResult result = coordinator.getJobResult(jobId);
        Assertions.assertEquals(new HashMap(), (Object)result.failedPartitions());
        Assertions.assertTrue((boolean)result.metadataUploadCompleted());
        Assertions.assertNull((Object)result.metadataUploadFailedExceptionMessage());
        TierRecoveryUploadMetadataJson metadata = (TierRecoveryUploadMetadataJson)this.objectMapper.readValue(uploadMetadata, TierRecoveryUploadMetadataJson.class);
        Map partitionUploadInfo = metadata.partitions;
        partitionUploadInfo.forEach((topicIdPartition, uploadInfo) -> {
            Assertions.assertEquals((int)this.topicIdPartitions.size(), (int)this.topicIdPartitionToUploadPath.size());
            Assertions.assertEquals((Object)this.leaderTopicIdPartitions.contains(TopicIdPartition.fromString((String)topicIdPartition)), (Object)uploadInfo.isLeader);
            Assertions.assertEquals((Object)this.topicIdPartitionToUploadPath.getOrDefault(TopicIdPartition.fromString((String)topicIdPartition), "null"), (Object)uploadInfo.objectStorePath);
        });
        Assertions.assertEquals(new HashSet<TopicIdPartition>(this.topicIdPartitions), partitionUploadInfo.keySet().stream().map(TopicIdPartition::fromString).collect(Collectors.toSet()));
        Assertions.assertEquals((Integer)TierRecoveryDataUploadCoordinator.CURRENT_METADATA_VERSION, (Integer)metadata.version);
        Assertions.assertTrue((boolean)result.tierOffsetsUploadCompleted());
        Assertions.assertNull((Object)result.tierOffsetsUploadFailedExceptionMessage());
        Assertions.assertEquals(this.byteBufToTierOffsets(this.tierOffsets.get("logDir")), this.uploadJobsTierOffsets.getLast());
    }
}

