/*
 * Decompiled with CFR 0.152.
 */
package kafka.restore.schedulers;

import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import kafka.restore.configmap.NodeConfig;
import kafka.restore.messages.KafkaFenceRequest;
import kafka.restore.messages.KafkaFenceResponse;
import kafka.restore.messages.KafkaFetchFtpsRequest;
import kafka.restore.messages.KafkaFetchFtpsResponse;
import kafka.restore.messages.KafkaRequest;
import kafka.restore.messages.KafkaResponse;
import kafka.restore.messages.KafkaTierPartitionStatusRequest;
import kafka.restore.messages.KafkaTierPartitionStatusResponse;
import kafka.restore.messages.MessageRequest;
import kafka.restore.messages.MessageResponse;
import kafka.restore.messages.MessageResult;
import kafka.restore.messages.MessageStatusCode;
import kafka.restore.messages.RestoreFtpsRequest;
import kafka.restore.schedulers.AbstractAsyncServiceScheduler;
import kafka.restore.schedulers.AsyncServiceSchedulerResultsReceiver;
import kafka.restore.schedulers.KafkaConnectionPool;
import kafka.restore.schedulers.KafkaManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class KafkaManagerTest
implements AsyncServiceSchedulerResultsReceiver {
    private static final int RESPONSE_UUID = 1;
    private static final int REQUEST_UUID = 0;
    private static final String TOPIC = "fakeTopic";
    private static final int PARTITION = 0;
    private static final NodeConfig BROKER = new NodeConfig(1, "localhost", 9072);
    private static final List<NodeConfig> REPLICAS = Arrays.asList(BROKER);
    private static final String FTPS_PATH = "~/fakeFtpsPath";
    private static final int EXPECTED_STATUS = 3;
    private static final int RESPONSE_QUEUE_CAPACITY = 1;
    private KafkaManager kafkaManager;
    private ArrayBlockingQueue<MessageResponse> kafkaResponseQueue;
    private KafkaConnectionPool kafkaConnectionPool;

    @BeforeEach
    public void setUp() {
        this.kafkaConnectionPool = (KafkaConnectionPool)Mockito.mock(KafkaConnectionPool.class);
        this.kafkaManager = new KafkaManager((AsyncServiceSchedulerResultsReceiver)this, this.kafkaConnectionPool);
        this.kafkaResponseQueue = new ArrayBlockingQueue(1);
        Assertions.assertNotNull((Object)this.kafkaManager);
        Assertions.assertEquals((Object)AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.NOT_STARTED, (Object)this.kafkaManager.getStatus());
        boolean success = this.kafkaManager.startUp();
        Assertions.assertTrue((boolean)success);
        Assertions.assertEquals((Object)AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.RUNNING, (Object)this.kafkaManager.getStatus());
        this.kafkaManager.pause();
        Assertions.assertEquals((Object)AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.PAUSED, (Object)this.kafkaManager.getStatus());
        this.kafkaManager.resume();
        Assertions.assertEquals((Object)AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.RUNNING, (Object)this.kafkaManager.getStatus());
    }

    @AfterEach
    public void shutDown() {
        this.kafkaManager.shutdown();
        Assertions.assertEquals((Object)AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.SHUTDOWN, (Object)this.kafkaManager.getStatus());
    }

    @Test
    public void testConstructWithIllegalRequestQueueCapacity() {
        Assertions.assertThrows(IllegalArgumentException.class, () -> new KafkaManager((AsyncServiceSchedulerResultsReceiver)this, this.kafkaConnectionPool, 0));
        Assertions.assertThrows(IllegalArgumentException.class, () -> new KafkaManager((AsyncServiceSchedulerResultsReceiver)this, this.kafkaConnectionPool, -1));
    }

    @Test
    public void testSubmitUnsupportedOperation() {
        RestoreFtpsRequest request = new RestoreFtpsRequest(0, TOPIC, 1, "fakeFtpsFile", new Date());
        Assertions.assertThrows(UnsupportedOperationException.class, () -> this.lambda$testSubmitUnsupportedOperation$2((MessageRequest)request));
    }

    @Test
    public void testSubmitNull() {
        Assertions.assertThrows(NullPointerException.class, () -> this.kafkaManager.submitRequest(null));
    }

    @Test
    public void testSubmitFetchFtpsRequest() {
        KafkaFetchFtpsRequest kafkaFetchFtpsRequest = new KafkaFetchFtpsRequest(0, TOPIC, 0, BROKER);
        this.mockKafkaConnectionPoolToRespondWithResponse((KafkaResponse)new KafkaFetchFtpsResponse(1, TOPIC, 0, 0, MessageStatusCode.OK, MessageResult.SUCCESS));
        this.kafkaManager.submitRequest((MessageRequest)kafkaFetchFtpsRequest);
        MessageResponse response = this.getNextMessageResponse();
        Assertions.assertEquals(KafkaFetchFtpsResponse.class, response.getClass());
        this.assertCorrectResponse(response, 1, TOPIC, 0, 0, MessageStatusCode.OK, MessageResult.SUCCESS);
        ((KafkaConnectionPool)Mockito.verify((Object)this.kafkaConnectionPool, (VerificationMode)Mockito.times((int)1))).submitKafkaRequest((KafkaRequest)Mockito.any());
    }

    @Test
    public void testSubmitKafkaTierPartitionEventRequest() {
        KafkaFenceRequest kafkaTierPartitionEventRequest = new KafkaFenceRequest(0, TOPIC, 0, BROKER);
        this.mockKafkaConnectionPoolToRespondWithResponse((KafkaResponse)new KafkaFenceResponse(1, TOPIC, 0, 0, MessageStatusCode.OK, MessageResult.SUCCESS));
        this.kafkaManager.submitRequest((MessageRequest)kafkaTierPartitionEventRequest);
        MessageResponse response = this.getNextMessageResponse();
        Assertions.assertEquals(KafkaFenceResponse.class, response.getClass());
        this.assertCorrectResponse(response, 1, TOPIC, 0, 0, MessageStatusCode.OK, MessageResult.SUCCESS);
        ((KafkaConnectionPool)Mockito.verify((Object)this.kafkaConnectionPool, (VerificationMode)Mockito.times((int)1))).submitKafkaRequest((KafkaRequest)Mockito.any());
    }

    @Test
    public void testSubmitKafkaTierPartitionStatusRequest() {
        KafkaTierPartitionStatusRequest kafkaTierPartitionStatusRequest = new KafkaTierPartitionStatusRequest(0, TOPIC, 0, BROKER, REPLICAS, 3);
        this.mockKafkaConnectionPoolToRespondWithResponse((KafkaResponse)new KafkaTierPartitionStatusResponse(1, TOPIC, 0, 0, MessageStatusCode.OK, MessageResult.SUCCESS));
        this.kafkaManager.submitRequest((MessageRequest)kafkaTierPartitionStatusRequest);
        MessageResponse response = this.getNextMessageResponse();
        Assertions.assertEquals(KafkaTierPartitionStatusResponse.class, response.getClass());
        this.assertCorrectResponse(response, 1, TOPIC, 0, 0, MessageStatusCode.OK, MessageResult.SUCCESS);
        ((KafkaConnectionPool)Mockito.verify((Object)this.kafkaConnectionPool, (VerificationMode)Mockito.times((int)1))).submitKafkaRequest((KafkaRequest)Mockito.any());
    }

    private MessageResponse getNextMessageResponse() {
        MessageResponse response;
        try {
            response = this.kafkaResponseQueue.take();
        }
        catch (InterruptedException e) {
            throw new RuntimeException("KafkaManager results receiver was interrupted prior to receiving result.");
        }
        return response;
    }

    private void assertCorrectResponse(MessageResponse response, int responseUuid, String topic, int partition, int requestUuid, MessageStatusCode statusCode, MessageResult result) {
        Assertions.assertEquals((int)responseUuid, (int)response.getUuid());
        Assertions.assertEquals((Object)topic, (Object)response.getTopic());
        Assertions.assertEquals((int)partition, (int)response.getPartition());
        Assertions.assertEquals((int)requestUuid, (int)response.getRequestID());
        Assertions.assertEquals((Object)statusCode, (Object)response.getStatusCode());
        Assertions.assertEquals((Object)result, (Object)response.getResult());
    }

    private void mockKafkaConnectionPoolToRespondWithResponse(final KafkaResponse response) {
        ((KafkaConnectionPool)Mockito.doAnswer((Answer)new Answer<Void>(){

            public Void answer(InvocationOnMock invocation) throws Throwable {
                KafkaManagerTest.this.reportServiceSchedulerResponse((MessageResponse)response);
                return null;
            }
        }).when((Object)this.kafkaConnectionPool)).submitKafkaRequest((KafkaRequest)Mockito.any());
    }

    public void reportServiceSchedulerResponse(MessageResponse response) {
        try {
            this.kafkaResponseQueue.put(response);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("KafkaManager results receiver was interrupted while result was being reported");
        }
        catch (Exception e) {
            throw new RuntimeException("KafkaManager results receiver failed while result was being reported");
        }
    }

    private /* synthetic */ void lambda$testSubmitUnsupportedOperation$2(MessageRequest request) throws Throwable {
        this.kafkaManager.submitRequest(request);
    }
}

