/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.topic;

import io.confluent.rest.TierTopicHeadDataLossDetectionRequest;
import io.confluent.rest.TierTopicHeadDataLossDetectionResponse;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Future;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Partition;
import kafka.log.LogManager;
import kafka.log.MergedLog;
import kafka.server.Defaults;
import kafka.server.LeaderEndpointSupplier;
import kafka.server.MockLeaderEndPoint;
import kafka.server.PartitionState;
import kafka.server.ReplicaManager;
import kafka.tier.TopicIdPartition;
import kafka.tier.exceptions.TierObjectStoreFatalException;
import kafka.tier.state.FileTierPartitionState;
import kafka.tier.state.OffsetAndEpoch;
import kafka.tier.state.TierPartitionStatus;
import kafka.tier.store.MockInMemoryTierObjectStore;
import kafka.tier.store.MockInMemoryTierObjectStoreConfig;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreResponse;
import kafka.tier.store.objects.FragmentType;
import kafka.tier.store.objects.ObjectType;
import kafka.tier.store.objects.metadata.ObjectStoreMetadata;
import kafka.tier.store.objects.metadata.TierTopicHeadDataLossReportMetadata;
import kafka.tier.topic.TierTopic;
import kafka.tier.topic.TierTopicDataLossValidator;
import kafka.tier.topic.TierTopicDataLossValidatorMetrics;
import kafka.tier.topic.TierTopicManagerConfig;
import kafka.tier.topic.recovery.AffectedTierTopicPartitionInfo;
import kafka.tier.topic.recovery.AffectedUserTopicPartitionInfo;
import kafka.tier.topic.recovery.TierTopicHeadDataLossReport;
import kafka.tier.topic.recovery.ValidationSource;
import org.apache.directory.api.util.IOUtils;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.PartitionResult;
import org.apache.kafka.clients.admin.ReplicaStatusOptions;
import org.apache.kafka.clients.admin.ReplicaStatusResult;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.OngoingStubbing;
import org.mockito.verification.VerificationMode;
import scala.Option;
import scala.collection.immutable.Seq;
import scala.jdk.javaapi.CollectionConverters;

public class TierTopicDataLossValidatorTest {
    // Shared fixture constants for the tests in this class.
    // NOTE(review): presumably the id of the broker under test - confirm against its usages.
    private static final int BROKER_ID = 0;
    // Passed to both the TierTopicManagerConfig and the in-memory object store config built below.
    private static final String CLUSTER_ID = "cluster-0";
    // Topic name given to the user topic partition fixtures created by the tests.
    private static final String TEST_TOPIC = "test_topic";

    /**
     * Builds the {@link TierTopicDataLossValidator} under test from the supplied collaborators.
     *
     * <p>A {@link ReplicaManager} mock is created internally: its {@code leaderPartitionsIterator()}
     * yields {@code leaderPartitions} and its {@code logManager()} yields {@code mockLogManager}.
     * The admin client is exposed to the validator through a supplier that always returns
     * {@code mockClient}.
     *
     * @param mockObjectStore object store handed to the validator
     * @param mockTierTopic tier topic handed to the validator
     * @param mockLogManager log manager returned by the mocked replica manager
     * @param mockClient admin client returned by the supplier given to the validator
     * @param mockLeaderEndpointSupplier leader endpoint supplier handed to the validator
     * @param leaderPartitions partitions returned by the mocked replica manager's leader iterator
     * @param mockTime clock handed to the validator
     * @param metrics metrics registry handed to the validator
     * @param enableTierTopicDataLossDetection forwarded to the {@link TierTopicManagerConfig}
     * @param enableTierTopicFencingDuringDataLoss forwarded to the {@link TierTopicManagerConfig}
     * @return a validator wired to the mocks above
     */
    private TierTopicDataLossValidator newInstance(TierObjectStore mockObjectStore, TierTopic mockTierTopic, LogManager mockLogManager, ConfluentAdmin mockClient, LeaderEndpointSupplier mockLeaderEndpointSupplier, List<Partition> leaderPartitions, Time mockTime, Metrics metrics, boolean enableTierTopicDataLossDetection, boolean enableTierTopicFencingDuringDataLoss) {
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
        // The Object casts keep the stubbing independent of how the Scala collection types surface in Java.
        Mockito.when((Object) replicaManager.leaderPartitionsIterator())
                .thenReturn((Object) CollectionConverters.asScala(leaderPartitions).iterator());
        Mockito.when((Object) replicaManager.logManager()).thenReturn((Object) mockLogManager);

        // NOTE(review): the positional literals below mirror the original call; their meaning is
        // defined by the TierTopicManagerConfig constructor, which is not visible here.
        TierTopicManagerConfig config = new TierTopicManagerConfig(
                () -> Collections.singletonMap("bootstrap.servers", "bootstrap"),
                "",
                Defaults.TierMetadataNumPartitions(),
                Defaults.TierMetadataReplicationFactor(),
                0,
                CLUSTER_ID,
                5L,
                30000,
                500,
                -1L,
                new ArrayList<>(),
                Defaults.TierTopicProducerEnableIdempotence(),
                enableTierTopicDataLossDetection,
                0x6DDD00L,
                enableTierTopicFencingDuringDataLoss,
                Defaults.TierTopicMaterializationFromSnapshotEnable());

        return new TierTopicDataLossValidator(
                config,
                mockTierTopic,
                mockObjectStore,
                replicaManager,
                () -> mockClient,
                mockLeaderEndpointSupplier,
                mockTime,
                metrics);
    }

    /**
     * Creates a detection request with an empty (but mutable) allow list.
     *
     * @return a new request built by {@link #createRequestWithAllowList(Set)}
     */
    private static TierTopicHeadDataLossDetectionRequest createRequest() {
        Set<TopicPartition> emptyAllowList = new HashSet<>();
        return createRequestWithAllowList(emptyAllowList);
    }

    /**
     * Creates a detection request whose first constructor argument is a freshly generated random
     * UUID string and whose second is the given allow list.
     *
     * @param allowList topic partitions passed through to the request unchanged
     * @return a new request
     */
    private static TierTopicHeadDataLossDetectionRequest createRequestWithAllowList(Set<TopicPartition> allowList) {
        String randomId = UUID.randomUUID().toString();
        return new TierTopicHeadDataLossDetectionRequest(randomId, allowList);
    }

    /**
     * Creates an in-memory object store configured with {@link #CLUSTER_ID} and the value 0.
     *
     * @param time clock handed to the store
     * @param metrics metrics registry handed to the store
     * @return a new {@link MockInMemoryTierObjectStore}
     */
    private static MockInMemoryTierObjectStore createMockObjectStore(Time time, Metrics metrics) {
        MockInMemoryTierObjectStoreConfig storeConfig = new MockInMemoryTierObjectStoreConfig(CLUSTER_ID, 0);
        return new MockInMemoryTierObjectStore(time, metrics, storeConfig);
    }

    /**
     * When the validator is created with data loss detection disabled, an on-demand detection
     * request must fail with {@link UnsupportedOperationException} and all metrics must read zero.
     */
    @Test
    public void testThrowOnDataLossDetectionDisabled() {
        ValidationSource source = ValidationSource.ON_DEMAND_VALIDATION;
        LogManager logManager = Mockito.mock(LogManager.class);
        Time time = new MockTime();
        Metrics metrics = new Metrics();

        // Detection disabled, fencing enabled.
        TierTopicDataLossValidator validator = newInstance(
                Mockito.mock(TierObjectStore.class),
                Mockito.mock(TierTopic.class),
                logManager,
                Mockito.mock(ConfluentAdmin.class),
                Mockito.mock(LeaderEndpointSupplier.class),
                new ArrayList<>(),
                time,
                metrics,
                false,
                true);
        TierTopicHeadDataLossDetectionRequest request = createRequest();

        Assertions.assertThrows(
                UnsupportedOperationException.class,
                () -> validator.detectDataLossInTierTopicHead(request, source, null, 30000L, 0L));
        checkZeroMetrics(validator, source);
    }

    /**
     * An {@code UNCLEAN_RESTART_VALIDATION} request issued while the log manager reports a clean
     * shutdown must fail with {@link UnsupportedOperationException} and leave all metrics at zero.
     */
    @Test
    public void testThrowOnCleanShutdownForUncleanValidationSource() {
        ValidationSource source = ValidationSource.UNCLEAN_RESTART_VALIDATION;
        LogManager logManager = Mockito.mock(LogManager.class);
        Mockito.when(logManager.hadCleanShutdown()).thenReturn(true);
        Time time = new MockTime();
        Metrics metrics = new Metrics();

        // Detection and fencing are both enabled; only the clean shutdown should cause the rejection.
        TierTopicDataLossValidator validator = newInstance(
                Mockito.mock(TierObjectStore.class),
                Mockito.mock(TierTopic.class),
                logManager,
                Mockito.mock(ConfluentAdmin.class),
                Mockito.mock(LeaderEndpointSupplier.class),
                new ArrayList<>(),
                time,
                metrics,
                true,
                true);
        TierTopicHeadDataLossDetectionRequest request = createRequest();

        Assertions.assertThrows(
                UnsupportedOperationException.class,
                () -> validator.detectDataLossInTierTopicHead(request, source, null, 30000L, 0L));
        checkZeroMetrics(validator, source);
    }

    /**
     * With no logs reported by the log manager, an on-demand detection run must succeed, return an
     * empty report path and no error messages, and leave every metric at zero.
     */
    @Test
    public void testDataLossDetectionCompletesWithoutEligibleUserPartitionReplicas() throws Exception {
        ValidationSource source = ValidationSource.ON_DEMAND_VALIDATION;
        LogManager logManager = Mockito.mock(LogManager.class);
        Mockito.when(logManager.hadCleanShutdown()).thenReturn(true);
        // No logs at all: there is nothing for the validator to inspect.
        Mockito.when((Object) logManager.allLogs())
                .thenReturn((Object) CollectionConverters.asScala(new ArrayList<MergedLog>()));
        Time time = new MockTime();
        Metrics metrics = new Metrics();
        TierTopicDataLossValidator validator = newInstance(
                Mockito.mock(TierObjectStore.class),
                Mockito.mock(TierTopic.class),
                logManager,
                Mockito.mock(ConfluentAdmin.class),
                Mockito.mock(LeaderEndpointSupplier.class),
                new ArrayList<>(),
                time,
                metrics,
                true,
                true);
        TierTopicHeadDataLossDetectionRequest request = createRequest();

        // Seed the metric group with non-zero values before the run; the zero check at the end
        // then only passes if the run overwrites them.
        TierTopicDataLossValidatorMetrics.DataLossDetectionMetricGroup seededGroup =
                validator.dataLossMetrics.getMetricGroupOrThrow(source);
        seededGroup.recordDataLossReport(true, true, 2, 1, 1, 1, 1);
        seededGroup.recordDataLossReportUploadStatus(false);

        TierTopicHeadDataLossDetectionResponse response =
                validator.detectDataLossInTierTopicHead(request, source, null, 30000L, 0L);

        Assertions.assertTrue(response.dataLossReportPath().isEmpty());
        Assertions.assertEquals(TierTopicHeadDataLossDetectionResponse.CompletionStatus.SUCCESS, response.completionStatus());
        Assertions.assertTrue(response.errorMessages().isEmpty());
        checkZeroMetrics(validator, source);
    }

    /**
     * Concatenates the given batch lists, in argument order, and wraps them in a
     * {@link PartitionState}.
     *
     * <p>{@code @SafeVarargs} is appropriate because the generic varargs array is only iterated,
     * never written to or exposed.
     *
     * @param leaderEpoch forwarded to {@code PartitionState.apply}
     * @param hwm forwarded to {@code PartitionState.apply}
     * @param listOfRecordBatches batch lists to concatenate; may be empty
     * @return the partition state built from the concatenated batches
     */
    @SafeVarargs
    private static PartitionState makePartitionState(int leaderEpoch, int hwm, List<RecordBatch> ... listOfRecordBatches) {
        List<RecordBatch> flattened = new ArrayList<>();
        for (List<RecordBatch> batches : listOfRecordBatches) {
            flattened.addAll(batches);
        }
        // NOTE(review): the trailing `false` is kept from the original call; its meaning is defined by PartitionState.apply.
        return PartitionState.apply((Seq<RecordBatch>) CollectionConverters.asScala(flattened).toSeq(), leaderEpoch, hwm, false);
    }

    /**
     * Builds uncompressed record batches containing {@code count} records, each holding the bytes
     * of a random UUID string.
     *
     * @param baseOffset base offset passed to {@code MemoryRecords.withRecords}
     * @param leaderEpoch leader epoch passed to {@code MemoryRecords.withRecords}
     * @param count number of records to generate
     * @return the batches produced by {@code MemoryRecords.withRecords}, in iteration order
     */
    private static List<RecordBatch> makeBatch(long baseOffset, int leaderEpoch, int count) {
        List<SimpleRecord> payloads = new ArrayList<>();
        for (int generated = 0; generated < count; generated++) {
            payloads.add(new SimpleRecord(UUID.randomUUID().toString().getBytes()));
        }
        SimpleRecord[] payloadArray = payloads.toArray(new SimpleRecord[0]);
        // The epoch is boxed explicitly so the overload taking an Integer leader epoch is selected.
        MemoryRecords memoryRecords = MemoryRecords.withRecords(baseOffset, CompressionType.NONE, Integer.valueOf(leaderEpoch), payloadArray);

        List<RecordBatch> batches = new ArrayList<>();
        for (RecordBatch batch : memoryRecords.batches()) {
            batches.add(batch);
        }
        return batches;
    }

    /**
     * Stubs {@code mockLogManager.allLogs()} to return the given logs as a Scala set.
     *
     * @param mockLogManager log manager mock to stub
     * @param mockLogs logs to expose; duplicates collapse because they are copied into a set
     */
    private static void setupMockLogs(LogManager mockLogManager, List<MergedLog> mockLogs) {
        HashSet<MergedLog> uniqueLogs = new HashSet<>(mockLogs);
        Object scalaLogs = CollectionConverters.asScala(uniqueLogs);
        Mockito.when((Object) mockLogManager.allLogs()).thenReturn(scalaLogs);
    }

    /**
     * Creates a {@link TierTopic} mock that reports the topic name {@code _confluent-tier-state}
     * and a partition count of {@code Defaults.TierMetadataNumPartitions()}.
     *
     * @return the stubbed tier topic mock
     */
    private static TierTopic setupMockTierTopic() {
        TierTopic tierTopic = Mockito.mock(TierTopic.class);
        OptionalInt partitionCount = OptionalInt.of(Defaults.TierMetadataNumPartitions());
        Mockito.when(tierTopic.topicName()).thenReturn("_confluent-tier-state");
        Mockito.when(tierTopic.numPartitions()).thenReturn(partitionCount);
        return tierTopic;
    }

    /**
     * Asserts the current values of the six data loss metrics recorded for {@code source}.
     *
     * <p>Each expected value is compared against the {@code int}-cast {@code value()} of the
     * metric with the same name in the source's metric group.
     *
     * @param validator validator whose {@code dataLossMetrics} are inspected
     * @param source validation source selecting the metric group
     * @param dataLossDetectedMetric expected value of {@code dataLossDetectedMetric}
     * @param dataLossDetectionFailureMetric expected value of {@code dataLossDetectionFailureMetric}
     * @param affectedTierTopicPartitionCountMetric expected value of {@code affectedTierTopicPartitionCountMetric}
     * @param affectedUserPartitionReplicaCountMetric expected value of {@code affectedUserPartitionReplicaCountMetric}
     * @param affectedUserPartitionLeaderCountMetric expected value of {@code affectedUserPartitionLeaderCountMetric}
     * @param dataLossReportUploadFailureMetric expected value of {@code dataLossReportUploadFailureMetric}
     */
    private static void checkMetrics(TierTopicDataLossValidator validator, ValidationSource source, int dataLossDetectedMetric, int dataLossDetectionFailureMetric, int affectedTierTopicPartitionCountMetric, int affectedUserPartitionReplicaCountMetric, int affectedUserPartitionLeaderCountMetric, int dataLossReportUploadFailureMetric) {
        TierTopicDataLossValidatorMetrics.DataLossDetectionMetricGroup group =
                validator.dataLossMetrics.getMetricGroupOrThrow(source);
        Assertions.assertEquals(dataLossDetectedMetric, (int) group.dataLossDetectedMetric.value());
        Assertions.assertEquals(dataLossDetectionFailureMetric, (int) group.dataLossDetectionFailureMetric.value());
        Assertions.assertEquals(affectedTierTopicPartitionCountMetric, (int) group.affectedTierTopicPartitionCountMetric.value());
        Assertions.assertEquals(affectedUserPartitionReplicaCountMetric, (int) group.affectedUserPartitionReplicaCountMetric.value());
        Assertions.assertEquals(affectedUserPartitionLeaderCountMetric, (int) group.affectedUserPartitionLeaderCountMetric.value());
        Assertions.assertEquals(dataLossReportUploadFailureMetric, (int) group.dataLossReportUploadFailureMetric.value());
    }

    /**
     * Asserts that all six data loss metrics for {@code source} are zero.
     *
     * @param validator validator whose metrics are inspected
     * @param source validation source selecting the metric group
     */
    private static void checkZeroMetrics(TierTopicDataLossValidator validator, ValidationSource source) {
        final int zero = 0;
        checkMetrics(validator, source, zero, zero, zero, zero, zero, zero);
    }

    /**
     * Builds the expected {@link AffectedUserTopicPartitionInfo} for a user partition fixture.
     *
     * @param ttp tier topic partition group whose partition number is recorded
     * @param utp user partition fixture supplying the last materialized offset/epoch and status
     * @param isLeader leader flag recorded in the info
     * @return the expected info object
     */
    private static AffectedUserTopicPartitionInfo makeAffectedUserTopicPartitionInfo(TierTopicPartitionGroup ttp, UserTopicPartitionGroup utp, boolean isLeader) {
        int tierTopicPartitionId = ttp.ttp.partition();
        return new AffectedUserTopicPartitionInfo(
                tierTopicPartitionId,
                utp.ftpsLastMaterializedOffsetEpoch,
                utp.ftpsStatus,
                isLeader);
    }

    /**
     * Builds the expected {@link AffectedTierTopicPartitionInfo} from a user partition fixture.
     *
     * @param utp user partition fixture supplying the last materialized offset/epoch, its topic id
     *            partition and its status
     * @param isLeader leader flag recorded in the info
     * @param tierTopicEndOffsetAndEpoch expected tier topic end offset and epoch
     * @return the expected info object
     */
    private static AffectedTierTopicPartitionInfo makeAffectedTierTopicPartitionInfo(UserTopicPartitionGroup utp, boolean isLeader, OffsetAndEpoch tierTopicEndOffsetAndEpoch) {
        // NOTE(review): the literal 0 is kept from the original call; its meaning is defined by
        // the AffectedTierTopicPartitionInfo constructor.
        return new AffectedTierTopicPartitionInfo(
                utp.ftpsLastMaterializedOffsetEpoch,
                utp.utp,
                utp.ftpsStatus,
                isLeader,
                0,
                tierTopicEndOffsetAndEpoch);
    }

    /**
     * Stubs {@code mockAdminClient.replicaStatus(...)} to return a {@link ReplicaStatusResult}
     * that maps every tier topic partition in {@code allTtpGroups} to that group's replica status
     * future.
     *
     * <p>The same result is registered as the answer for {@code count} consecutive calls (at least
     * once, even if {@code count} is below 1).
     *
     * @param mockAdminClient admin client mock to stub
     * @param allTtpGroups tier topic partition groups whose futures are exposed
     * @param count number of consecutive answers to register
     */
    private static void setupReplicaStatusMock(ConfluentAdmin mockAdminClient, final Set<TierTopicPartitionGroup> allTtpGroups, int count) {
        // A plain HashMap replaces the former double-brace initializer, which created an
        // anonymous HashMap subclass for no benefit.
        Map<TopicPartition, KafkaFuture<PartitionResult>> futuresByPartition = new HashMap<>();
        for (TierTopicPartitionGroup ttp : allTtpGroups) {
            futuresByPartition.put(ttp.ttp, ttp.kafkaFutureForReplicaStatusPartitionResult);
        }
        ReplicaStatusResult result = new ReplicaStatusResult(futuresByPartition);

        OngoingStubbing<ReplicaStatusResult> stub = Mockito
                .when(mockAdminClient.replicaStatus((Set) ArgumentMatchers.any(), (ReplicaStatusOptions) ArgumentMatchers.any()))
                .thenReturn(result);
        // One answer is already registered above, hence the loop stops at 1.
        for (int remaining = count; remaining > 1; --remaining) {
            stub = stub.thenReturn(result);
        }
    }

    /**
     * Fetches the data loss report referenced by {@code response} from the object store and
     * parses it.
     *
     * <p>Asserts that the response carries a non-empty report path, resolves that path to report
     * metadata, reads the {@code TIER_TOPIC_HEAD_DATA_LOSS_REPORT} fragment as text using the
     * platform default charset, and parses the text as JSON.
     *
     * @param response detection response whose {@code dataLossReportPath()} names the report
     * @param mockObjectStore object store to read the report from
     * @return the deserialized report
     * @throws IOException if reading or closing the report stream fails
     */
    private static TierTopicHeadDataLossReport deserializeReport(TierTopicHeadDataLossDetectionResponse response, TierObjectStore mockObjectStore) throws IOException {
        String reportPath = response.dataLossReportPath();
        Assertions.assertFalse(reportPath.isEmpty());
        TierTopicHeadDataLossReportMetadata reportMetadata = TierTopicHeadDataLossReportMetadata.fromPath(reportPath);
        TierObjectStoreResponse objStoreResponse = mockObjectStore.getObjectStoreFragment((ObjectStoreMetadata) reportMetadata, FragmentType.TIER_TOPIC_HEAD_DATA_LOSS_REPORT);
        // The stream is fetched exactly once and closed after reading; the earlier discarded
        // getInputStream() call has been dropped.
        // NOTE(review): if TierObjectStoreResponse is itself closeable it should be closed too - confirm.
        try (InputStream reportStream = objStoreResponse.getInputStream()) {
            String reportJson = IOUtils.toString(reportStream, Charset.defaultCharset());
            return TierTopicHeadDataLossReport.readJsonFromString(reportJson);
        }
    }

    /**
     * Verifies that {@code replicaStatus(...)} was called on the admin client, and {@code get()}
     * on every group's replica status future, exactly {@code wantedNumberOfInvocations} times.
     *
     * @param mockAdminClient admin client mock to verify
     * @param ttpGroups tier topic partition groups whose futures are verified
     * @param wantedNumberOfInvocations expected call count for each verified method
     * @throws RuntimeException wrapping any exception raised while verifying a future
     */
    private static void verifyMocks(ConfluentAdmin mockAdminClient, Set<TierTopicPartitionGroup> ttpGroups, int wantedNumberOfInvocations) {
        Mockito.verify(mockAdminClient, Mockito.times(wantedNumberOfInvocations))
                .replicaStatus((Set) ArgumentMatchers.any(), (ReplicaStatusOptions) ArgumentMatchers.any());
        for (TierTopicPartitionGroup group : ttpGroups) {
            try {
                // get() declares checked exceptions, so the verification call must be wrapped.
                ((KafkaFuture) Mockito.verify(group.kafkaFutureForReplicaStatusPartitionResult, Mockito.times(wantedNumberOfInvocations))).get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Shared scenario for the "data loss detected" tests, parameterized by validation source.
     *
     * <p>Sets up nine user topic partition fixtures spread over three tier topic partition groups,
     * runs detection, and expects a successful report that flags tier topic partitions 0 and 1 and
     * user partitions 0, 1, 3 and 4 (with partition 3 as the only leader). For
     * {@code UNCLEAN_RESTART_VALIDATION} the log manager reports an unclean shutdown, fencing is
     * enabled, a mocked producer is supplied, and four {@code send} calls are verified; for any
     * other source the producer is {@code null} and fencing is disabled.
     *
     * @param validationSource source under which detection is run
     * @throws Exception if detection or report deserialization fails
     */
    public void testSuccessfulDataLossDetectionLogic(ValidationSource validationSource) throws Exception {
        boolean uncleanRestart = validationSource == ValidationSource.UNCLEAN_RESTART_VALIDATION;
        LogManager mockLogManager = Mockito.mock(LogManager.class);
        // hadCleanShutdown() returns a boolean, so it must be stubbed with a boolean; the former
        // int-valued ternary (1/0) is not a valid return value for this stub.
        Mockito.when(mockLogManager.hadCleanShutdown()).thenReturn(!uncleanRestart);
        MockTime mockTime = new MockTime();
        Metrics metrics = new Metrics();

        // NOTE(review): going by the fixture names, the numeric arguments are presumably
        // (partition, last materialized epoch, last materialized offset) and the two booleans
        // (deleted, stray) - confirm against UserTopicPartitionGroup.
        UserTopicPartitionGroup utp0 = new UserTopicPartitionGroup(TEST_TOPIC, 0, 2, 5, TierPartitionStatus.ONLINE, false, false);
        UserTopicPartitionGroup utp1 = new UserTopicPartitionGroup(TEST_TOPIC, 1, 1, 2, TierPartitionStatus.ONLINE, false, false);
        UserTopicPartitionGroup deletedUtp2 = new UserTopicPartitionGroup(TEST_TOPIC, 2, 12, 15, TierPartitionStatus.ONLINE, true, false);
        UserTopicPartitionGroup utp3 = new UserTopicPartitionGroup(TEST_TOPIC, 3, 3, 6, TierPartitionStatus.ERROR, false, false);
        UserTopicPartitionGroup utp4 = new UserTopicPartitionGroup(TEST_TOPIC, 4, 3, 4, TierPartitionStatus.ONLINE, false, false);
        UserTopicPartitionGroup strayUtp5 = new UserTopicPartitionGroup(TEST_TOPIC, 5, 10, 25, TierPartitionStatus.ONLINE, false, true);
        UserTopicPartitionGroup utp6 = new UserTopicPartitionGroup(TEST_TOPIC, 6, 3, 5, TierPartitionStatus.ONLINE, false, false);
        UserTopicPartitionGroup missingEpochUtp7 = new UserTopicPartitionGroup(TEST_TOPIC, 7, -1, 10, TierPartitionStatus.ONLINE, false, false);
        UserTopicPartitionGroup missingOffsetUtp8 = new UserTopicPartitionGroup(TEST_TOPIC, 8, 3, -1, TierPartitionStatus.ONLINE, false, false);
        setupMockLogs(mockLogManager, Arrays.asList(utp0.mockLog, utp1.mockLog, deletedUtp2.mockLog, utp3.mockLog, utp4.mockLog, strayUtp5.mockLog, utp6.mockLog, missingEpochUtp7.mockLog, missingOffsetUtp8.mockLog));
        TierTopic mockTierTopic = setupMockTierTopic();

        // Leader id -> tier topic partition group, used by the leader endpoint supplier lambda below.
        Map<Integer, TierTopicPartitionGroup> leaderToTtpGroup = new HashMap<>();

        // Tier topic partition 0 (leader 1) backs utp0 and utp1.
        HashSet<TopicIdPartition> utpsForTtp0 = new HashSet<>(Arrays.asList(utp0.utp, utp1.utp));
        PartitionState leaderStateTtp0 = makePartitionState(utp0.lastMaterializedEpoch(), 6, makeBatch(0L, 0, 2), makeBatch(2L, 1, 2));
        TierTopicPartitionGroup ttp0 = new TierTopicPartitionGroup(0, 1, utp0.lastMaterializedEpoch(), leaderStateTtp0, mockTierTopic, utpsForTtp0);
        leaderToTtpGroup.put(1, ttp0);

        // Tier topic partition 1 (leader 3) backs deletedUtp2, utp3 and utp4.
        PartitionState leaderStateTtp1 = makePartitionState(utp3.lastMaterializedEpoch(), 6, makeBatch(0L, 0, 1), makeBatch(1L, 1, 1), makeBatch(2L, 2, 2), makeBatch(4L, 4, 2));
        HashSet<TopicIdPartition> utpsForTtp1 = new HashSet<>(Arrays.asList(deletedUtp2.utp, utp3.utp, utp4.utp));
        TierTopicPartitionGroup ttp1 = new TierTopicPartitionGroup(1, 3, utp3.lastMaterializedEpoch(), leaderStateTtp1, mockTierTopic, utpsForTtp1);
        leaderToTtpGroup.put(3, ttp1);

        // Tier topic partition 2 (leader 5) backs strayUtp5, utp6 and the two "missing" fixtures.
        PartitionState leaderStateTtp2 = makePartitionState(utp6.lastMaterializedEpoch(), 8, makeBatch(0L, 0, 1), makeBatch(1L, 1, 1), makeBatch(2L, 2, 2), makeBatch(4L, 3, 2), makeBatch(6L, 4, 2));
        HashSet<TopicIdPartition> utpsForTtp2 = new HashSet<>(Arrays.asList(strayUtp5.utp, utp6.utp, missingEpochUtp7.utp, missingOffsetUtp8.utp));
        TierTopicPartitionGroup ttp2 = new TierTopicPartitionGroup(2, 5, utp6.lastMaterializedEpoch(), leaderStateTtp2, mockTierTopic, utpsForTtp2);
        leaderToTtpGroup.put(5, ttp2);

        Set<TierTopicPartitionGroup> allTtpGroups = new HashSet<>(Arrays.asList(ttp0, ttp1, ttp2));
        ConfluentAdmin mockAdminClient = Mockito.mock(ConfluentAdmin.class);
        setupReplicaStatusMock(mockAdminClient, allTtpGroups, 2);
        MockInMemoryTierObjectStore mockObjectStore = createMockObjectStore(mockTime, metrics);

        // utp3 is the only partition reported as led by this broker; fencing is enabled only for
        // the unclean-restart source.
        TierTopicDataLossValidator validator = newInstance(
                mockObjectStore,
                mockTierTopic,
                mockLogManager,
                mockAdminClient,
                (nodeId, leaderId, context) -> leaderToTtpGroup.get(Integer.valueOf((int) leaderId)).mockLeaderEndpoint,
                Collections.singletonList(utp3.mockPartition),
                mockTime,
                metrics,
                true,
                uncleanRestart);

        Producer producer = null;
        if (uncleanRestart) {
            // Successive send() calls return these futures in order; the last one repeats for any
            // further call.
            producer = (Producer) Mockito.mock(KafkaProducer.class);
            Future recordMetadataFuture0 = (Future) Mockito.mock(Future.class);
            RecordMetadata recordMetadata0 = new RecordMetadata(ttp0.ttp, 4L, 0, -1L, -1, -1);
            Future recordMetadataFuture1 = (Future) Mockito.mock(Future.class);
            RecordMetadata recordMetadata1 = new RecordMetadata(ttp1.ttp, 6L, 0, -1L, -1, -1);
            Future recordMetadataFuture2 = (Future) Mockito.mock(Future.class);
            RecordMetadata recordMetadata2 = new RecordMetadata(ttp1.ttp, 7L, 0, -1L, -1, -1);
            Mockito.when((Object) producer.send((ProducerRecord) ArgumentMatchers.any()))
                    .thenReturn((Object) recordMetadataFuture0)
                    .thenReturn((Object) recordMetadataFuture1)
                    .thenReturn((Object) recordMetadataFuture2);
            Mockito.when(recordMetadataFuture0.get()).thenReturn((Object) recordMetadata0);
            Mockito.when(recordMetadataFuture1.get()).thenReturn((Object) recordMetadata1);
            Mockito.when(recordMetadataFuture2.get()).thenReturn((Object) recordMetadata2);
        }

        TierTopicHeadDataLossDetectionRequest request = createRequest();
        TierTopicHeadDataLossDetectionResponse response = validator.detectDataLossInTierTopicHead(request, validationSource, producer, 30000L, 0L);

        checkMetrics(validator, validationSource, 1, 0, 2, 4, 1, 0);
        Assertions.assertTrue(response.errorMessages().isEmpty());
        Assertions.assertEquals(TierTopicHeadDataLossDetectionResponse.CompletionStatus.SUCCESS, response.completionStatus());

        TierTopicHeadDataLossReport report = deserializeReport(response, mockObjectStore);
        Assertions.assertEquals(TierTopicHeadDataLossReport.CompletionStatus.SUCCESS, report.completionStatus());
        Assertions.assertEquals((short) 1, (short) report.version());
        Assertions.assertEquals(BROKER_ID, (int) report.brokerId());
        Assertions.assertEquals(validationSource, report.source());
        Assertions.assertTrue(report.startTimestampMs() >= mockTime.milliseconds());
        Assertions.assertTrue(report.endTimestampMs() >= report.startTimestampMs());
        Assertions.assertTrue(report.hasDataLoss());

        Map<TopicIdPartition, AffectedUserTopicPartitionInfo> expectedAffectedUtp = new HashMap<>();
        expectedAffectedUtp.put(utp0.utp, makeAffectedUserTopicPartitionInfo(ttp0, utp0, false));
        expectedAffectedUtp.put(utp1.utp, makeAffectedUserTopicPartitionInfo(ttp0, utp1, false));
        expectedAffectedUtp.put(utp3.utp, makeAffectedUserTopicPartitionInfo(ttp1, utp3, true));
        expectedAffectedUtp.put(utp4.utp, makeAffectedUserTopicPartitionInfo(ttp1, utp4, false));
        Assertions.assertEquals(expectedAffectedUtp, report.affectedUserTopicPartitions());

        Map<TopicPartition, AffectedTierTopicPartitionInfo> expectedAffectedTtp = new HashMap<>();
        expectedAffectedTtp.put(ttp0.ttp, makeAffectedTierTopicPartitionInfo(utp0, false, new OffsetAndEpoch(-1L, Optional.of(-1))));
        expectedAffectedTtp.put(ttp1.ttp, makeAffectedTierTopicPartitionInfo(utp3, true, new OffsetAndEpoch(4L, Optional.of(2))));
        Assertions.assertEquals(expectedAffectedTtp, report.affectedTierTopicPartitions());

        Assertions.assertFalse(report.hasFailures());
        verifyMocks(mockAdminClient, allTtpGroups, 2);
        if (uncleanRestart) {
            ((Producer) Mockito.verify((Object) producer, Mockito.times(4))).send((ProducerRecord) ArgumentMatchers.any());
        }
    }

    /**
     * Runs the shared data loss scenario as an unclean-restart validation, the variant in which
     * fencing is enabled and producer sends are verified.
     */
    @Test
    public void testDataLossDetectedAndUserPartitionsFenced() throws Exception {
        ValidationSource source = ValidationSource.UNCLEAN_RESTART_VALIDATION;
        testSuccessfulDataLossDetectionLogic(source);
    }

    /**
     * Runs the shared data loss scenario as an on-demand validation, the variant in which fencing
     * is disabled and no producer is supplied.
     */
    @Test
    public void testDataLossDetectedButUserPartitionsNotFenced() throws Exception {
        ValidationSource source = ValidationSource.ON_DEMAND_VALIDATION;
        testSuccessfulDataLossDetectionLogic(source);
    }

    /**
     * Detection must finish with {@code FAILURE} when one tier topic partition never resolves a
     * leader before the request times out, while still reporting the data loss found on the
     * partition that could be checked.
     *
     * <p>Tier topic partition 0 first reports no leader and then throws from its replica status
     * future; tier topic partition 1 resolves normally. The report is expected to list partition 1
     * (and user partition 1) as affected, partition 0 as failed, and a single "timed out" error.
     */
    @Test
    public void testFailedDataLossDetectionLogicDueToTimeout() throws Exception {
        ValidationSource validationSource = ValidationSource.ON_DEMAND_VALIDATION;
        long requestTimeoutMs = 20000L;
        LogManager mockLogManager = Mockito.mock(LogManager.class);
        Mockito.when(mockLogManager.hadCleanShutdown()).thenReturn(true);
        // NOTE(review): 10001L is presumably MockTime's auto-tick interval, chosen so the 20000 ms
        // budget runs out after a couple of clock reads - confirm against MockTime.
        MockTime mockTime = new MockTime(10001L);
        long startTimeMs = mockTime.milliseconds();
        Metrics metrics = new Metrics();

        UserTopicPartitionGroup utp0 = new UserTopicPartitionGroup(TEST_TOPIC, 0, 2, 5, TierPartitionStatus.ONLINE, false, false);
        UserTopicPartitionGroup utp1 = new UserTopicPartitionGroup(TEST_TOPIC, 1, 3, 6, TierPartitionStatus.ONLINE, false, false);
        setupMockLogs(mockLogManager, Arrays.asList(utp0.mockLog, utp1.mockLog));
        TierTopic mockTierTopic = setupMockTierTopic();
        Map<Integer, TierTopicPartitionGroup> leaderToTtpGroup = new HashMap<>();

        // Tier topic partition 0: constructed with leader -1; its replica status future answers
        // "no leader" once and then throws.
        HashSet<TopicIdPartition> utpsForTtp0 = new HashSet<>(Arrays.asList(utp0.utp));
        PartitionState leaderStateForTtp0 = makePartitionState(utp0.lastMaterializedEpoch(), 6, makeBatch(0L, 0, 2), makeBatch(2L, 1, 2));
        TierTopicPartitionGroup leaderLessTtp0 = new TierTopicPartitionGroup(0, -1, utp0.lastMaterializedEpoch(), leaderStateForTtp0, mockTierTopic, utpsForTtp0);
        PartitionResult noLeaderPartitionResult = new PartitionResult(-1, -1, new ArrayList<>());
        Mockito.when((Object) leaderLessTtp0.kafkaFutureForReplicaStatusPartitionResult.get())
                .thenReturn((Object) noLeaderPartitionResult)
                .thenThrow(new RuntimeException(String.format("Dummy exception for %s leader", leaderLessTtp0.ttp)));

        // Tier topic partition 1: led by broker 3 and always resolvable.
        PartitionState leaderStateTtp1 = makePartitionState(utp1.lastMaterializedEpoch(), 6, makeBatch(0L, 0, 1), makeBatch(1L, 1, 1), makeBatch(2L, 2, 2), makeBatch(4L, 4, 2));
        HashSet<TopicIdPartition> utpsForTtp1 = new HashSet<>(Arrays.asList(utp1.utp));
        int leaderForTtp1 = 3;
        TierTopicPartitionGroup ttp1 = new TierTopicPartitionGroup(1, leaderForTtp1, utp1.lastMaterializedEpoch(), leaderStateTtp1, mockTierTopic, utpsForTtp1);
        leaderToTtpGroup.put(leaderForTtp1, ttp1);
        PartitionResult withLeaderPartitionResult = new PartitionResult(leaderForTtp1, utp1.lastMaterializedEpoch(), new ArrayList<>());
        Mockito.when((Object) ttp1.kafkaFutureForReplicaStatusPartitionResult.get())
                .thenReturn((Object) withLeaderPartitionResult)
                .thenReturn((Object) withLeaderPartitionResult);

        Set<TierTopicPartitionGroup> allTtpGroups = new HashSet<>(Arrays.asList(leaderLessTtp0, ttp1));
        ConfluentAdmin mockAdminClient = Mockito.mock(ConfluentAdmin.class);
        setupReplicaStatusMock(mockAdminClient, allTtpGroups, 4);
        MockInMemoryTierObjectStore mockObjectStore = createMockObjectStore(mockTime, metrics);
        TierTopicDataLossValidator validator = newInstance(
                mockObjectStore,
                mockTierTopic,
                mockLogManager,
                mockAdminClient,
                (nodeId, leaderId, context) -> leaderToTtpGroup.get(Integer.valueOf((int) leaderId)).mockLeaderEndpoint,
                Collections.singletonList(utp1.mockPartition),
                mockTime,
                metrics,
                true,
                false);
        Producer producer = Mockito.mock(Producer.class);
        TierTopicHeadDataLossDetectionRequest request = createRequest();

        TierTopicHeadDataLossDetectionResponse response = validator.detectDataLossInTierTopicHead(request, validationSource, producer, requestTimeoutMs, 0L);

        checkMetrics(validator, validationSource, 1, 1, 1, 1, 1, 0);
        Assertions.assertEquals(1, response.errorMessages().size());
        Assertions.assertEquals(TierTopicHeadDataLossDetectionResponse.CompletionStatus.FAILURE, response.completionStatus());

        TierTopicHeadDataLossReport report = deserializeReport(response, mockObjectStore);
        Assertions.assertEquals(TierTopicHeadDataLossReport.CompletionStatus.FAILURE, report.completionStatus());
        Assertions.assertEquals((short) 1, (short) report.version());
        Assertions.assertEquals(BROKER_ID, (int) report.brokerId());
        Assertions.assertEquals(validationSource, report.source());
        Assertions.assertTrue(report.startTimestampMs() >= startTimeMs);
        Assertions.assertTrue(report.endTimestampMs() >= report.startTimestampMs());
        Assertions.assertTrue(report.hasDataLoss());

        Map<TopicIdPartition, AffectedUserTopicPartitionInfo> expectedAffectedUtp = new HashMap<>();
        expectedAffectedUtp.put(utp1.utp, makeAffectedUserTopicPartitionInfo(ttp1, utp1, true));
        Assertions.assertEquals(expectedAffectedUtp, report.affectedUserTopicPartitions());

        Map<TopicPartition, AffectedTierTopicPartitionInfo> expectedAffectedTtp = new HashMap<>();
        expectedAffectedTtp.put(ttp1.ttp, makeAffectedTierTopicPartitionInfo(utp1, true, new OffsetAndEpoch(4L, Optional.of(2))));
        Assertions.assertEquals(expectedAffectedTtp, report.affectedTierTopicPartitions());

        Assertions.assertTrue(report.hasFailures());
        Assertions.assertTrue(report.userPartitionsWithFencingFailures().isEmpty());
        Set<TopicPartition> expectedFailedTtps = new HashSet<>(Collections.singletonList(leaderLessTtp0.ttp));
        Assertions.assertEquals(expectedFailedTtps, report.failedTierTopicPartitions());
        Assertions.assertEquals(1, report.errorMessages().size());
        String firstError = (String) report.errorMessages().get(0);
        Assertions.assertTrue(firstError.contains("timed out"));
        verifyMocks(mockAdminClient, allTtpGroups, 2);
    }

    /**
     * Exercises detection when the replica-status lookup for the only tier topic partition
     * misbehaves before finally reporting a leader.
     *
     * <p>The replica-status future of the tier topic partition is stubbed to answer, in order:
     * a result without a leader ({@code -1}), a {@link RuntimeException}, and then (twice) a
     * result naming broker 1 as leader. The test then asserts that the response and the
     * uploaded report both complete with {@code SUCCESS}, that no error messages or failures
     * are recorded, and that the report lists {@code utp0} / {@code ttp0} as the affected
     * partitions.
     *
     * <p>NOTE(review): the meaning of the numeric arguments passed to {@code checkMetrics},
     * {@code setupReplicaStatusMock} and {@code verifyMocks} is defined by helpers outside this
     * method; the value 4 presumably is the expected number of replica-status calls - confirm.
     *
     * @throws Exception if the stubbed future or the validator throws unexpectedly
     */
    @Test
    public void testDataLossDetectedAfterRetriesDuringFailures() throws Exception {
        ValidationSource validationSource = ValidationSource.ON_DEMAND_VALIDATION;
        long requestTimeoutMs = 30000L;
        LogManager mockLogManager = Mockito.mock(LogManager.class);
        Mockito.when(mockLogManager.hadCleanShutdown()).thenReturn(true);
        MockTime mockTime = new MockTime();
        long startTimeMs = mockTime.milliseconds();
        Metrics metrics = new Metrics();

        // User partition materialized up to offset 5 at epoch 2.
        UserTopicPartitionGroup utp0 = new UserTopicPartitionGroup(TEST_TOPIC, 0, 2, 5, TierPartitionStatus.ONLINE, false, false);
        TierTopicDataLossValidatorTest.setupMockLogs(mockLogManager, Arrays.asList(utp0.mockLog));
        TierTopic mockTierTopic = TierTopicDataLossValidatorTest.setupMockTierTopic();

        // Tier topic partition 0, led by broker 1, hosting utp0.
        Map<Integer, TierTopicPartitionGroup> leaderToTtpGroup = new HashMap<>();
        Set<TopicIdPartition> utpsForTtp0 = new HashSet<>(Arrays.asList(utp0.utp));
        PartitionState leaderStateTtp0 = TierTopicDataLossValidatorTest.makePartitionState(utp0.lastMaterializedEpoch(), 6, TierTopicDataLossValidatorTest.makeBatch(0L, 0, 2), TierTopicDataLossValidatorTest.makeBatch(2L, 1, 2));
        TierTopicPartitionGroup ttp0 = new TierTopicPartitionGroup(0, 1, utp0.lastMaterializedEpoch(), leaderStateTtp0, mockTierTopic, utpsForTtp0);
        leaderToTtpGroup.put(1, ttp0);

        // Override the default stubbing installed by TierTopicPartitionGroup: no leader first,
        // then an exception, and only afterwards the real leader.
        PartitionResult noLeaderPartitionResult = new PartitionResult(-1, -1, new ArrayList<>());
        PartitionResult withLeaderPartitionResult = new PartitionResult(1, utp0.lastMaterializedEpoch(), new ArrayList<>());
        Mockito.when(ttp0.kafkaFutureForReplicaStatusPartitionResult.get())
            .thenReturn(noLeaderPartitionResult)
            .thenThrow(new RuntimeException(String.format("Dummy exception for %s leader", ttp0.ttp)))
            .thenReturn(withLeaderPartitionResult)
            .thenReturn(withLeaderPartitionResult);

        HashSet<TierTopicPartitionGroup> allTtpGroups = new HashSet<>(Collections.singletonList(ttp0));
        ConfluentAdmin mockAdminClient = Mockito.mock(ConfluentAdmin.class);
        TierTopicDataLossValidatorTest.setupReplicaStatusMock(mockAdminClient, allTtpGroups, 4);
        MockInMemoryTierObjectStore mockObjectStore = TierTopicDataLossValidatorTest.createMockObjectStore((Time)mockTime, metrics);
        TierTopicDataLossValidator validator = this.newInstance((TierObjectStore)mockObjectStore, mockTierTopic, mockLogManager, mockAdminClient, (nodeId, leaderId, context) -> leaderToTtpGroup.get((int)leaderId).mockLeaderEndpoint, Collections.emptyList(), (Time)mockTime, metrics, true, false);
        Producer producer = Mockito.mock(Producer.class);
        TierTopicHeadDataLossDetectionRequest request = TierTopicDataLossValidatorTest.createRequest();

        TierTopicHeadDataLossDetectionResponse response = validator.detectDataLossInTierTopicHead(request, validationSource, producer, requestTimeoutMs, 0L);

        TierTopicDataLossValidatorTest.checkMetrics(validator, validationSource, 1, 0, 1, 1, 0, 0);
        Assertions.assertTrue(response.errorMessages().isEmpty());
        Assertions.assertEquals(TierTopicHeadDataLossDetectionResponse.CompletionStatus.SUCCESS, response.completionStatus());

        // The report is read back from the in-memory object store and checked field by field.
        TierTopicHeadDataLossReport report = TierTopicDataLossValidatorTest.deserializeReport(response, (TierObjectStore)mockObjectStore);
        Assertions.assertEquals(TierTopicHeadDataLossReport.CompletionStatus.SUCCESS, report.completionStatus());
        Assertions.assertEquals((short)1, (short)report.version());
        Assertions.assertEquals(0, (int)report.brokerId());
        Assertions.assertEquals(validationSource, report.source());
        Assertions.assertTrue(report.startTimestampMs() >= startTimeMs);
        Assertions.assertTrue(report.endTimestampMs() >= report.startTimestampMs());

        // Single-entry expectations; Map equality is entry-based, so an immutable singleton map
        // compares equal to whatever Map implementation the report returns.
        Map<TopicIdPartition, AffectedUserTopicPartitionInfo> expectedAffectedUtp = Collections.singletonMap(utp0.utp, TierTopicDataLossValidatorTest.makeAffectedUserTopicPartitionInfo(ttp0, utp0, false));
        Assertions.assertEquals(expectedAffectedUtp, report.affectedUserTopicPartitions());
        Map<TopicPartition, AffectedTierTopicPartitionInfo> expectedAffectedTtp = Collections.singletonMap(ttp0.ttp, TierTopicDataLossValidatorTest.makeAffectedTierTopicPartitionInfo(utp0, false, new OffsetAndEpoch(-1L, Optional.of(-1))));
        Assertions.assertEquals(expectedAffectedTtp, report.affectedTierTopicPartitions());
        Assertions.assertFalse(report.hasFailures());
        TierTopicDataLossValidatorTest.verifyMocks(mockAdminClient, allTtpGroups, 4);
    }

    /**
     * Covers the path in which the validator finds nothing to report.
     *
     * <p>One user partition (materialized to offset 3 at epoch 1) is mapped to tier topic
     * partition 0, whose mocked leader state is built from three batches. After running the
     * detection the test expects zeroed metrics, an empty report path, a {@code SUCCESS}
     * completion status and no error messages.
     *
     * <p>NOTE(review): the trailing {@code 2} handed to {@code setupReplicaStatusMock} and
     * {@code verifyMocks} is interpreted by helpers outside this method - presumably an
     * expected call count; confirm.
     *
     * @throws Exception if fixture construction or the validator throws unexpectedly
     */
    @Test
    public void testNoDataLossDetected() throws Exception {
        final ValidationSource source = ValidationSource.ON_DEMAND_VALIDATION;
        final MockTime clock = new MockTime();
        final Metrics metricsRegistry = new Metrics();

        // Log manager reporting a clean shutdown.
        final LogManager logManager = Mockito.mock(LogManager.class);
        Mockito.when(logManager.hadCleanShutdown()).thenReturn(true);

        // User partition fixture and the logs exposed through the log manager.
        final UserTopicPartitionGroup userPartition = new UserTopicPartitionGroup(TEST_TOPIC, 0, 1, 3, TierPartitionStatus.ONLINE, false, false);
        TierTopicDataLossValidatorTest.setupMockLogs(logManager, Collections.singletonList(userPartition.mockLog));
        final TierTopic tierTopic = TierTopicDataLossValidatorTest.setupMockTierTopic();

        // Tier topic partition 0 led by broker 1.
        final int userEpoch = userPartition.lastMaterializedEpoch();
        final PartitionState leaderState = TierTopicDataLossValidatorTest.makePartitionState(
            userEpoch,
            6,
            TierTopicDataLossValidatorTest.makeBatch(0L, 0, 2),
            TierTopicDataLossValidatorTest.makeBatch(2L, 1, 2),
            TierTopicDataLossValidatorTest.makeBatch(4L, 2, 5));
        final Set<TopicIdPartition> hostedUserPartitions = new HashSet<>(Arrays.asList(userPartition.utp));
        final TierTopicPartitionGroup tierPartition = new TierTopicPartitionGroup(0, 1, userPartition.lastMaterializedEpoch(), leaderState, tierTopic, hostedUserPartitions);
        final Map<Integer, TierTopicPartitionGroup> groupsByLeader = new HashMap<>();
        groupsByLeader.put(1, tierPartition);
        final HashSet<TierTopicPartitionGroup> tierPartitions = new HashSet<>(Arrays.asList(tierPartition));

        final ConfluentAdmin adminClient = Mockito.mock(ConfluentAdmin.class);
        TierTopicDataLossValidatorTest.setupReplicaStatusMock(adminClient, tierPartitions, 2);
        final TierObjectStore objectStore = Mockito.mock(TierObjectStore.class);

        final TierTopicDataLossValidator validator = this.newInstance(
            objectStore,
            tierTopic,
            logManager,
            adminClient,
            (nodeId, leaderId, context) -> groupsByLeader.get((int)leaderId).mockLeaderEndpoint,
            Collections.emptyList(),
            (Time)clock,
            metricsRegistry,
            true,
            true);
        final Producer producer = Mockito.mock(KafkaProducer.class);
        final TierTopicHeadDataLossDetectionRequest request = TierTopicDataLossValidatorTest.createRequest();

        final TierTopicHeadDataLossDetectionResponse response = validator.detectDataLossInTierTopicHead(request, source, producer, 30000L, 0L);

        // Nothing was detected: no metrics bumped, no report written, clean response.
        TierTopicDataLossValidatorTest.checkZeroMetrics(validator, source);
        Assertions.assertTrue(response.dataLossReportPath().isEmpty());
        Assertions.assertEquals(TierTopicHeadDataLossDetectionResponse.CompletionStatus.SUCCESS, response.completionStatus());
        Assertions.assertTrue(response.errorMessages().isEmpty());
        TierTopicDataLossValidatorTest.verifyMocks(adminClient, tierPartitions, 2);
    }

    /**
     * Covers the case where the object store rejects the upload issued during detection.
     *
     * <p>The mocked {@code TierObjectStore.putBuffer} is stubbed to throw a
     * {@link TierObjectStoreFatalException} for any arguments, and the log manager reports an
     * unclean shutdown. The test expects {@code detectDataLossInTierTopicHead} to surface a
     * {@link RuntimeException}, and then checks the validator metrics and mock interactions.
     *
     * <p>NOTE(review): the positional counters passed to {@code checkMetrics} and the trailing
     * {@code 2} passed to {@code setupReplicaStatusMock} / {@code verifyMocks} are interpreted
     * by helpers outside this method - confirm their meaning there.
     *
     * @throws Exception if fixture construction throws unexpectedly
     */
    @Test
    public void testDataLossDetectedButObjectStoreUploadFailed() throws Exception {
        final ValidationSource source = ValidationSource.UNCLEAN_RESTART_VALIDATION;
        final MockTime clock = new MockTime();
        final Metrics metricsRegistry = new Metrics();

        // Log manager reporting an unclean shutdown.
        final LogManager logManager = Mockito.mock(LogManager.class);
        Mockito.when(logManager.hadCleanShutdown()).thenReturn(false);

        // User partition materialized up to offset 5 at epoch 2.
        final UserTopicPartitionGroup userPartition = new UserTopicPartitionGroup(TEST_TOPIC, 0, 2, 5, TierPartitionStatus.ONLINE, false, false);
        TierTopicDataLossValidatorTest.setupMockLogs(logManager, Collections.singletonList(userPartition.mockLog));
        final TierTopic tierTopic = TierTopicDataLossValidatorTest.setupMockTierTopic();

        // Tier topic partition 0 led by broker 1; its leader state is built from two batches.
        final int userEpoch = userPartition.lastMaterializedEpoch();
        final PartitionState leaderState = TierTopicDataLossValidatorTest.makePartitionState(
            userEpoch,
            6,
            TierTopicDataLossValidatorTest.makeBatch(0L, 0, 2),
            TierTopicDataLossValidatorTest.makeBatch(2L, 1, 2));
        final Set<TopicIdPartition> hostedUserPartitions = new HashSet<>(Arrays.asList(userPartition.utp));
        final TierTopicPartitionGroup tierPartition = new TierTopicPartitionGroup(0, 1, userPartition.lastMaterializedEpoch(), leaderState, tierTopic, hostedUserPartitions);
        final Map<Integer, TierTopicPartitionGroup> groupsByLeader = new HashMap<>();
        groupsByLeader.put(1, tierPartition);
        final HashSet<TierTopicPartitionGroup> tierPartitions = new HashSet<>(Arrays.asList(tierPartition));

        final ConfluentAdmin adminClient = Mockito.mock(ConfluentAdmin.class);
        TierTopicDataLossValidatorTest.setupReplicaStatusMock(adminClient, tierPartitions, 2);

        // Every upload attempt fails fatally.
        final TierObjectStore objectStore = Mockito.mock(TierObjectStore.class);
        Mockito.when(objectStore.putBuffer((ObjectStoreMetadata)ArgumentMatchers.any(), (ByteBuffer)ArgumentMatchers.any(), (ObjectType)ArgumentMatchers.any()))
            .thenThrow(new TierObjectStoreFatalException("Dummy fatal object store exception"));

        final TierTopicDataLossValidator validator = this.newInstance(
            objectStore,
            tierTopic,
            logManager,
            adminClient,
            (nodeId, leaderId, context) -> groupsByLeader.get((int)leaderId).mockLeaderEndpoint,
            Collections.emptyList(),
            (Time)clock,
            metricsRegistry,
            true,
            false);
        final Producer producer = Mockito.mock(KafkaProducer.class);
        final TierTopicHeadDataLossDetectionRequest request = TierTopicDataLossValidatorTest.createRequest();

        Assertions.assertThrows(RuntimeException.class, () -> validator.detectDataLossInTierTopicHead(request, source, producer, 30000L, 0L));

        TierTopicDataLossValidatorTest.checkMetrics(validator, source, 1, 0, 1, 1, 0, 1);
        TierTopicDataLossValidatorTest.verifyMocks(adminClient, tierPartitions, 2);
    }

    /**
     * Test fixture that groups a {@code _confluent-tier-state} partition with the mocks that
     * describe its leader: a mocked replica-status future and a {@link MockLeaderEndPoint}
     * primed with the supplied leader state.
     */
    private static class TierTopicPartitionGroup {
        /** The tier topic partition this group represents. */
        final TopicPartition ttp;
        /** Leader broker id reported by the mocked replica-status result. */
        final int leader;
        /** Mocked future whose {@code get()} yields the replica-status result for {@link #ttp}. */
        final KafkaFuture<PartitionResult> kafkaFutureForReplicaStatusPartitionResult;
        /** Mock leader endpoint holding the leader state for {@link #ttp}. */
        final MockLeaderEndPoint mockLeaderEndpoint;

        /**
         * Builds the fixture and stubs {@code mockTierTopic} so that every given user partition
         * maps to this tier topic partition.
         *
         * @param partition partition number within the tier state topic
         * @param leaderReturnedByReplicaStatus leader id placed in the stubbed replica-status result
         * @param latestLeaderEpochReturnedByReplicaStatus leader epoch placed in the stubbed result
         * @param leaderState state installed on the mock leader endpoint for this partition
         * @param mockTierTopic tier topic mock to stub with the user-to-tier partition mapping
         * @param utps user partitions that should resolve to this tier topic partition
         * @throws Exception declared because stubbing the future's {@code get()} can throw
         */
        public TierTopicPartitionGroup(int partition, int leaderReturnedByReplicaStatus, int latestLeaderEpochReturnedByReplicaStatus, PartitionState leaderState, TierTopic mockTierTopic, Set<TopicIdPartition> utps) throws Exception {
            this.ttp = new TopicPartition("_confluent-tier-state", partition);
            this.leader = leaderReturnedByReplicaStatus;

            // Route each user partition to this tier topic partition.
            for (TopicIdPartition userPartition : utps) {
                Mockito.when(mockTierTopic.toTierTopicPartition(userPartition)).thenReturn(this.ttp);
            }

            // Replica-status future that reports the configured leader and epoch.
            @SuppressWarnings("unchecked")
            KafkaFuture<PartitionResult> replicaStatusFuture = Mockito.mock(KafkaFuture.class);
            PartitionResult replicaStatus = new PartitionResult(this.leader, latestLeaderEpochReturnedByReplicaStatus, new ArrayList<>());
            Mockito.when(replicaStatusFuture.get()).thenReturn(replicaStatus);
            this.kafkaFutureForReplicaStatusPartitionResult = replicaStatusFuture;

            // Leader endpoint on localhost with an arbitrary port, serving the given state.
            BrokerEndPoint leaderBroker = new BrokerEndPoint(this.leader, "localhost", new Random().nextInt());
            MockLeaderEndPoint endpoint = new MockLeaderEndPoint(leaderBroker, false, ApiKeys.FETCH.latestVersion());
            endpoint.setLeaderState(this.ttp, leaderState);
            this.mockLeaderEndpoint = endpoint;
        }
    }

    /**
     * Test fixture that groups a user topic partition with a mocked {@link MergedLog}, a mocked
     * {@link FileTierPartitionState} behind that log, and a mocked {@link Partition}.
     */
    private static class UserTopicPartitionGroup {
        /** The user topic partition, created with a random topic id. */
        final TopicIdPartition utp;
        /** Mocked log stubbed to expose {@link #utp}, the deleted/stray flags and the tier state. */
        final MergedLog mockLog;
        /** Value the mocked tier state returns from {@code lastLocalMaterializedSrcOffsetAndEpoch()}. */
        final OffsetAndEpoch ftpsLastMaterializedOffsetEpoch;
        /** Status the mocked tier partition state reports. */
        final TierPartitionStatus ftpsStatus;
        /** Mocked partition whose {@code topicPartition()} matches {@link #utp}. */
        final Partition mockPartition;

        /**
         * Builds the fixture and wires up all mocks.
         *
         * @param topic topic name of the user partition
         * @param partition partition number
         * @param lastMaterializedEpoch epoch of the last materialized offset; a negative value
         *     results in an {@link OffsetAndEpoch} with an empty epoch
         * @param lastMaterializedOffset last materialized offset
         * @param ftpsStatus status returned by the mocked tier partition state
         * @param isDeleted value returned by the mocked log's {@code isDeleted()}
         * @param isStray value returned by the mocked log's {@code isStray()}
         */
        public UserTopicPartitionGroup(String topic, int partition, int lastMaterializedEpoch, int lastMaterializedOffset, TierPartitionStatus ftpsStatus, boolean isDeleted, boolean isStray) {
            this.utp = new TopicIdPartition(topic, UUID.randomUUID(), partition);
            this.ftpsStatus = ftpsStatus;

            // Only a non-negative epoch is recorded; otherwise the epoch is left empty.
            Optional<Integer> materializedEpoch = lastMaterializedEpoch >= 0
                ? Optional.of(lastMaterializedEpoch)
                : Optional.empty();
            this.ftpsLastMaterializedOffsetEpoch = new OffsetAndEpoch((long)lastMaterializedOffset, materializedEpoch);

            // Tier partition state backing the log.
            FileTierPartitionState tierState = Mockito.mock(FileTierPartitionState.class);
            Mockito.when(tierState.status()).thenReturn(ftpsStatus);
            Mockito.when(tierState.topicIdPartition()).thenReturn(Optional.of(this.utp));
            Mockito.when(tierState.lastLocalMaterializedSrcOffsetAndEpoch()).thenReturn(this.ftpsLastMaterializedOffsetEpoch);

            // Log exposing the partition identity, lifecycle flags and the tier state.
            MergedLog log = Mockito.mock(MergedLog.class);
            Mockito.when(log.topicIdPartition()).thenReturn(Option.apply(this.utp));
            Mockito.when(log.isDeleted()).thenReturn(isDeleted);
            Mockito.when(log.isStray()).thenReturn(isStray);
            Mockito.when(log.tierPartitionState()).thenReturn(tierState);
            this.mockLog = log;

            Partition partitionMock = Mockito.mock(Partition.class);
            Mockito.when(partitionMock.topicPartition()).thenReturn(this.utp.topicPartition());
            this.mockPartition = partitionMock;
        }

        /**
         * Returns the epoch stored in {@link #ftpsLastMaterializedOffsetEpoch}.
         *
         * @return the last materialized epoch
         * @throws java.util.NoSuchElementException if the fixture was built with a negative
         *     epoch, in which case no epoch is present
         */
        int lastMaterializedEpoch() {
            Optional<Integer> epoch = this.ftpsLastMaterializedOffsetEpoch.epoch();
            return epoch.get();
        }
    }
}

