/*
 * Decompiled with CFR 0.152.
 */
package kafka.restore.schedulers;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import kafka.log.MergedLog;
import kafka.restore.RestoreMetricsManager;
import kafka.restore.messages.MessageResponse;
import kafka.restore.messages.MessageResult;
import kafka.restore.messages.ObjectStoreRequest;
import kafka.restore.messages.RestoreObjectsInStoreRequest;
import kafka.restore.messages.RestoreObjectsInStoreResponse;
import kafka.restore.messages.UploadFtpsToStoreRequest;
import kafka.restore.operators.OperatorTestUtil;
import kafka.restore.operators.SegmentStateAndPath;
import kafka.restore.schedulers.AsyncServiceSchedulerResultsReceiver;
import kafka.restore.schedulers.MockServiceSchedulerResultsReceiver;
import kafka.restore.schedulers.ObjectStorePoolImpl;
import kafka.restore.schedulers.PermanentException;
import kafka.restore.schedulers.RetryableException;
import kafka.tier.TierTestUtils;
import kafka.tier.TopicIdPartition;
import kafka.tier.domain.AbstractTierMetadata;
import kafka.tier.domain.TierTopicInitLeader;
import kafka.tier.state.FileTierPartitionState;
import kafka.tier.state.TierPartitionStateCleanupConfig;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.VersionInformation;
import kafka.tier.store.objects.metadata.ObjectMetadata;
import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class ObjectStorePoolImplTest {
    MockServiceSchedulerResultsReceiver resultsReceiver;
    ObjectStorePoolImpl objectStorePool;
    TierObjectStore tierObjectStore;
    RestoreMetricsManager restoreMetricsManager;
    Metrics metrics;
    Time time;

    @BeforeEach
    public void setUp() throws Exception {
        this.resultsReceiver = new MockServiceSchedulerResultsReceiver();
        this.tierObjectStore = (TierObjectStore)Mockito.mock(TierObjectStore.class);
        this.metrics = new Metrics();
        this.restoreMetricsManager = new RestoreMetricsManager(this.metrics, "test_cluster");
        this.time = Time.SYSTEM;
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(50), new ThreadPoolExecutor.CallerRunsPolicy());
        this.objectStorePool = new ObjectStorePoolImpl((AsyncServiceSchedulerResultsReceiver)this.resultsReceiver, threadPool, this.tierObjectStore, this.restoreMetricsManager, this.time);
        this.objectStorePool.startUp();
    }

    @AfterEach
    public void treated() throws Exception {
        this.objectStorePool.shutdown();
    }

    @Test
    public void testSubmitUploadFtpsToStoreRequestFail() throws InterruptedException {
        UploadFtpsToStoreRequest uploadFtpsRequest = new UploadFtpsToStoreRequest(0, "test", 0, "not_exist.tierstate");
        this.objectStorePool.submitObjectStoreRequest((ObjectStoreRequest)uploadFtpsRequest);
        while (this.resultsReceiver.getMessageResponses().size() == 0) {
            System.out.println("sleep 1s, wait for threadPool handling request");
            Thread.sleep(1000L);
        }
        Assertions.assertEquals((int)1, (int)this.resultsReceiver.getMessageResponses().size());
        MessageResponse response = this.resultsReceiver.getMessageResponses().get(0);
        Assertions.assertEquals((Object)MessageResult.FAILURE, (Object)response.getResult());
    }

    @Test
    public void testSubmitUploadFtpsToStoreRequestSuccess() throws InterruptedException, IOException {
        File parentDir = TestUtils.tempDir();
        File dir = TestUtils.randomPartitionLogDir(parentDir);
        TopicPartition tp = MergedLog.parseTopicPartitionName((File)dir);
        TopicIdPartition tpid = new TopicIdPartition(tp.topic(), UUID.randomUUID(), tp.partition());
        LogDirFailureChannel logDirFailureChannel = new LogDirFailureChannel(5);
        MockTime time = new MockTime();
        FileTierPartitionState state = new FileTierPartitionState(dir, logDirFailureChannel, tp, true, (Scheduler)time.scheduler, false, false, (Time)time, TierPartitionStateCleanupConfig.EMPTY, false, -1);
        state.setTopicId(tpid.topicId());
        state.beginCatchup();
        state.onCatchUpComplete();
        TierTestUtils.initTierTopicOffset();
        long timestamp = 1000L;
        state.append((AbstractTierMetadata)new TierTopicInitLeader(tpid, 0, UUID.randomUUID(), 0), TierTestUtils.nextTierTopicOffsetAndEpoch());
        OperatorTestUtil.uploadInitateAndComplete(state, tpid, 0, UUID.randomUUID(), 0L, 99L, 100, timestamp);
        OperatorTestUtil.uploadInitateAndComplete(state, tpid, 0, UUID.randomUUID(), 100L, 199L, 100, timestamp + 1L);
        state.flush();
        UploadFtpsToStoreRequest uploadFtpsRequest = new UploadFtpsToStoreRequest(0, "test", 0, OperatorTestUtil.getTierStateFile(dir));
        this.objectStorePool.submitObjectStoreRequest((ObjectStoreRequest)uploadFtpsRequest);
        while (this.resultsReceiver.getMessageResponses().size() == 0) {
            System.out.println("sleep 1s, wait for threadPool handling request");
            Thread.sleep(1000L);
        }
        Assertions.assertEquals((int)1, (int)this.resultsReceiver.getMessageResponses().size());
        MessageResponse response = this.resultsReceiver.getMessageResponses().get(0);
        Assertions.assertEquals((Object)MessageResult.SUCCESS, (Object)response.getResult());
        state.close();
        dir.delete();
        parentDir.delete();
    }

    @Test
    public void testSubmitObjectStoreRequestSuccess() throws InterruptedException {
        String segmentPathPrefix = "0/segmentPath1/";
        HashMap<UUID, SegmentStateAndPath> segmentPathMap = new HashMap<UUID, SegmentStateAndPath>();
        segmentPathMap.put(UUID.randomUUID(), this.newSegmentStateAndPath(segmentPathPrefix));
        HashMap<String, List<VersionInformation>> versionMap = new HashMap<String, List<VersionInformation>>();
        versionMap.put(segmentPathPrefix + "01.log", Arrays.asList(new VersionInformation("version1")));
        versionMap.put(segmentPathPrefix + "01.index", Arrays.asList(new VersionInformation("version1")));
        Mockito.when((Object)this.tierObjectStore.listObject((String)Mockito.any(), Mockito.anyBoolean())).thenReturn(versionMap);
        RestoreObjectsInStoreRequest restoreObjectsInStoreRequest = new RestoreObjectsInStoreRequest(0, "test-topic", 0, segmentPathMap);
        this.objectStorePool.submitObjectStoreRequest((ObjectStoreRequest)restoreObjectsInStoreRequest);
        while (this.resultsReceiver.getMessageResponses().size() == 0) {
            System.out.println("sleep 1s, wait for threadPool handling request");
            Thread.sleep(1000L);
        }
        Assertions.assertEquals((int)1, (int)this.resultsReceiver.getMessageResponses().size());
        MessageResponse response = this.resultsReceiver.getMessageResponses().get(0);
        Assertions.assertEquals((Object)MessageResult.SUCCESS, (Object)response.getResult());
    }

    @Test
    public void testListObjectsInStoreRequestRetryThenSuccess() throws InterruptedException {
        String segmentPathPrefix = "0/segmentPath1/";
        HashMap<UUID, SegmentStateAndPath> segmentPathMap = new HashMap<UUID, SegmentStateAndPath>();
        segmentPathMap.put(UUID.randomUUID(), this.newSegmentStateAndPath(segmentPathPrefix));
        HashMap<String, List<VersionInformation>> versionMap = new HashMap<String, List<VersionInformation>>();
        versionMap.put(segmentPathPrefix + "01.log", Arrays.asList(new VersionInformation("version1")));
        versionMap.put(segmentPathPrefix + "01.index", Arrays.asList(new VersionInformation("version1")));
        Mockito.when((Object)this.tierObjectStore.listObject((String)Mockito.any(), Mockito.anyBoolean())).thenThrow(new Throwable[]{new RetryableException("retriable error")}).thenReturn(versionMap);
        RestoreObjectsInStoreRequest restoreObjectsInStoreRequest = new RestoreObjectsInStoreRequest(0, "test-topic", 0, segmentPathMap);
        this.objectStorePool.submitObjectStoreRequest((ObjectStoreRequest)restoreObjectsInStoreRequest);
        while (this.resultsReceiver.getMessageResponses().size() == 0) {
            System.out.println("sleep 1s, wait for threadPool handling request");
            Thread.sleep(1000L);
        }
        Assertions.assertEquals((int)1, (int)this.resultsReceiver.getMessageResponses().size());
        MessageResponse response = this.resultsReceiver.getMessageResponses().get(0);
        Assertions.assertEquals((Object)MessageResult.SUCCESS, (Object)response.getResult());
    }

    @Test
    public void testListObjectsInStoreWithPermanentException() throws InterruptedException {
        String segmentPathPrefix = "0/segmentPath1/";
        HashMap<UUID, SegmentStateAndPath> segmentPathMap = new HashMap<UUID, SegmentStateAndPath>();
        segmentPathMap.put(UUID.randomUUID(), this.newSegmentStateAndPath(segmentPathPrefix));
        Mockito.when((Object)this.tierObjectStore.listObject((String)Mockito.any(), Mockito.anyBoolean())).thenThrow(new Throwable[]{new PermanentException("permanent error")});
        RestoreObjectsInStoreRequest restoreObjectsInStoreRequest = new RestoreObjectsInStoreRequest(0, "test-topic", 0, segmentPathMap);
        this.objectStorePool.submitObjectStoreRequest((ObjectStoreRequest)restoreObjectsInStoreRequest);
        while (this.resultsReceiver.getMessageResponses().size() == 0) {
            System.out.println("sleep 1s, wait for threadPool handling request");
            Thread.sleep(1000L);
        }
        Assertions.assertEquals((int)1, (int)this.resultsReceiver.getMessageResponses().size());
        MessageResponse response = this.resultsReceiver.getMessageResponses().get(0);
        Assertions.assertEquals((Object)MessageResult.SUCCESS, (Object)response.getResult());
    }

    @Test
    public void testListObjectsInStoreWithExhaustRetries() throws InterruptedException {
        String segmentPathPrefix = "0/segmentPath1/";
        HashMap<UUID, SegmentStateAndPath> segmentPathMap = new HashMap<UUID, SegmentStateAndPath>();
        UUID segmentId = UUID.randomUUID();
        segmentPathMap.put(segmentId, this.newSegmentStateAndPath(segmentPathPrefix));
        Mockito.when((Object)this.tierObjectStore.listObject((String)Mockito.any(), Mockito.anyBoolean())).thenThrow(new Throwable[]{new RetryableException("retriable error")});
        RestoreObjectsInStoreRequest restoreObjectsInStoreRequest = new RestoreObjectsInStoreRequest(0, "test-topic", 0, segmentPathMap);
        this.objectStorePool.submitObjectStoreRequest((ObjectStoreRequest)restoreObjectsInStoreRequest);
        while (this.resultsReceiver.getMessageResponses().size() == 0) {
            System.out.println("sleep 1s, wait for threadPool handling request");
            Thread.sleep(1000L);
        }
        Assertions.assertEquals((int)1, (int)this.resultsReceiver.getMessageResponses().size());
        RestoreObjectsInStoreResponse response = (RestoreObjectsInStoreResponse)this.resultsReceiver.getMessageResponses().get(0);
        Assertions.assertEquals((Object)MessageResult.SUCCESS, (Object)response.getResult());
        Assertions.assertEquals((int)1, (int)response.getFailedSegmentIds().size());
        Assertions.assertTrue((boolean)response.getFailedSegmentIds().contains(segmentId));
    }

    @Test
    public void testRestoreObjectByCopyWithExhaustRetries() throws InterruptedException {
        String segmentPathPrefix = "0/segmentPath1/";
        HashMap<UUID, SegmentStateAndPath> segmentPathMap = new HashMap<UUID, SegmentStateAndPath>();
        UUID segmentId = UUID.randomUUID();
        segmentPathMap.put(segmentId, this.newSegmentStateAndPath(segmentPathPrefix));
        HashMap<String, List<VersionInformation>> versionMap = new HashMap<String, List<VersionInformation>>();
        versionMap.put(segmentPathPrefix + "01.log", Arrays.asList(new VersionInformation("version1")));
        versionMap.put(segmentPathPrefix + "01.index", Arrays.asList(new VersionInformation("version1")));
        Mockito.when((Object)this.tierObjectStore.listObject((String)Mockito.any(), Mockito.anyBoolean())).thenReturn(versionMap);
        ((TierObjectStore)Mockito.doThrow((Throwable[])new Throwable[]{new RetryableException("retriable error")}).when((Object)this.tierObjectStore)).restoreObjectByCopy((ObjectMetadata)Mockito.any(), Mockito.anyString(), (VersionInformation)Mockito.any());
        RestoreObjectsInStoreRequest restoreObjectsInStoreRequest = new RestoreObjectsInStoreRequest(0, "test-topic", 0, segmentPathMap);
        this.objectStorePool.submitObjectStoreRequest((ObjectStoreRequest)restoreObjectsInStoreRequest);
        while (this.resultsReceiver.getMessageResponses().size() == 0) {
            System.out.println("sleep 1s, wait for threadPool handling request");
            Thread.sleep(1000L);
        }
        Assertions.assertEquals((int)1, (int)this.resultsReceiver.getMessageResponses().size());
        RestoreObjectsInStoreResponse response = (RestoreObjectsInStoreResponse)this.resultsReceiver.getMessageResponses().get(0);
        Assertions.assertEquals((Object)MessageResult.SUCCESS, (Object)response.getResult());
        Assertions.assertEquals((int)1, (int)response.getFailedSegmentIds().size());
        Assertions.assertTrue((boolean)response.getFailedSegmentIds().contains(segmentId));
    }

    private SegmentStateAndPath newSegmentStateAndPath(String path) {
        return new SegmentStateAndPath(null, null, path);
    }
}

