package kafka.tier.topic;

import io.confluent.rest.TierTopicHeadDataLossDetectionRequest;
import io.confluent.rest.TierTopicHeadDataLossDetectionResponse;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Future;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Partition;
import kafka.log.LogManager;
import kafka.log.MergedLog;
import kafka.server.Defaults;
import kafka.server.LeaderEndpointSupplier;
import kafka.server.MockLeaderEndPoint;
import kafka.server.PartitionState;
import kafka.server.ReplicaManager;
import kafka.tier.TopicIdPartition;
import kafka.tier.exceptions.TierObjectStoreFatalException;
import kafka.tier.state.FileTierPartitionState;
import kafka.tier.state.OffsetAndEpoch;
import kafka.tier.state.TierPartitionStatus;
import kafka.tier.store.MockInMemoryTierObjectStore;
import kafka.tier.store.MockInMemoryTierObjectStoreConfig;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreResponse;
import kafka.tier.store.objects.FragmentType;
import kafka.tier.store.objects.ObjectType;
import kafka.tier.store.objects.metadata.ObjectStoreMetadata;
import kafka.tier.store.objects.metadata.TierTopicHeadDataLossReportMetadata;
import kafka.tier.topic.TierTopicDataLossValidatorMetrics;
import kafka.tier.topic.recovery.AffectedTierTopicPartitionInfo;
import kafka.tier.topic.recovery.AffectedUserTopicPartitionInfo;
import kafka.tier.topic.recovery.TierTopicHeadDataLossReport;
import kafka.tier.topic.recovery.ValidationSource;
import org.apache.directory.api.util.IOUtils;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.PartitionResult;
import org.apache.kafka.clients.admin.ReplicaStatusOptions;
import org.apache.kafka.clients.admin.ReplicaStatusResult;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.OngoingStubbing;
import scala.Option;
import scala.jdk.javaapi.CollectionConverters;

/* loaded from: input_file:kafka/tier/topic/TierTopicDataLossValidatorTest.class */
public class TierTopicDataLossValidatorTest {
    // Broker id for the tests; presumably the id the validator under test runs as
    // (the report assertions below compare brokerId() against 0) — TODO confirm.
    private static final int BROKER_ID = 0;
    // Cluster id handed to both TierTopicManagerConfig and the in-memory object store config.
    private static final String CLUSTER_ID = "cluster-0";
    // Topic name shared by every user topic partition fixture created in these tests.
    private static final String TEST_TOPIC = "test_topic";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kafka/tier/topic/TierTopicDataLossValidatorTest$TierTopicPartitionGroup.class */
    /**
     * Test fixture for a single tier topic partition: the partition itself, the broker id
     * recorded as its leader, a mocked replica-status future and a mock leader endpoint
     * pre-loaded with the supplied partition state.
     */
    public static class TierTopicPartitionGroup {
        final TopicPartition ttp;
        final int leader;
        final KafkaFuture<PartitionResult> kafkaFutureForReplicaStatusPartitionResult;
        final MockLeaderEndPoint mockLeaderEndpoint;

        /**
         * @param partition      partition number within the "_confluent-tier-state" topic
         * @param leaderId       broker id stored as the leader and used for the mock endpoint
         * @param epoch          second argument of the stubbed {@link PartitionResult}
         *                       (callers pass a user partition's last materialized epoch)
         * @param leaderState    state installed on the mock leader endpoint for this partition
         * @param mockTierTopic  mocked tier topic; stubbed to map every user partition to this partition
         * @param userPartitions user topic partitions that should resolve to this tier topic partition
         * @throws Exception declared because stubbing {@code KafkaFuture.get()} involves a call with checked exceptions
         */
        public TierTopicPartitionGroup(int partition, int leaderId, int epoch, PartitionState leaderState, TierTopic mockTierTopic, Set<TopicIdPartition> userPartitions) throws Exception {
            this.ttp = new TopicPartition("_confluent-tier-state", partition);
            this.leader = leaderId;

            // Route every supplied user partition to this tier topic partition.
            for (TopicIdPartition userPartition : userPartitions) {
                Mockito.when(mockTierTopic.toTierTopicPartition(userPartition)).thenReturn(this.ttp);
            }

            // The replica-status future resolves to a result naming this group's leader.
            KafkaFuture<PartitionResult> statusFuture = Mockito.mock(KafkaFuture.class);
            PartitionResult statusResult = new PartitionResult(this.leader, epoch, new ArrayList<>());
            Mockito.when(statusFuture.get()).thenReturn(statusResult);
            this.kafkaFutureForReplicaStatusPartitionResult = statusFuture;

            BrokerEndPoint endPoint = new BrokerEndPoint(this.leader, "localhost", new Random().nextInt());
            this.mockLeaderEndpoint = new MockLeaderEndPoint(endPoint, false, ApiKeys.FETCH.latestVersion());
            this.mockLeaderEndpoint.setLeaderState(this.ttp, leaderState);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kafka/tier/topic/TierTopicDataLossValidatorTest$UserTopicPartitionGroup.class */
    /**
     * Test fixture for a single user topic partition: its id, a mocked log whose tier
     * partition state reports the configured status and last materialized offset/epoch,
     * and a mocked {@link Partition} for the same topic partition.
     */
    public static class UserTopicPartitionGroup {
        final TopicIdPartition utp;
        final MergedLog mockLog = Mockito.mock(MergedLog.class);
        final OffsetAndEpoch ftpsLastMaterializedOffsetEpoch;
        final TierPartitionStatus ftpsStatus;
        final Partition mockPartition;

        /**
         * @param topic     topic name of the user partition
         * @param partition partition number
         * @param epoch     last materialized epoch; a negative value yields an empty epoch
         * @param offset    last materialized offset
         * @param status    status reported by the mocked tier partition state
         * @param deleted   value returned by the mocked log's {@code isDeleted()}
         * @param stray     value returned by the mocked log's {@code isStray()}
         */
        public UserTopicPartitionGroup(String topic, int partition, int epoch, int offset, TierPartitionStatus status, boolean deleted, boolean stray) {
            this.utp = new TopicIdPartition(topic, UUID.randomUUID(), partition);

            Optional<Integer> materializedEpoch = epoch >= 0 ? Optional.of(epoch) : Optional.empty();
            this.ftpsLastMaterializedOffsetEpoch = new OffsetAndEpoch(offset, materializedEpoch);
            this.ftpsStatus = status;

            // Tier partition state exposed through the mocked log.
            FileTierPartitionState mockState = Mockito.mock(FileTierPartitionState.class);
            Mockito.when(mockState.status()).thenReturn(status);
            Mockito.when(mockState.topicIdPartition()).thenReturn(Optional.of(this.utp));
            Mockito.when(mockState.lastLocalMaterializedSrcOffsetAndEpoch()).thenReturn(this.ftpsLastMaterializedOffsetEpoch);

            Mockito.when(this.mockLog.topicIdPartition()).thenReturn(Option.apply(this.utp));
            Mockito.when(this.mockLog.isDeleted()).thenReturn(deleted);
            Mockito.when(this.mockLog.isStray()).thenReturn(stray);
            Mockito.when(this.mockLog.tierPartitionState()).thenReturn(mockState);

            this.mockPartition = Mockito.mock(Partition.class);
            Mockito.when(this.mockPartition.topicPartition()).thenReturn(this.utp.topicPartition());
        }

        /**
         * Returns the last materialized epoch; throws if the fixture was built with an empty epoch.
         */
        int lastMaterializedEpoch() {
            return this.ftpsLastMaterializedOffsetEpoch.epoch().get();
        }
    }

    /**
     * Builds the validator under test around a mocked {@link ReplicaManager} that exposes
     * only the given leader partitions and log manager.
     *
     * @param list leader partitions returned by the mocked replica manager
     * @param z    forwarded into the {@link TierTopicManagerConfig}; presumably the
     *             "data loss detection enabled" switch (the disabled-detection test passes false) — TODO confirm
     * @param z2   forwarded into the {@link TierTopicManagerConfig}; exact meaning is not visible here — TODO confirm
     */
    private TierTopicDataLossValidator newInstance(TierObjectStore tierObjectStore, TierTopic tierTopic, LogManager logManager, ConfluentAdmin confluentAdmin, LeaderEndpointSupplier leaderEndpointSupplier, List<Partition> list, Time time, Metrics metrics, boolean z, boolean z2) {
        TierTopicManagerConfig config = new TierTopicManagerConfig(
            () -> Collections.singletonMap("bootstrap.servers", "bootstrap"),
            "",
            Defaults.TierMetadataNumPartitions(),
            Defaults.TierMetadataReplicationFactor(),
            0,
            CLUSTER_ID,
            5L,
            30000,
            500,
            -1L,
            new ArrayList(),
            Boolean.valueOf(Defaults.TierTopicProducerEnableIdempotence()),
            Boolean.valueOf(z),
            7200000L,
            Boolean.valueOf(z2),
            Boolean.valueOf(Defaults.TierTopicMaterializationFromSnapshotEnable()));

        ReplicaManager mockReplicaManager = Mockito.mock(ReplicaManager.class);
        Mockito.when(mockReplicaManager.leaderPartitionsIterator()).thenReturn(CollectionConverters.asScala(list).iterator());
        Mockito.when(mockReplicaManager.logManager()).thenReturn(logManager);

        return new TierTopicDataLossValidator(config, tierTopic, tierObjectStore, mockReplicaManager, () -> confluentAdmin, leaderEndpointSupplier, time, metrics);
    }

    /** Creates a detection request with an empty allow list. */
    private static TierTopicHeadDataLossDetectionRequest createRequest() {
        Set<TopicPartition> emptyAllowList = new HashSet<>();
        return createRequestWithAllowList(emptyAllowList);
    }

    /**
     * Creates a detection request carrying a freshly generated random UUID string and the given partition set.
     *
     * @param set partitions passed through to the request unchanged
     */
    private static TierTopicHeadDataLossDetectionRequest createRequestWithAllowList(Set<TopicPartition> set) {
        String randomId = UUID.randomUUID().toString();
        return new TierTopicHeadDataLossDetectionRequest(randomId, set);
    }

    /** Creates an in-memory object store configured with the test cluster id and the value 0. */
    private static MockInMemoryTierObjectStore createMockObjectStore(Time time, Metrics metrics) {
        MockInMemoryTierObjectStoreConfig storeConfig = new MockInMemoryTierObjectStoreConfig(CLUSTER_ID, 0);
        return new MockInMemoryTierObjectStore(time, metrics, storeConfig);
    }

    /**
     * Builds the validator with the ninth {@code newInstance} argument set to false (per the
     * test name, presumably "detection disabled") and expects the detection call to throw
     * {@link UnsupportedOperationException} while leaving every metric at zero.
     */
    @Test
    public void testThrowOnDataLossDetectionDisabled() {
        ValidationSource source = ValidationSource.ON_DEMAND_VALIDATION;
        TierTopicDataLossValidator validator = newInstance(
            Mockito.mock(TierObjectStore.class),
            Mockito.mock(TierTopic.class),
            Mockito.mock(LogManager.class),
            Mockito.mock(ConfluentAdmin.class),
            Mockito.mock(LeaderEndpointSupplier.class),
            new ArrayList<>(),
            new MockTime(),
            new Metrics(),
            false,
            true);
        TierTopicHeadDataLossDetectionRequest request = createRequest();

        Assertions.assertThrows(
            UnsupportedOperationException.class,
            () -> validator.detectDataLossInTierTopicHead(request, source, (Producer) null, 30000L, 0L));

        checkZeroMetrics(validator, source);
    }

    /**
     * With the log manager reporting a clean shutdown, an UNCLEAN_RESTART_VALIDATION run is
     * expected to throw {@link UnsupportedOperationException} and leave every metric at zero.
     */
    @Test
    public void testThrowOnCleanShutdownForUncleanValidationSource() {
        ValidationSource source = ValidationSource.UNCLEAN_RESTART_VALIDATION;
        LogManager mockLogManager = Mockito.mock(LogManager.class);
        Mockito.when(mockLogManager.hadCleanShutdown()).thenReturn(true);

        TierTopicDataLossValidator validator = newInstance(
            Mockito.mock(TierObjectStore.class),
            Mockito.mock(TierTopic.class),
            mockLogManager,
            Mockito.mock(ConfluentAdmin.class),
            Mockito.mock(LeaderEndpointSupplier.class),
            new ArrayList<>(),
            new MockTime(),
            new Metrics(),
            true,
            true);
        TierTopicHeadDataLossDetectionRequest request = createRequest();

        Assertions.assertThrows(
            UnsupportedOperationException.class,
            () -> validator.detectDataLossInTierTopicHead(request, source, (Producer) null, 30000L, 0L));

        checkZeroMetrics(validator, source);
    }

    /**
     * With no logs at all, detection is expected to succeed with an empty report path and no
     * error messages. Non-zero metric values are recorded beforehand so the final zero check
     * shows that the run overwrote them.
     */
    @Test
    public void testDataLossDetectionCompletesWithoutEligibleUserPartitionReplicas() throws Exception {
        ValidationSource source = ValidationSource.ON_DEMAND_VALIDATION;
        LogManager mockLogManager = Mockito.mock(LogManager.class);
        Mockito.when(mockLogManager.hadCleanShutdown()).thenReturn(true);
        Mockito.when(mockLogManager.allLogs()).thenReturn(CollectionConverters.asScala(new ArrayList()));

        TierTopicDataLossValidator validator = newInstance(
            Mockito.mock(TierObjectStore.class),
            Mockito.mock(TierTopic.class),
            mockLogManager,
            Mockito.mock(ConfluentAdmin.class),
            Mockito.mock(LeaderEndpointSupplier.class),
            new ArrayList<>(),
            new MockTime(),
            new Metrics(),
            true,
            true);
        TierTopicHeadDataLossDetectionRequest request = createRequest();

        // Seed the metric group with stale non-zero values before running detection.
        TierTopicDataLossValidatorMetrics.DataLossDetectionMetricGroup metricGroup =
            validator.dataLossMetrics.getMetricGroupOrThrow(source);
        metricGroup.recordDataLossReport(true, true, 2, 1, 1, 1, 1);
        metricGroup.recordDataLossReportUploadStatus(false);

        TierTopicHeadDataLossDetectionResponse response =
            validator.detectDataLossInTierTopicHead(request, source, (Producer) null, 30000L, 0L);

        Assertions.assertTrue(response.dataLossReportPath().isEmpty());
        Assertions.assertEquals(TierTopicHeadDataLossDetectionResponse.CompletionStatus.SUCCESS, response.completionStatus());
        Assertions.assertTrue(response.errorMessages().isEmpty());
        checkZeroMetrics(validator, source);
    }

    /**
     * Concatenates the given batch lists in argument order and wraps them in a {@link PartitionState}.
     *
     * @param i       second argument of {@code PartitionState.apply}; callers pass a last materialized epoch
     * @param i2      third argument of {@code PartitionState.apply}; presumably the high watermark — TODO confirm
     * @param listArr batch lists to flatten into the partition's log
     */
    private static PartitionState makePartitionState(int i, int i2, List<RecordBatch>... listArr) {
        List<RecordBatch> flattened = new ArrayList<>();
        Arrays.stream(listArr).forEach(flattened::addAll);
        return PartitionState.apply(CollectionConverters.asScala(flattened).toSeq(), i, i2, false);
    }

    /**
     * Builds uncompressed record batches containing {@code recordCount} records with random
     * UUID-string payloads, starting at {@code baseOffset}.
     *
     * @param baseOffset  initial offset passed to {@code MemoryRecords.withRecords}
     * @param leaderEpoch partition leader epoch passed to {@code MemoryRecords.withRecords}
     * @param recordCount number of records to generate; zero or negative produces no records
     * @return the batches exposed by the resulting {@link MemoryRecords}, in iteration order
     */
    private static List<RecordBatch> makeBatch(long baseOffset, int leaderEpoch, int recordCount) {
        List<SimpleRecord> records = new ArrayList<>();
        for (int n = 0; n < recordCount; n++) {
            records.add(new SimpleRecord(UUID.randomUUID().toString().getBytes()));
        }
        MemoryRecords memoryRecords = MemoryRecords.withRecords(
            baseOffset, CompressionType.NONE, Integer.valueOf(leaderEpoch), records.toArray(new SimpleRecord[0]));

        // Copy the batches into a plain list; the decompiled original referenced an
        // undefined identifier here and did not compile.
        List<RecordBatch> batches = new ArrayList<>();
        for (RecordBatch batch : memoryRecords.batches()) {
            batches.add(batch);
        }
        return batches;
    }

    /**
     * Stubs {@code logManager.allLogs()} to return the given logs (de-duplicated through a set).
     */
    private static void setupMockLogs(LogManager logManager, List<MergedLog> list) {
        // Raw type kept on purpose so the Scala conversion and stubbing resolve exactly as before.
        HashSet uniqueLogs = new HashSet(list);
        Mockito.when(logManager.allLogs()).thenReturn(CollectionConverters.asScala(uniqueLogs));
    }

    /**
     * Creates a mocked {@link TierTopic} named "_confluent-tier-state" whose partition count
     * is the default tier metadata partition count.
     */
    private static TierTopic setupMockTierTopic() {
        TierTopic mockTierTopic = Mockito.mock(TierTopic.class);
        OptionalInt partitionCount = OptionalInt.of(Defaults.TierMetadataNumPartitions());
        Mockito.when(mockTierTopic.topicName()).thenReturn("_confluent-tier-state");
        Mockito.when(mockTierTopic.numPartitions()).thenReturn(partitionCount);
        return mockTierTopic;
    }

    /**
     * Asserts the current value of each metric in the metric group registered for
     * {@code validationSource}.
     *
     * @param expectedDataLossDetected            expected {@code dataLossDetectedMetric} value
     * @param expectedDetectionFailures           expected {@code dataLossDetectionFailureMetric} value
     * @param expectedAffectedTierTopicPartitions expected {@code affectedTierTopicPartitionCountMetric} value
     * @param expectedAffectedUserReplicas        expected {@code affectedUserPartitionReplicaCountMetric} value
     * @param expectedAffectedUserLeaders         expected {@code affectedUserPartitionLeaderCountMetric} value
     * @param expectedReportUploadFailures        expected {@code dataLossReportUploadFailureMetric} value
     */
    private static void checkMetrics(TierTopicDataLossValidator tierTopicDataLossValidator, ValidationSource validationSource, int expectedDataLossDetected, int expectedDetectionFailures, int expectedAffectedTierTopicPartitions, int expectedAffectedUserReplicas, int expectedAffectedUserLeaders, int expectedReportUploadFailures) {
        TierTopicDataLossValidatorMetrics.DataLossDetectionMetricGroup group =
            tierTopicDataLossValidator.dataLossMetrics.getMetricGroupOrThrow(validationSource);

        Assertions.assertEquals(expectedDataLossDetected, group.dataLossDetectedMetric.value());
        Assertions.assertEquals(expectedDetectionFailures, group.dataLossDetectionFailureMetric.value());
        Assertions.assertEquals(expectedAffectedTierTopicPartitions, group.affectedTierTopicPartitionCountMetric.value());
        Assertions.assertEquals(expectedAffectedUserReplicas, group.affectedUserPartitionReplicaCountMetric.value());
        Assertions.assertEquals(expectedAffectedUserLeaders, group.affectedUserPartitionLeaderCountMetric.value());
        Assertions.assertEquals(expectedReportUploadFailures, group.dataLossReportUploadFailureMetric.value());
    }

    /** Asserts that all six data loss metrics for {@code validationSource} are zero. */
    private static void checkZeroMetrics(TierTopicDataLossValidator tierTopicDataLossValidator, ValidationSource validationSource) {
        final int zero = 0;
        checkMetrics(tierTopicDataLossValidator, validationSource, zero, zero, zero, zero, zero, zero);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the expected report entry for a user partition from the two fixtures.
     *
     * @param z passed through as the last constructor argument; presumably "is leader"
     *          (only the fixture registered as a leader partition passes true) — TODO confirm
     */
    public static AffectedUserTopicPartitionInfo makeAffectedUserTopicPartitionInfo(TierTopicPartitionGroup tierTopicPartitionGroup, UserTopicPartitionGroup userTopicPartitionGroup, boolean z) {
        int tierTopicPartitionId = tierTopicPartitionGroup.ttp.partition();
        OffsetAndEpoch lastMaterialized = userTopicPartitionGroup.ftpsLastMaterializedOffsetEpoch;
        TierPartitionStatus status = userTopicPartitionGroup.ftpsStatus;
        return new AffectedUserTopicPartitionInfo(tierTopicPartitionId, lastMaterialized, status, z);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the expected report entry for a tier topic partition from a user partition
     * fixture, with the fifth constructor argument fixed at 0.
     *
     * @param z              passed through as the fourth constructor argument; meaning not visible here — TODO confirm
     * @param offsetAndEpoch passed through as the last constructor argument
     */
    public static AffectedTierTopicPartitionInfo makeAffectedTierTopicPartitionInfo(UserTopicPartitionGroup userTopicPartitionGroup, boolean z, OffsetAndEpoch offsetAndEpoch) {
        OffsetAndEpoch lastMaterialized = userTopicPartitionGroup.ftpsLastMaterializedOffsetEpoch;
        TopicIdPartition userPartition = userTopicPartitionGroup.utp;
        TierPartitionStatus status = userTopicPartitionGroup.ftpsStatus;
        return new AffectedTierTopicPartitionInfo(lastMaterialized, userPartition, status, z, 0, offsetAndEpoch);
    }

    /**
     * Stubs {@code confluentAdmin.replicaStatus(any, any)} to return a result built from each
     * group's mocked future, chaining the same answer {@code i} times in total (at least once).
     *
     * @param set tier topic partition fixtures whose futures back the result
     * @param i   number of consecutive answers to stub
     */
    private static void setupReplicaStatusMock(ConfluentAdmin confluentAdmin, final Set<TierTopicPartitionGroup> set, int i) {
        HashMap<TopicPartition, KafkaFuture<PartitionResult>> futuresByPartition = new HashMap<>();
        for (TierTopicPartitionGroup group : set) {
            futuresByPartition.put(group.ttp, group.kafkaFutureForReplicaStatusPartitionResult);
        }
        ReplicaStatusResult result = new ReplicaStatusResult(futuresByPartition);

        OngoingStubbing<ReplicaStatusResult> stubbing = Mockito
            .when(confluentAdmin.replicaStatus((Set) ArgumentMatchers.any(), (ReplicaStatusOptions) ArgumentMatchers.any()))
            .thenReturn(result);
        for (int remaining = i; remaining > 1; remaining--) {
            stubbing = stubbing.thenReturn(result);
        }
    }

    /**
     * Fetches the data loss report named in the response from the object store and parses it.
     *
     * @param tierTopicHeadDataLossDetectionResponse response whose report path must be non-empty
     * @param tierObjectStore                        store the report fragment is read from
     * @return the report parsed from the fragment's JSON text
     * @throws IOException if reading or closing the fragment stream fails
     */
    private static TierTopicHeadDataLossReport deserializeReport(TierTopicHeadDataLossDetectionResponse tierTopicHeadDataLossDetectionResponse, TierObjectStore tierObjectStore) throws IOException {
        String reportPath = tierTopicHeadDataLossDetectionResponse.dataLossReportPath();
        Assertions.assertFalse(reportPath.isEmpty());

        TierObjectStoreResponse fragment = tierObjectStore.getObjectStoreFragment(
            TierTopicHeadDataLossReportMetadata.fromPath(reportPath), FragmentType.TIER_TOPIC_HEAD_DATA_LOSS_REPORT);
        // Open the stream once and close it after reading; the earlier code fetched the
        // stream twice, discarded the first one and never closed either.
        // NOTE(review): if TierObjectStoreResponse is itself closeable, it should be closed here too — confirm.
        try (java.io.InputStream reportStream = fragment.getInputStream()) {
            String reportJson = IOUtils.toString(reportStream, Charset.defaultCharset());
            return TierTopicHeadDataLossReport.readJsonFromString(reportJson);
        }
    }

    /**
     * Verifies that {@code replicaStatus} was invoked exactly {@code i} times and that each
     * group's replica-status future had {@code get()} invoked exactly {@code i} times.
     * Any {@link Exception} raised while verifying a future is rethrown wrapped in a
     * {@link RuntimeException}.
     */
    private static void verifyMocks(ConfluentAdmin confluentAdmin, Set<TierTopicPartitionGroup> set, int i) {
        Mockito.verify(confluentAdmin, Mockito.times(i))
            .replicaStatus((Set) ArgumentMatchers.any(), (ReplicaStatusOptions) ArgumentMatchers.any());

        for (TierTopicPartitionGroup group : set) {
            try {
                Mockito.verify(group.kafkaFutureForReplicaStatusPartitionResult, Mockito.times(i)).get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Shared scenario for the two "data loss detected" tests: nine user partitions spread over
     * three tier topic partitions, run once per validation source.
     *
     * The expected outcome asserted below is a SUCCESS report with data loss on tier topic
     * partitions 0 and 1, four affected user partitions and no failures.
     *
     * @param validationSource source to run detection with; for UNCLEAN_RESTART_VALIDATION the log
     *                         manager reports an unclean shutdown, a mocked producer is supplied and
     *                         the last {@code newInstance} flag is true
     * @throws Exception propagated from fixture construction or the detection call
     */
    public void testSuccessfulDataLossDetectionLogic(ValidationSource validationSource) throws Exception {
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        // Clean shutdown is reported for every source except UNCLEAN_RESTART_VALIDATION.
        Mockito.when(Boolean.valueOf(logManager.hadCleanShutdown())).thenReturn(Boolean.valueOf(validationSource != ValidationSource.UNCLEAN_RESTART_VALIDATION));
        MockTime mockTime = new MockTime();
        Metrics metrics = new Metrics();
        // User partition fixtures; constructor arguments are (topic, partition, epoch, offset, status, deleted, stray).
        final UserTopicPartitionGroup userTopicPartitionGroup = new UserTopicPartitionGroup(TEST_TOPIC, 0, 2, 5, TierPartitionStatus.ONLINE, false, false);
        final UserTopicPartitionGroup userTopicPartitionGroup2 = new UserTopicPartitionGroup(TEST_TOPIC, 1, 1, 2, TierPartitionStatus.ONLINE, false, false);
        // Partition 2: log marked as deleted.
        UserTopicPartitionGroup userTopicPartitionGroup3 = new UserTopicPartitionGroup(TEST_TOPIC, 2, 12, 15, TierPartitionStatus.ONLINE, true, false);
        // Partition 3: tier partition state in ERROR status; also the only leader partition handed to newInstance below.
        final UserTopicPartitionGroup userTopicPartitionGroup4 = new UserTopicPartitionGroup(TEST_TOPIC, 3, 3, 6, TierPartitionStatus.ERROR, false, false);
        final UserTopicPartitionGroup userTopicPartitionGroup5 = new UserTopicPartitionGroup(TEST_TOPIC, 4, 3, 4, TierPartitionStatus.ONLINE, false, false);
        // Partition 5: log marked as stray.
        UserTopicPartitionGroup userTopicPartitionGroup6 = new UserTopicPartitionGroup(TEST_TOPIC, 5, 10, 25, TierPartitionStatus.ONLINE, false, true);
        UserTopicPartitionGroup userTopicPartitionGroup7 = new UserTopicPartitionGroup(TEST_TOPIC, 6, 3, 5, TierPartitionStatus.ONLINE, false, false);
        // Partition 7: negative epoch, so the last materialized epoch is empty.
        UserTopicPartitionGroup userTopicPartitionGroup8 = new UserTopicPartitionGroup(TEST_TOPIC, 7, -1, 10, TierPartitionStatus.ONLINE, false, false);
        // Partition 8: last materialized offset of -1.
        UserTopicPartitionGroup userTopicPartitionGroup9 = new UserTopicPartitionGroup(TEST_TOPIC, 8, 3, -1, TierPartitionStatus.ONLINE, false, false);
        setupMockLogs(logManager, Arrays.asList(userTopicPartitionGroup.mockLog, userTopicPartitionGroup2.mockLog, userTopicPartitionGroup3.mockLog, userTopicPartitionGroup4.mockLog, userTopicPartitionGroup5.mockLog, userTopicPartitionGroup6.mockLog, userTopicPartitionGroup7.mockLog, userTopicPartitionGroup8.mockLog, userTopicPartitionGroup9.mockLog));
        TierTopic tierTopic = setupMockTierTopic();
        // Tier topic partition fixtures, keyed by the leader broker id each was created with.
        HashMap hashMap = new HashMap();
        // Tier topic partition 0 (leader 1) serves user partitions 0 and 1.
        final TierTopicPartitionGroup tierTopicPartitionGroup = new TierTopicPartitionGroup(0, 1, userTopicPartitionGroup.lastMaterializedEpoch(), makePartitionState(userTopicPartitionGroup.lastMaterializedEpoch(), 6, makeBatch(0L, 0, 2), makeBatch(2L, 1, 2)), tierTopic, new HashSet(Arrays.asList(userTopicPartitionGroup.utp, userTopicPartitionGroup2.utp)));
        hashMap.put(1, tierTopicPartitionGroup);
        // Tier topic partition 1 (leader 3) serves user partitions 2, 3 and 4.
        final TierTopicPartitionGroup tierTopicPartitionGroup2 = new TierTopicPartitionGroup(1, 3, userTopicPartitionGroup4.lastMaterializedEpoch(), makePartitionState(userTopicPartitionGroup4.lastMaterializedEpoch(), 6, makeBatch(0L, 0, 1), makeBatch(1L, 1, 1), makeBatch(2L, 2, 2), makeBatch(4L, 4, 2)), tierTopic, new HashSet(Arrays.asList(userTopicPartitionGroup3.utp, userTopicPartitionGroup4.utp, userTopicPartitionGroup5.utp)));
        hashMap.put(3, tierTopicPartitionGroup2);
        // Tier topic partition 2 (leader 5) serves user partitions 5 through 8.
        TierTopicPartitionGroup tierTopicPartitionGroup3 = new TierTopicPartitionGroup(2, 5, userTopicPartitionGroup7.lastMaterializedEpoch(), makePartitionState(userTopicPartitionGroup7.lastMaterializedEpoch(), 8, makeBatch(0L, 0, 1), makeBatch(1L, 1, 1), makeBatch(2L, 2, 2), makeBatch(4L, 3, 2), makeBatch(6L, 4, 2)), tierTopic, new HashSet(Arrays.asList(userTopicPartitionGroup6.utp, userTopicPartitionGroup7.utp, userTopicPartitionGroup8.utp, userTopicPartitionGroup9.utp)));
        hashMap.put(5, tierTopicPartitionGroup3);
        HashSet hashSet = new HashSet(Arrays.asList(tierTopicPartitionGroup, tierTopicPartitionGroup2, tierTopicPartitionGroup3));
        ConfluentAdmin confluentAdmin = (ConfluentAdmin) Mockito.mock(ConfluentAdmin.class);
        // Stub two consecutive replicaStatus answers; verifyMocks at the end expects exactly two calls.
        setupReplicaStatusMock(confluentAdmin, hashSet, 2);
        MockInMemoryTierObjectStore createMockObjectStore = createMockObjectStore(mockTime, metrics);
        // The leader endpoint supplier resolves its second argument through the leader-id map above.
        TierTopicDataLossValidator newInstance = newInstance(createMockObjectStore, tierTopic, logManager, confluentAdmin, (i, i2, str) -> {
            return ((TierTopicPartitionGroup) hashMap.get(Integer.valueOf(i2))).mockLeaderEndpoint;
        }, Collections.singletonList(userTopicPartitionGroup4.mockPartition), mockTime, metrics, true, validationSource == ValidationSource.UNCLEAN_RESTART_VALIDATION);
        // A producer is only supplied for the unclean-restart source; otherwise null is passed to the validator.
        Producer producer = null;
        if (validationSource == ValidationSource.UNCLEAN_RESTART_VALIDATION) {
            producer = (Producer) Mockito.mock(KafkaProducer.class);
            // Three send futures are stubbed in sequence; with Mockito's consecutive stubbing the
            // last one keeps being returned for any further send() call.
            Future future = (Future) Mockito.mock(Future.class);
            RecordMetadata recordMetadata = new RecordMetadata(tierTopicPartitionGroup.ttp, 4L, 0, -1L, -1, -1);
            Future future2 = (Future) Mockito.mock(Future.class);
            RecordMetadata recordMetadata2 = new RecordMetadata(tierTopicPartitionGroup2.ttp, 6L, 0, -1L, -1, -1);
            Future future3 = (Future) Mockito.mock(Future.class);
            RecordMetadata recordMetadata3 = new RecordMetadata(tierTopicPartitionGroup2.ttp, 7L, 0, -1L, -1, -1);
            Mockito.when(producer.send((ProducerRecord) ArgumentMatchers.any())).thenReturn(future).thenReturn(future2).thenReturn(future3);
            Mockito.when(future.get()).thenReturn(recordMetadata);
            Mockito.when(future2.get()).thenReturn(recordMetadata2);
            Mockito.when(future3.get()).thenReturn(recordMetadata3);
        }
        TierTopicHeadDataLossDetectionResponse detectDataLossInTierTopicHead = newInstance.detectDataLossInTierTopicHead(createRequest(), validationSource, producer, 30000L, 0L);
        // Expected metrics: data loss detected, no detection failure, 2 tier topic partitions,
        // 4 user partition replicas, 1 user partition leader, no report upload failure.
        checkMetrics(newInstance, validationSource, 1, 0, 2, 4, 1, 0);
        Assertions.assertTrue(detectDataLossInTierTopicHead.errorMessages().isEmpty());
        Assertions.assertEquals(TierTopicHeadDataLossDetectionResponse.CompletionStatus.SUCCESS, detectDataLossInTierTopicHead.completionStatus());
        // Read the uploaded report back from the in-memory object store and check its contents.
        TierTopicHeadDataLossReport deserializeReport = deserializeReport(detectDataLossInTierTopicHead, createMockObjectStore);
        Assertions.assertEquals(TierTopicHeadDataLossReport.CompletionStatus.SUCCESS, deserializeReport.completionStatus());
        Assertions.assertEquals((short) 1, deserializeReport.version());
        Assertions.assertEquals(0, deserializeReport.brokerId());
        Assertions.assertEquals(validationSource, deserializeReport.source());
        // NOTE(review): mockTime is read after detection has run here, unlike the timeout test
        // which captures the start time up front — confirm this comparison is the intended one.
        Assertions.assertTrue(deserializeReport.startTimestampMs() >= mockTime.milliseconds());
        Assertions.assertTrue(deserializeReport.endTimestampMs() >= deserializeReport.startTimestampMs());
        Assertions.assertTrue(deserializeReport.hasDataLoss());
        // Affected user partitions: 0 and 1 (tier topic partition 0), 3 and 4 (tier topic partition 1).
        // Only partition 3, the registered leader partition, is built with the flag set to true.
        Assertions.assertEquals(new HashMap<TopicIdPartition, AffectedUserTopicPartitionInfo>() { // from class: kafka.tier.topic.TierTopicDataLossValidatorTest.2
            {
                put(userTopicPartitionGroup.utp, TierTopicDataLossValidatorTest.makeAffectedUserTopicPartitionInfo(tierTopicPartitionGroup, userTopicPartitionGroup, false));
                put(userTopicPartitionGroup2.utp, TierTopicDataLossValidatorTest.makeAffectedUserTopicPartitionInfo(tierTopicPartitionGroup, userTopicPartitionGroup2, false));
                put(userTopicPartitionGroup4.utp, TierTopicDataLossValidatorTest.makeAffectedUserTopicPartitionInfo(tierTopicPartitionGroup2, userTopicPartitionGroup4, true));
                put(userTopicPartitionGroup5.utp, TierTopicDataLossValidatorTest.makeAffectedUserTopicPartitionInfo(tierTopicPartitionGroup2, userTopicPartitionGroup5, false));
            }
        }, deserializeReport.affectedUserTopicPartitions());
        // Affected tier topic partitions: 0 and 1; each entry is built from one user partition fixture.
        Assertions.assertEquals(new HashMap<TopicPartition, AffectedTierTopicPartitionInfo>() { // from class: kafka.tier.topic.TierTopicDataLossValidatorTest.3
            {
                put(tierTopicPartitionGroup.ttp, TierTopicDataLossValidatorTest.makeAffectedTierTopicPartitionInfo(userTopicPartitionGroup, false, new OffsetAndEpoch(-1L, Optional.of(-1))));
                put(tierTopicPartitionGroup2.ttp, TierTopicDataLossValidatorTest.makeAffectedTierTopicPartitionInfo(userTopicPartitionGroup4, true, new OffsetAndEpoch(4L, Optional.of(2))));
            }
        }, deserializeReport.affectedTierTopicPartitions());
        Assertions.assertFalse(deserializeReport.hasFailures());
        verifyMocks(confluentAdmin, hashSet, 2);
        if (validationSource == ValidationSource.UNCLEAN_RESTART_VALIDATION) {
            // Four sends are expected in the unclean-restart run, presumably one per affected user partition — TODO confirm.
            ((Producer) Mockito.verify(producer, Mockito.times(4))).send((ProducerRecord) ArgumentMatchers.any());
        }
    }

    /** Runs the shared data loss scenario with the UNCLEAN_RESTART_VALIDATION source. */
    @Test
    public void testDataLossDetectedAndUserPartitionsFenced() throws Exception {
        ValidationSource source = ValidationSource.UNCLEAN_RESTART_VALIDATION;
        testSuccessfulDataLossDetectionLogic(source);
    }

    /** Runs the shared data loss scenario with the ON_DEMAND_VALIDATION source. */
    @Test
    public void testDataLossDetectedButUserPartitionsNotFenced() throws Exception {
        ValidationSource source = ValidationSource.ON_DEMAND_VALIDATION;
        testSuccessfulDataLossDetectionLogic(source);
    }

    /**
     * Scenario with two tier topic partitions where one never yields a usable leader: its
     * replica-status future first returns a result with leader -1 and then throws.
     *
     * The test expects a FAILURE completion whose single error message contains "timed out",
     * with data loss still reported for the other tier topic partition.
     */
    @Test
    public void testFailedDataLossDetectionLogicDueToTimeout() throws Exception {
        ValidationSource validationSource = ValidationSource.ON_DEMAND_VALIDATION;
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        Mockito.when(Boolean.valueOf(logManager.hadCleanShutdown())).thenReturn(true);
        // NOTE(review): the 10001L argument is presumably MockTime's auto-tick in ms, letting the
        // 20000 ms value passed to the detection call elapse after a few clock reads — confirm.
        MockTime mockTime = new MockTime(10001L);
        // Start time captured before detection runs; compared against the report's start timestamp below.
        long milliseconds = mockTime.milliseconds();
        Metrics metrics = new Metrics();
        UserTopicPartitionGroup userTopicPartitionGroup = new UserTopicPartitionGroup(TEST_TOPIC, 0, 2, 5, TierPartitionStatus.ONLINE, false, false);
        final UserTopicPartitionGroup userTopicPartitionGroup2 = new UserTopicPartitionGroup(TEST_TOPIC, 1, 3, 6, TierPartitionStatus.ONLINE, false, false);
        setupMockLogs(logManager, Arrays.asList(userTopicPartitionGroup.mockLog, userTopicPartitionGroup2.mockLog));
        TierTopic tierTopic = setupMockTierTopic();
        // Only tier topic partition 1's fixture is registered in this leader-id map.
        HashMap hashMap = new HashMap();
        // Tier topic partition 0 is created with leader -1 and is not added to the map.
        TierTopicPartitionGroup tierTopicPartitionGroup = new TierTopicPartitionGroup(0, -1, userTopicPartitionGroup.lastMaterializedEpoch(), makePartitionState(userTopicPartitionGroup.lastMaterializedEpoch(), 6, makeBatch(0L, 0, 2), makeBatch(2L, 1, 2)), tierTopic, new HashSet(Arrays.asList(userTopicPartitionGroup.utp)));
        // Re-stub its future: first call returns a result with leader -1, every later call throws.
        Mockito.when(tierTopicPartitionGroup.kafkaFutureForReplicaStatusPartitionResult.get()).thenReturn(new PartitionResult(-1, -1, new ArrayList())).thenThrow(new Throwable[]{new RuntimeException(String.format("Dummy exception for %s leader", tierTopicPartitionGroup.ttp))});
        // Tier topic partition 1 (leader 3) serves user partition 1.
        final TierTopicPartitionGroup tierTopicPartitionGroup2 = new TierTopicPartitionGroup(1, 3, userTopicPartitionGroup2.lastMaterializedEpoch(), makePartitionState(userTopicPartitionGroup2.lastMaterializedEpoch(), 6, makeBatch(0L, 0, 1), makeBatch(1L, 1, 1), makeBatch(2L, 2, 2), makeBatch(4L, 4, 2)), tierTopic, new HashSet(Arrays.asList(userTopicPartitionGroup2.utp)));
        hashMap.put(3, tierTopicPartitionGroup2);
        PartitionResult partitionResult = new PartitionResult(3, userTopicPartitionGroup2.lastMaterializedEpoch(), new ArrayList());
        Mockito.when(tierTopicPartitionGroup2.kafkaFutureForReplicaStatusPartitionResult.get()).thenReturn(partitionResult).thenReturn(partitionResult);
        HashSet hashSet = new HashSet(Arrays.asList(tierTopicPartitionGroup, tierTopicPartitionGroup2));
        ConfluentAdmin confluentAdmin = (ConfluentAdmin) Mockito.mock(ConfluentAdmin.class);
        // Four replicaStatus answers are stubbed, but verifyMocks below expects only two calls.
        setupReplicaStatusMock(confluentAdmin, hashSet, 4);
        MockInMemoryTierObjectStore createMockObjectStore = createMockObjectStore(mockTime, metrics);
        // User partition 1 is the only leader partition exposed by the mocked replica manager.
        TierTopicDataLossValidator newInstance = newInstance(createMockObjectStore, tierTopic, logManager, confluentAdmin, (i, i2, str) -> {
            return ((TierTopicPartitionGroup) hashMap.get(Integer.valueOf(i2))).mockLeaderEndpoint;
        }, Collections.singletonList(userTopicPartitionGroup2.mockPartition), mockTime, metrics, true, false);
        TierTopicHeadDataLossDetectionResponse detectDataLossInTierTopicHead = newInstance.detectDataLossInTierTopicHead(createRequest(), validationSource, (Producer) Mockito.mock(Producer.class), 20000L, 0L);
        // Expected metrics: data loss detected, one detection failure, 1 tier topic partition,
        // 1 user partition replica, 1 user partition leader, no report upload failure.
        checkMetrics(newInstance, validationSource, 1, 1, 1, 1, 1, 0);
        Assertions.assertEquals(1, detectDataLossInTierTopicHead.errorMessages().size());
        Assertions.assertEquals(TierTopicHeadDataLossDetectionResponse.CompletionStatus.FAILURE, detectDataLossInTierTopicHead.completionStatus());
        // A report is still uploaded on failure; read it back and check its contents.
        TierTopicHeadDataLossReport deserializeReport = deserializeReport(detectDataLossInTierTopicHead, createMockObjectStore);
        Assertions.assertEquals(TierTopicHeadDataLossReport.CompletionStatus.FAILURE, deserializeReport.completionStatus());
        Assertions.assertEquals((short) 1, deserializeReport.version());
        Assertions.assertEquals(0, deserializeReport.brokerId());
        Assertions.assertEquals(validationSource, deserializeReport.source());
        Assertions.assertTrue(deserializeReport.startTimestampMs() >= milliseconds);
        Assertions.assertTrue(deserializeReport.endTimestampMs() >= deserializeReport.startTimestampMs());
        Assertions.assertTrue(deserializeReport.hasDataLoss());
        // Only user partition 1 (via tier topic partition 1) is expected to be reported as affected.
        Assertions.assertEquals(new HashMap<TopicIdPartition, AffectedUserTopicPartitionInfo>() { // from class: kafka.tier.topic.TierTopicDataLossValidatorTest.4
            {
                put(userTopicPartitionGroup2.utp, TierTopicDataLossValidatorTest.makeAffectedUserTopicPartitionInfo(tierTopicPartitionGroup2, userTopicPartitionGroup2, true));
            }
        }, deserializeReport.affectedUserTopicPartitions());
        Assertions.assertEquals(new HashMap<TopicPartition, AffectedTierTopicPartitionInfo>() { // from class: kafka.tier.topic.TierTopicDataLossValidatorTest.5
            {
                put(tierTopicPartitionGroup2.ttp, TierTopicDataLossValidatorTest.makeAffectedTierTopicPartitionInfo(userTopicPartitionGroup2, true, new OffsetAndEpoch(4L, Optional.of(2))));
            }
        }, deserializeReport.affectedTierTopicPartitions());
        // The failure is attributed to tier topic partition 0, with no fencing failures.
        Assertions.assertTrue(deserializeReport.hasFailures());
        Assertions.assertTrue(deserializeReport.userPartitionsWithFencingFailures().isEmpty());
        Assertions.assertEquals(new HashSet(Collections.singletonList(tierTopicPartitionGroup.ttp)), deserializeReport.failedTierTopicPartitions());
        Assertions.assertEquals(1, deserializeReport.errorMessages().size());
        Assertions.assertTrue(((String) deserializeReport.errorMessages().get(0)).contains("timed out"));
        verifyMocks(confluentAdmin, hashSet, 2);
    }

    /**
     * Runs on-demand head data-loss detection while the replica-status future misbehaves on its
     * first two invocations: it first returns a {@code PartitionResult(-1, -1, [])}, then throws
     * a {@link RuntimeException}, and only afterwards returns the result built from the user
     * partition's last materialized epoch.
     *
     * <p>The test expects a {@code SUCCESS} response without error messages, then deserializes
     * the report from the in-memory object store and checks its header fields, the affected
     * user/tier topic partition maps, and that no failures were recorded. The replica-status
     * mock is both set up and verified with a count of 4.
     */
    @Test
    public void testDataLossDetectedAfterRetriesDuringFailures() throws Exception {
        ValidationSource source = ValidationSource.ON_DEMAND_VALIDATION;
        LogManager logManager = Mockito.mock(LogManager.class);
        Mockito.when(logManager.hadCleanShutdown()).thenReturn(true);
        MockTime time = new MockTime();
        long testStartMs = time.milliseconds();
        Metrics metrics = new Metrics();

        UserTopicPartitionGroup userGroup =
            new UserTopicPartitionGroup(TEST_TOPIC, 0, 2, 5, TierPartitionStatus.ONLINE, false, false);
        setupMockLogs(logManager, Arrays.asList(userGroup.mockLog));
        TierTopic tierTopic = setupMockTierTopic();

        HashMap<Integer, TierTopicPartitionGroup> groupsByKey = new HashMap<>();
        TierTopicPartitionGroup tierGroup = new TierTopicPartitionGroup(
            0,
            1,
            userGroup.lastMaterializedEpoch(),
            makePartitionState(userGroup.lastMaterializedEpoch(), 6, makeBatch(0L, 0, 2), makeBatch(2L, 1, 2)),
            tierTopic,
            new HashSet<>(Collections.singletonList(userGroup.utp)));
        groupsByKey.put(1, tierGroup);

        // Consecutive answers of the replica-status future, in the order they are consumed.
        PartitionResult firstResult = new PartitionResult(-1, -1, new ArrayList<>());
        PartitionResult laterResult = new PartitionResult(1, userGroup.lastMaterializedEpoch(), new ArrayList<>());
        Mockito.when(tierGroup.kafkaFutureForReplicaStatusPartitionResult.get())
            .thenReturn(firstResult)
            .thenThrow(new RuntimeException(String.format("Dummy exception for %s leader", tierGroup.ttp)))
            .thenReturn(laterResult)
            .thenReturn(laterResult);

        HashSet<TierTopicPartitionGroup> tierGroups = new HashSet<>(Collections.singletonList(tierGroup));
        ConfluentAdmin admin = Mockito.mock(ConfluentAdmin.class);
        setupReplicaStatusMock(admin, tierGroups, 4);
        MockInMemoryTierObjectStore objectStore = createMockObjectStore(time, metrics);

        TierTopicDataLossValidator validator = newInstance(
            objectStore,
            tierTopic,
            logManager,
            admin,
            (ignoredInt, groupKey, ignoredStr) -> groupsByKey.get(groupKey).mockLeaderEndpoint,
            Collections.emptyList(),
            time,
            metrics,
            true,
            false);
        TierTopicHeadDataLossDetectionResponse response = validator.detectDataLossInTierTopicHead(
            createRequest(), source, Mockito.mock(Producer.class), 30000L, 0L);

        // Response-level expectations.
        checkMetrics(validator, source, 1, 0, 1, 1, 0, 0);
        Assertions.assertTrue(response.errorMessages().isEmpty());
        Assertions.assertEquals(TierTopicHeadDataLossDetectionResponse.CompletionStatus.SUCCESS, response.completionStatus());

        // Report-level expectations.
        TierTopicHeadDataLossReport report = deserializeReport(response, objectStore);
        Assertions.assertEquals(TierTopicHeadDataLossReport.CompletionStatus.SUCCESS, report.completionStatus());
        Assertions.assertEquals((short) 1, report.version());
        Assertions.assertEquals(0, report.brokerId());
        Assertions.assertEquals(source, report.source());
        Assertions.assertTrue(report.startTimestampMs() >= testStartMs);
        Assertions.assertTrue(report.endTimestampMs() >= report.startTimestampMs());

        HashMap<TopicIdPartition, AffectedUserTopicPartitionInfo> expectedUserPartitions = new HashMap<>();
        expectedUserPartitions.put(userGroup.utp, makeAffectedUserTopicPartitionInfo(tierGroup, userGroup, false));
        Assertions.assertEquals(expectedUserPartitions, report.affectedUserTopicPartitions());

        HashMap<TopicPartition, AffectedTierTopicPartitionInfo> expectedTierPartitions = new HashMap<>();
        expectedTierPartitions.put(
            tierGroup.ttp,
            makeAffectedTierTopicPartitionInfo(userGroup, false, new OffsetAndEpoch(-1L, Optional.of(-1))));
        Assertions.assertEquals(expectedTierPartitions, report.affectedTierTopicPartitions());

        Assertions.assertFalse(report.hasFailures());
        verifyMocks(admin, tierGroups, 4);
    }

    /**
     * Runs on-demand head data-loss detection against a tier topic partition state built from
     * three batches and expects a clean outcome: all metrics at zero, an empty report path, a
     * {@code SUCCESS} completion status and no error messages.
     *
     * <p>The object store is a plain Mockito mock with no stubbing, and the replica-status mock
     * is set up and verified with a count of 2.
     */
    @Test
    public void testNoDataLossDetected() throws Exception {
        ValidationSource source = ValidationSource.ON_DEMAND_VALIDATION;
        LogManager logManager = Mockito.mock(LogManager.class);
        Mockito.when(logManager.hadCleanShutdown()).thenReturn(true);
        MockTime time = new MockTime();
        Metrics metrics = new Metrics();

        UserTopicPartitionGroup userGroup =
            new UserTopicPartitionGroup(TEST_TOPIC, 0, 1, 3, TierPartitionStatus.ONLINE, false, false);
        setupMockLogs(logManager, Collections.singletonList(userGroup.mockLog));
        TierTopic tierTopic = setupMockTierTopic();

        HashMap<Integer, TierTopicPartitionGroup> groupsByKey = new HashMap<>();
        TierTopicPartitionGroup tierGroup = new TierTopicPartitionGroup(
            0,
            1,
            userGroup.lastMaterializedEpoch(),
            makePartitionState(
                userGroup.lastMaterializedEpoch(),
                6,
                makeBatch(0L, 0, 2),
                makeBatch(2L, 1, 2),
                makeBatch(4L, 2, 5)),
            tierTopic,
            new HashSet<>(Collections.singletonList(userGroup.utp)));
        groupsByKey.put(1, tierGroup);

        HashSet<TierTopicPartitionGroup> tierGroups = new HashSet<>(Collections.singletonList(tierGroup));
        ConfluentAdmin admin = Mockito.mock(ConfluentAdmin.class);
        setupReplicaStatusMock(admin, tierGroups, 2);

        TierTopicDataLossValidator validator = newInstance(
            Mockito.mock(TierObjectStore.class),
            tierTopic,
            logManager,
            admin,
            (ignoredInt, groupKey, ignoredStr) -> groupsByKey.get(groupKey).mockLeaderEndpoint,
            Collections.emptyList(),
            time,
            metrics,
            true,
            true);
        TierTopicHeadDataLossDetectionResponse response = validator.detectDataLossInTierTopicHead(
            createRequest(), source, Mockito.mock(KafkaProducer.class), 30000L, 0L);

        checkZeroMetrics(validator, source);
        Assertions.assertTrue(response.dataLossReportPath().isEmpty());
        Assertions.assertEquals(TierTopicHeadDataLossDetectionResponse.CompletionStatus.SUCCESS, response.completionStatus());
        Assertions.assertTrue(response.errorMessages().isEmpty());
        verifyMocks(admin, tierGroups, 2);
    }

    /**
     * Runs unclean-restart head data-loss detection with an object store whose
     * {@code putBuffer} is stubbed to throw a {@link TierObjectStoreFatalException}.
     *
     * <p>The call to {@code detectDataLossInTierTopicHead} is expected to throw a
     * {@link RuntimeException} (or a subclass); afterwards the metrics and the replica-status
     * mock interactions (count of 2) are still checked.
     */
    @Test
    public void testDataLossDetectedButObjectStoreUploadFailed() throws Exception {
        ValidationSource source = ValidationSource.UNCLEAN_RESTART_VALIDATION;
        LogManager logManager = Mockito.mock(LogManager.class);
        Mockito.when(logManager.hadCleanShutdown()).thenReturn(false);
        MockTime time = new MockTime();
        Metrics metrics = new Metrics();

        UserTopicPartitionGroup userGroup =
            new UserTopicPartitionGroup(TEST_TOPIC, 0, 2, 5, TierPartitionStatus.ONLINE, false, false);
        setupMockLogs(logManager, Collections.singletonList(userGroup.mockLog));
        TierTopic tierTopic = setupMockTierTopic();

        HashMap<Integer, TierTopicPartitionGroup> groupsByKey = new HashMap<>();
        TierTopicPartitionGroup tierGroup = new TierTopicPartitionGroup(
            0,
            1,
            userGroup.lastMaterializedEpoch(),
            makePartitionState(userGroup.lastMaterializedEpoch(), 6, makeBatch(0L, 0, 2), makeBatch(2L, 1, 2)),
            tierTopic,
            new HashSet<>(Collections.singletonList(userGroup.utp)));
        groupsByKey.put(1, tierGroup);

        HashSet<TierTopicPartitionGroup> tierGroups = new HashSet<>(Collections.singletonList(tierGroup));
        ConfluentAdmin admin = Mockito.mock(ConfluentAdmin.class);
        setupReplicaStatusMock(admin, tierGroups, 2);

        // Every upload attempt fails, whatever the arguments (including null).
        TierObjectStore failingStore = Mockito.mock(TierObjectStore.class);
        Mockito.when(failingStore.putBuffer(
                ArgumentMatchers.<ObjectStoreMetadata>any(),
                ArgumentMatchers.<ByteBuffer>any(),
                ArgumentMatchers.<ObjectType>any()))
            .thenThrow(new TierObjectStoreFatalException("Dummy fatal object store exception"));

        TierTopicDataLossValidator validator = newInstance(
            failingStore,
            tierTopic,
            logManager,
            admin,
            (ignoredInt, groupKey, ignoredStr) -> groupsByKey.get(groupKey).mockLeaderEndpoint,
            Collections.emptyList(),
            time,
            metrics,
            true,
            false);
        Producer producer = Mockito.mock(KafkaProducer.class);
        // Built outside the lambda so a failure here is not mistaken for the expected exception.
        TierTopicHeadDataLossDetectionRequest request = createRequest();

        Assertions.assertThrows(
            RuntimeException.class,
            () -> validator.detectDataLossInTierTopicHead(request, source, producer, 30000L, 0L));
        checkMetrics(validator, source, 1, 0, 1, 1, 0, 1);
        verifyMocks(admin, tierGroups, 2);
    }
}
