/*
 * Decompiled with CFR 0.152.
 */
package kafka.restore.schedulers;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import kafka.restore.RestoreMetricsManager;
import kafka.restore.configmap.NodeConfig;
import kafka.restore.messages.KafkaRequest;
import kafka.restore.messages.KafkaTierPartitionStatusRequest;
import kafka.restore.messages.MessageResponse;
import kafka.restore.messages.MessageResult;
import kafka.restore.schedulers.AsyncServiceSchedulerResultsReceiver;
import kafka.restore.schedulers.KafkaConnectionPoolImpl;
import kafka.tier.state.TierPartitionStatus;
import org.apache.http.Header;
import org.apache.http.HeaderElement;
import org.apache.http.HttpEntity;
import org.apache.http.ParseException;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class KafkaConnectionPoolTest {
    private MockResultReceiver resultsReceiver;
    private CloseableHttpClient httpClient;
    KafkaConnectionPoolImpl kafkaConnectionPool;
    RestoreMetricsManager restoreMetricsManager;
    Metrics metrics;
    Time time;
    NodeConfig broker0 = new NodeConfig(0, "localhost", 9080);
    NodeConfig broker1 = new NodeConfig(1, "localhost", 9080);
    List<NodeConfig> replicas = Arrays.asList(this.broker0, this.broker1);

    private CloseableHttpResponse mockTierPartitionStatusResponse(int status) throws IOException {
        String statusContent = "{\"data\": {\"attributes\": {\"state\":" + status + "}}}";
        ByteArrayInputStream inputStream = new ByteArrayInputStream(statusContent.getBytes());
        StatusLine statusLine = (StatusLine)Mockito.mock(StatusLine.class);
        Mockito.when((Object)statusLine.getStatusCode()).thenReturn((Object)200);
        HttpEntity entity = (HttpEntity)Mockito.mock(HttpEntity.class);
        Mockito.when((Object)entity.getContent()).thenReturn((Object)inputStream);
        Mockito.when((Object)entity.getContentLength()).thenReturn((Object)statusContent.getBytes().length);
        CloseableHttpResponse response = (CloseableHttpResponse)Mockito.mock(CloseableHttpResponse.class);
        Mockito.when((Object)response.getEntity()).thenReturn((Object)entity);
        Mockito.when((Object)response.getStatusLine()).thenReturn((Object)statusLine);
        return response;
    }

    @BeforeEach
    public void setup() throws Exception {
        this.resultsReceiver = new MockResultReceiver();
        this.httpClient = (CloseableHttpClient)Mockito.mock(CloseableHttpClient.class);
        this.metrics = new Metrics();
        this.restoreMetricsManager = new RestoreMetricsManager(this.metrics, "test_cluster");
        this.time = Time.SYSTEM;
        this.kafkaConnectionPool = new KafkaConnectionPoolImpl((AsyncServiceSchedulerResultsReceiver)this.resultsReceiver, 3, "", this.restoreMetricsManager, this.time);
        this.kafkaConnectionPool.setStatusQueryRetryWaitInMs(1L);
        this.kafkaConnectionPool.startUp();
    }

    @AfterEach
    public void teardown() throws Exception {
        this.kafkaConnectionPool.shutdown();
    }

    @Test
    public void testSubmitTierPartitionStatusRequestSuccess() throws IOException, InterruptedException {
        CloseableHttpResponse resp1 = this.mockTierPartitionStatusResponse(TierPartitionStatus.READ_ONLY.ordinal());
        CloseableHttpResponse resp2 = this.mockTierPartitionStatusResponse(TierPartitionStatus.READ_ONLY.ordinal());
        Mockito.when((Object)this.httpClient.execute((HttpUriRequest)Mockito.any(HttpUriRequest.class))).thenReturn((Object)resp1).thenReturn((Object)resp2);
        this.kafkaConnectionPool.setHttpClient(this.httpClient);
        KafkaTierPartitionStatusRequest request = new KafkaTierPartitionStatusRequest(0, "test", 0, this.broker0, this.replicas, TierPartitionStatus.READ_ONLY.ordinal());
        this.kafkaConnectionPool.submitKafkaRequest((KafkaRequest)request);
        Thread.sleep(1000L);
        MessageResponse messageResponse = this.resultsReceiver.getMessageResponse();
        Assertions.assertEquals((Object)MessageResult.SUCCESS, (Object)messageResponse.getResult());
    }

    @Test
    public void testSubmitTierPartitionStatusRequestOneReplicaFail() throws IOException, InterruptedException {
        CloseableHttpResponse resp1 = this.mockTierPartitionStatusResponse(TierPartitionStatus.READ_ONLY.ordinal());
        CloseableHttpResponse resp2 = this.mockTierPartitionStatusResponse(TierPartitionStatus.ONLINE.ordinal());
        CloseableHttpResponse resp3 = this.mockTierPartitionStatusResponse(TierPartitionStatus.ONLINE.ordinal());
        CloseableHttpResponse resp4 = this.mockTierPartitionStatusResponse(TierPartitionStatus.ONLINE.ordinal());
        Mockito.when((Object)this.httpClient.execute((HttpUriRequest)Mockito.any(HttpUriRequest.class))).thenReturn((Object)resp1).thenReturn((Object)resp2).thenReturn((Object)resp3).thenReturn((Object)resp4);
        this.kafkaConnectionPool.setHttpClient(this.httpClient);
        KafkaTierPartitionStatusRequest request = new KafkaTierPartitionStatusRequest(0, "test", 0, this.broker0, this.replicas, TierPartitionStatus.READ_ONLY.ordinal());
        this.kafkaConnectionPool.submitKafkaRequest((KafkaRequest)request);
        Thread.sleep(1000L);
        MessageResponse messageResponse = this.resultsReceiver.getMessageResponse();
        Assertions.assertEquals((Object)MessageResult.FAILURE, (Object)messageResponse.getResult());
    }

    @Test
    public void testSubmitTierPartitionStatusRequestRetrySuccess() throws IOException, InterruptedException {
        CloseableHttpResponse resp1 = this.mockTierPartitionStatusResponse(TierPartitionStatus.ONLINE.ordinal());
        CloseableHttpResponse resp2 = this.mockTierPartitionStatusResponse(TierPartitionStatus.ONLINE.ordinal());
        CloseableHttpResponse resp3 = this.mockTierPartitionStatusResponse(TierPartitionStatus.READ_ONLY.ordinal());
        Mockito.when((Object)this.httpClient.execute((HttpUriRequest)Mockito.any(HttpUriRequest.class))).thenReturn((Object)resp1).thenReturn((Object)resp2).thenReturn((Object)resp3);
        this.kafkaConnectionPool.setHttpClient(this.httpClient);
        KafkaTierPartitionStatusRequest request = new KafkaTierPartitionStatusRequest(0, "test", 0, this.broker0, Arrays.asList(this.broker0), TierPartitionStatus.READ_ONLY.ordinal());
        this.kafkaConnectionPool.submitKafkaRequest((KafkaRequest)request);
        Thread.sleep(1000L);
        MessageResponse messageResponse = this.resultsReceiver.getMessageResponse();
        Assertions.assertEquals((Object)MessageResult.SUCCESS, (Object)messageResponse.getResult());
    }

    @Test
    public void testSubmitTierPartitionStatusRequestRetryFail() throws IOException, InterruptedException {
        CloseableHttpResponse resp1 = this.mockTierPartitionStatusResponse(TierPartitionStatus.ONLINE.ordinal());
        CloseableHttpResponse resp2 = this.mockTierPartitionStatusResponse(TierPartitionStatus.ONLINE.ordinal());
        CloseableHttpResponse resp3 = this.mockTierPartitionStatusResponse(TierPartitionStatus.ONLINE.ordinal());
        Mockito.when((Object)this.httpClient.execute((HttpUriRequest)Mockito.any(HttpUriRequest.class))).thenReturn((Object)resp1).thenReturn((Object)resp2).thenReturn((Object)resp3);
        this.kafkaConnectionPool.setHttpClient(this.httpClient);
        KafkaTierPartitionStatusRequest request = new KafkaTierPartitionStatusRequest(0, "test", 0, this.broker0, Arrays.asList(this.broker0), TierPartitionStatus.READ_ONLY.ordinal());
        this.kafkaConnectionPool.submitKafkaRequest((KafkaRequest)request);
        Thread.sleep(1000L);
        MessageResponse messageResponse = this.resultsReceiver.getMessageResponse();
        Assertions.assertEquals((Object)MessageResult.FAILURE, (Object)messageResponse.getResult());
    }

    @Test
    public void testGetFtpsFileName() throws Exception {
        CloseableHttpResponse res1 = this.mockTierPartitionStatusResponse(TierPartitionStatus.ONLINE.ordinal());
        Header headerWithChecksum = this.newHeader("Content-Disposition", "attachment; filename=\"00000000000000000000.tierstate.adler\"");
        Mockito.when((Object)res1.getLastHeader("Content-Disposition")).thenReturn((Object)headerWithChecksum);
        Assertions.assertEquals((Object)"/ftps-test-0.tierstate.adler", (Object)this.kafkaConnectionPool.getFtpsFileName("test", 0, res1));
        CloseableHttpResponse res2 = this.mockTierPartitionStatusResponse(TierPartitionStatus.ONLINE.ordinal());
        Header headerWithoutChecksum = this.newHeader("Content-Disposition", "attachment; filename=\"00000000000000000000.tierstate\"");
        Mockito.when((Object)res2.getLastHeader("Content-Disposition")).thenReturn((Object)headerWithoutChecksum);
        Assertions.assertEquals((Object)"/ftps-test-0.tierstate", (Object)this.kafkaConnectionPool.getFtpsFileName("test", 0, res2));
    }

    private Header newHeader(final String name, final String value) {
        return new Header(){

            public String getName() {
                return name;
            }

            public String getValue() {
                return value;
            }

            public HeaderElement[] getElements() throws ParseException {
                return null;
            }
        };
    }

    private static class MockResultReceiver
    implements AsyncServiceSchedulerResultsReceiver {
        MessageResponse messageResponse;

        private MockResultReceiver() {
        }

        public void reportServiceSchedulerResponse(MessageResponse response) {
            this.messageResponse = response;
        }

        public MessageResponse getMessageResponse() {
            return this.messageResponse;
        }
    }
}

