/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.tools;

import com.fasterxml.jackson.databind.JsonNode;
import io.confluent.rest.GetTierRecoveryDataUploadJobResultResponse;
import io.confluent.rest.InitiateTierRecoveryDataUploadResponse;
import io.confluent.rest.ResponseContainer;
import io.confluent.rest.RewindTierTopicConsumerResponse;
import io.confluent.rest.TierRecoveryDataUploadResult;
import io.confluent.rest.TierTopicHeadDataLossDetectionResponse;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import kafka.tier.tools.TierMetadataRecoveryOrchestrator;
import kafka.tier.topic.TierTopicConsumerRewindPolicy;
import org.apache.http.HttpEntity;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

/**
 * Tests for {@link TierMetadataRecoveryOrchestrator}.
 *
 * <p>The orchestrator is built on a {@link MockAdminClient} with two brokers and a mocked
 * {@link CloseableHttpClient} whose every request returns the same mocked response (HTTP 200 by
 * default). Each test swaps in the response body it needs, runs one orchestrator operation, and
 * then checks two things: how many HTTP requests were issued, and the JSON that the orchestrator
 * printed to {@code System.out}.
 *
 * <p>{@code System.out} is redirected for the duration of each test and restored afterwards.
 */
public class TierMetadataRecoveryOrchestratorTest {
    /** Number of brokers in the mock cluster; cluster-scoped tests expect this many requests. */
    private static final int CLUSTER_BROKER_COUNT = 2;
    /** Broker-scoped tests expect exactly one request. */
    private static final int SINGLE_BROKER_COUNT = 1;

    private CloseableHttpClient httpClient;
    private MockAdminClient mockAdminClient;
    private TierMetadataRecoveryOrchestrator tierMetadataRecoveryOrchestrator;
    private CloseableHttpResponse closeableHttpResponse;
    private HttpEntity entity;
    private StatusLine statusLine;
    private ByteArrayOutputStream outContent;
    /** The {@code System.out} in effect before {@link #setup()} redirected it. */
    private PrintStream originalOut;

    /**
     * Builds the mocks and the orchestrator under test, then redirects {@code System.out} into
     * {@link #outContent} so tests can inspect what the orchestrator prints.
     */
    @BeforeEach
    public void setup() throws Exception {
        initializeMockAdminClient();
        httpClient = Mockito.mock(CloseableHttpClient.class);
        closeableHttpResponse = Mockito.mock(CloseableHttpResponse.class);
        statusLine = Mockito.mock(StatusLine.class);
        entity = Mockito.mock(HttpEntity.class);
        tierMetadataRecoveryOrchestrator =
                new TierMetadataRecoveryOrchestrator((AdminClient) mockAdminClient, httpClient, Integer.valueOf(9080));

        // Default behaviour: every request succeeds with status 200 and the mocked entity.
        Mockito.when(statusLine.getStatusCode()).thenReturn(200);
        Mockito.when(closeableHttpResponse.getEntity()).thenReturn(entity);
        Mockito.when(closeableHttpResponse.getStatusLine()).thenReturn(statusLine);
        Mockito.when(httpClient.execute(Mockito.any(HttpUriRequest.class))).thenReturn(closeableHttpResponse);

        outContent = new ByteArrayOutputStream();
        originalOut = System.out;
        System.setOut(new PrintStream(outContent));
    }

    /**
     * Puts back the {@code System.out} that was active before the test, so the redirection done
     * in {@link #setup()} does not leak into other tests running in the same JVM.
     */
    @AfterEach
    public void restoreSystemOut() {
        if (originalOut != null) {
            System.setOut(originalOut);
        }
    }

    /** Creates a two-broker mock cluster holding {@code topicA} and {@code topicB}. */
    private void initializeMockAdminClient() {
        List<Node> brokers = Arrays.asList(new Node(0, "localhost", 9092), new Node(1, "localhost", 9093));
        mockAdminClient = new MockAdminClient(brokers, brokers.get(0));
        List<NewTopic> newTopics = new ArrayList<>();
        newTopics.add(new NewTopic("topicA", 3, 2));
        newTopics.add(new NewTopic("topicB", 3, 2));
        mockAdminClient.createTopics(newTopics);
    }

    /**
     * Rewinds the tier topic consumer for the whole cluster or for broker 0.
     *
     * @param partitionToPositionOpt optional rewind positions handed to the orchestrator
     * @param runScope               {@code "cluster"} or {@code "broker"}
     * @param success                when {@code false} the stubbed response reports skipped
     *                               partitions, which the test expects to be counted as failed
     */
    @ParameterizedTest
    @MethodSource(value={"rewindTierTopicConsumerArgumentProvider"})
    public void testRewindTierTopicConsumer(Optional<Map<Integer, Map<Long, Optional<Integer>>>> partitionToPositionOpt, String runScope, boolean success) throws InterruptedException, ExecutionException, IOException {
        Map<Object, Object> skippedPartitions = success ? Collections.emptyMap() : Collections.singletonMap("logDir", Collections.singleton(25));
        stubResponseBody(new RewindTierTopicConsumerResponse(skippedPartitions));
        switch (runScope) {
            case "cluster": {
                tierMetadataRecoveryOrchestrator.rewindTierTopicConsumerForCluster(partitionToPositionOpt, true, TierTopicConsumerRewindPolicy.SKIP_MISSING_PARTITIONS);
                assertRequestsAndOutcome(CLUSTER_BROKER_COUNT, success);
                break;
            }
            case "broker": {
                tierMetadataRecoveryOrchestrator.rewindTierTopicConsumerForBroker(partitionToPositionOpt, true, TierTopicConsumerRewindPolicy.SKIP_MISSING_PARTITIONS, 0);
                assertRequestsAndOutcome(SINGLE_BROKER_COUNT, success);
                break;
            }
            default: {
                throw new IllegalArgumentException("Invalid run scope");
            }
        }
    }

    /**
     * Initiates a tier recovery data upload for the whole cluster or for broker 0.
     *
     * @param runScope {@code "cluster"} or {@code "broker"}
     * @param success  when {@code false} the mocked status line returns HTTP 500 and no response
     *                 body is stubbed
     */
    @ParameterizedTest
    @MethodSource(value={"initiateUploadArgumentProvider"})
    public void testInitiateUpload(String runScope, boolean success) throws IOException, ExecutionException, InterruptedException {
        UUID jobId = UUID.randomUUID();
        if (success) {
            stubResponseBody(new InitiateTierRecoveryDataUploadResponse(jobId));
        } else {
            Mockito.when(statusLine.getStatusCode()).thenReturn(500);
        }
        Set<TopicIdPartition> topicIdPartitions = new HashSet<>();
        topicIdPartitions.add(new TopicIdPartition(Uuid.randomUuid(), 10, "topicA"));
        topicIdPartitions.add(new TopicIdPartition(Uuid.randomUuid(), 15, "topicB"));
        topicIdPartitions.add(new TopicIdPartition(Uuid.randomUuid(), 20, "topicC"));
        String identifier = "rcca-1234";
        int numThreads = 4;
        switch (runScope) {
            case "cluster": {
                tierMetadataRecoveryOrchestrator.initiateTierRecoveryDataUploadForCluster(topicIdPartitions, identifier, numThreads);
                assertRequestsAndOutcome(CLUSTER_BROKER_COUNT, success);
                break;
            }
            case "broker": {
                tierMetadataRecoveryOrchestrator.initiateTierRecoveryDataUploadForBroker(topicIdPartitions, identifier, numThreads, 0);
                assertRequestsAndOutcome(SINGLE_BROKER_COUNT, success);
                break;
            }
            default: {
                throw new IllegalArgumentException("Invalid run scope");
            }
        }
    }

    /**
     * Fetches the upload job result for broker 0 and checks how it is bucketed in the output.
     *
     * @param completed {@code true} stubs a {@code COMPLETED} result, which the test expects under
     *                  the {@code "completed"} key; {@code false} stubs a
     *                  {@code DATA_UPLOAD_COMPLETED} result, expected under {@code "failed"}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testGetTierRecoveryDataUploadJobResult(boolean completed) throws IOException, ExecutionException, InterruptedException {
        TierRecoveryDataUploadResult result;
        if (completed) {
            result = new TierRecoveryDataUploadResult("rcca-1234", TierRecoveryDataUploadResult.TierRecoveryDataUploadJobStatus.COMPLETED, Collections.emptyMap(), false, "Metadata upload failed", true, null);
        } else {
            result = new TierRecoveryDataUploadResult("rcca-1234", TierRecoveryDataUploadResult.TierRecoveryDataUploadJobStatus.DATA_UPLOAD_COMPLETED, Collections.emptyMap(), false, null, false, null);
        }
        stubResponseBody(new GetTierRecoveryDataUploadJobResultResponse(result));

        tierMetadataRecoveryOrchestrator.getTierRecoveryDataUploadJobResultForBroker(0, UUID.randomUUID());

        Mockito.verify(httpClient, Mockito.times(1)).execute((HttpUriRequest) Mockito.any());
        int completedCount = completed ? 1 : 0;
        validateOutput(outContent.toString(), "completed", "failed", completedCount, 1 - completedCount);
    }

    /**
     * Runs tier topic head data loss detection for the whole cluster or for broker 0.
     *
     * @param tierTopicPartitionAllowList allow list handed to the orchestrator (may be empty)
     * @param runScope                    {@code "cluster"} or {@code "broker"}
     * @param success                     when {@code false} the stubbed response carries a
     *                                    {@code FAILURE} status and two error messages
     */
    @ParameterizedTest
    @MethodSource(value={"detectDataLossArgumentProvider"})
    public void testDetectDataLossForTierTopic(Set<TopicPartition> tierTopicPartitionAllowList, String runScope, boolean success) throws IOException, ExecutionException, InterruptedException {
        TierTopicHeadDataLossDetectionResponse.CompletionStatus completionStatus = TierTopicHeadDataLossDetectionResponse.CompletionStatus.SUCCESS;
        List<String> errorMessages = new ArrayList<>();
        String dataLossReportPath = "uri/to/data/loss/report";
        if (!success) {
            errorMessages.add("error1");
            errorMessages.add("error2");
            completionStatus = TierTopicHeadDataLossDetectionResponse.CompletionStatus.FAILURE;
        }
        stubResponseBody(new TierTopicHeadDataLossDetectionResponse(dataLossReportPath, completionStatus, errorMessages));
        switch (runScope) {
            case "cluster": {
                tierMetadataRecoveryOrchestrator.detectDataLossInTierTopicForCluster("rcca-1234", tierTopicPartitionAllowList);
                assertRequestsAndOutcome(CLUSTER_BROKER_COUNT, success);
                break;
            }
            case "broker": {
                tierMetadataRecoveryOrchestrator.detectDataLossInTierTopicForBroker(0, "rcca-1234", tierTopicPartitionAllowList);
                assertRequestsAndOutcome(SINGLE_BROKER_COUNT, success);
                break;
            }
            default: {
                throw new IllegalArgumentException("Invalid run scope");
            }
        }
    }

    /** Supplies every (runScope, success) combination for {@code testInitiateUpload}. */
    static Stream<Arguments> initiateUploadArgumentProvider() {
        return Stream.of(
                Arguments.of("cluster", true),
                Arguments.of("cluster", false),
                Arguments.of("broker", true),
                Arguments.of("broker", false));
    }

    /**
     * Supplies every (positions, runScope, success) combination for
     * {@code testRewindTierTopicConsumer}, once with no positions and once with a populated map.
     */
    static Stream<Arguments> rewindTierTopicConsumerArgumentProvider() {
        Map<Integer, Map<Long, Optional<Integer>>> partitionToPosition = new HashMap<>();
        partitionToPosition.put(0, Collections.singletonMap(12345L, Optional.of(5)));
        partitionToPosition.put(2, Collections.singletonMap(5L, Optional.empty()));
        partitionToPosition.put(4, Collections.singletonMap(777L, Optional.of(17)));
        return Stream.of(
                Arguments.of(Optional.empty(), "cluster", true),
                Arguments.of(Optional.empty(), "cluster", false),
                Arguments.of(Optional.empty(), "broker", true),
                Arguments.of(Optional.empty(), "broker", false),
                Arguments.of(Optional.of(partitionToPosition), "cluster", true),
                Arguments.of(Optional.of(partitionToPosition), "cluster", false),
                Arguments.of(Optional.of(partitionToPosition), "broker", true),
                Arguments.of(Optional.of(partitionToPosition), "broker", false));
    }

    /**
     * Supplies every (allowList, runScope, success) combination for
     * {@code testDetectDataLossForTierTopic}, once with an empty allow list and once with three
     * {@code _confluent-tier-state} partitions.
     */
    static Stream<Arguments> detectDataLossArgumentProvider() {
        Set<TopicPartition> tierTopicPartitionAllowList = new HashSet<>(Arrays.asList(
                new TopicPartition("_confluent-tier-state", 0),
                new TopicPartition("_confluent-tier-state", 1),
                new TopicPartition("_confluent-tier-state", 2)));
        return Stream.of(
                Arguments.of(new HashSet<TopicPartition>(), "cluster", true),
                Arguments.of(new HashSet<TopicPartition>(), "cluster", false),
                Arguments.of(new HashSet<TopicPartition>(), "broker", true),
                Arguments.of(new HashSet<TopicPartition>(), "broker", false),
                Arguments.of(tierTopicPartitionAllowList, "cluster", true),
                Arguments.of(tierTopicPartitionAllowList, "cluster", false),
                Arguments.of(tierTopicPartitionAllowList, "broker", true),
                Arguments.of(tierTopicPartitionAllowList, "broker", false));
    }

    /**
     * Makes the mocked HTTP response return {@code payload}, wrapped in a
     * {@link ResponseContainer} data response and serialized with the orchestrator's own
     * {@code OBJECT_MAPPER}, as its entity.
     */
    private void stubResponseBody(Object payload) throws IOException {
        String json = TierMetadataRecoveryOrchestrator.OBJECT_MAPPER.writeValueAsString(ResponseContainer.dataResponse(payload));
        Mockito.when(closeableHttpResponse.getEntity()).thenReturn(new StringEntity(json));
    }

    /**
     * Verifies that exactly {@code expectedRequests} HTTP requests were executed and that the
     * printed output lists that many entries under {@code "success"} (when {@code success}) or
     * under {@code "failed"} (otherwise), with the other list empty.
     */
    private void assertRequestsAndOutcome(int expectedRequests, boolean success) throws IOException {
        Mockito.verify(httpClient, Mockito.times(expectedRequests)).execute((HttpUriRequest) Mockito.any());
        if (success) {
            validateOutput(outContent.toString(), expectedRequests, 0);
        } else {
            validateOutput(outContent.toString(), 0, expectedRequests);
        }
    }

    /**
     * Parses {@code output} as JSON and asserts that the fields named {@code successString} and
     * {@code failureString} are arrays of the expected sizes.
     *
     * @param output        text captured from {@code System.out}
     * @param successString name of the JSON field holding successful entries
     * @param failureString name of the JSON field holding failed entries
     * @param successCount  expected size of the success array
     * @param failedCount   expected size of the failure array
     */
    private void validateOutput(String output, String successString, String failureString, int successCount, int failedCount) throws IOException {
        JsonNode outputNode = TierMetadataRecoveryOrchestrator.OBJECT_MAPPER.readTree(output);
        assertArrayOfSize(outputNode, successString, successCount, output);
        assertArrayOfSize(outputNode, failureString, failedCount, output);
    }

    /** Same as the five-argument overload, using the {@code "success"} and {@code "failed"} fields. */
    private void validateOutput(String output, int successCount, int failedCount) throws IOException {
        validateOutput(output, "success", "failed", successCount, failedCount);
    }

    /**
     * Asserts that {@code root} has an array field {@code fieldName} with {@code expectedSize}
     * elements. A missing field is reported as an assertion failure that includes the raw output,
     * rather than surfacing as a {@link NullPointerException}.
     */
    private static void assertArrayOfSize(JsonNode root, String fieldName, int expectedSize, String rawOutput) {
        Assertions.assertNotNull(root, "Output is not a JSON document: " + rawOutput);
        JsonNode field = root.get(fieldName);
        Assertions.assertNotNull(field, "Missing field '" + fieldName + "' in output: " + rawOutput);
        Assertions.assertTrue(field.isArray(), "Field '" + fieldName + "' is not an array in output: " + rawOutput);
        Assertions.assertEquals(expectedSize, field.size(), "Unexpected size of '" + fieldName + "' in output: " + rawOutput);
    }
}

