/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.topic;

import io.confluent.rest.TierTopicHeadDataLossDetectionRequest;
import io.confluent.rest.TierTopicHeadDataLossDetectionResponse;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Optional;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import kafka.log.LogManager;
import kafka.server.Defaults;
import kafka.server.LeaderEndpointSupplier;
import kafka.server.ReplicaManager;
import kafka.tier.store.TierObjectStore;
import kafka.tier.topic.SoloTierTopicDataLossValidator;
import kafka.tier.topic.TierTopic;
import kafka.tier.topic.TierTopicManagerConfig;
import kafka.tier.topic.recovery.ValidationSource;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import scala.collection.JavaConverters;

public class SoloTierTopicDataLossValidatorTest {
    private static final TierTopicHeadDataLossDetectionRequest DUMMY_REQUEST = new TierTopicHeadDataLossDetectionRequest("identifier", new HashSet());

    private SoloTierTopicDataLossValidator newInstance() {
        return this.newInstance(Optional.empty(), Optional.empty());
    }

    private SoloTierTopicDataLossValidator newInstance(Optional<Semaphore> condition, Optional<Semaphore> signal) {
        TierTopicManagerConfig configMock = (TierTopicManagerConfig)Mockito.mock(TierTopicManagerConfig.class);
        LogManager mockLogManager = (LogManager)Mockito.mock(LogManager.class);
        Mockito.when((Object)mockLogManager.allLogs()).thenReturn((Object)JavaConverters.collectionAsScalaIterable(new ArrayList()));
        ReplicaManager mockReplicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        Mockito.when((Object)mockReplicaManager.leaderPartitionsIterator()).thenReturn((Object)JavaConverters.collectionAsScalaIterable(new ArrayList()).iterator());
        Mockito.when((Object)mockReplicaManager.logManager()).thenReturn((Object)mockLogManager);
        Mockito.when((Object)configMock.enableTierTopicDataLossDetection()).thenAnswer(val -> {
            signal.ifPresent(Semaphore::release);
            condition.ifPresent(c -> {
                try {
                    c.acquire();
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
            return true;
        }).thenReturn((Object)true);
        Mockito.when((Object)configMock.tierTopicDataLossDetectionMaxTimeoutMs()).thenReturn((Object)Defaults.TierTopicDataLossDetectionMaxTimeoutMs());
        return new SoloTierTopicDataLossValidator(configMock, (TierTopic)Mockito.mock(TierTopic.class), (TierObjectStore)Mockito.mock(TierObjectStore.class), mockReplicaManager, () -> (ConfluentAdmin)Mockito.mock(ConfluentAdmin.class), (LeaderEndpointSupplier)Mockito.mock(LeaderEndpointSupplier.class), (Time)new MockTime(), (Metrics)Mockito.mock(Metrics.class));
    }

    private TierTopicHeadDataLossDetectionResponse detectDataLoss(SoloTierTopicDataLossValidator validator, Producer<byte[], byte[]> mockProducer) {
        try {
            return validator.detectDataLossInTierTopicHead(DUMMY_REQUEST, ValidationSource.ON_DEMAND_VALIDATION, mockProducer, 30000L);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Test
    public void testSoloInvocation() {
        SoloTierTopicDataLossValidator validator = this.newInstance();
        Producer mockProducer = (Producer)Mockito.mock(Producer.class);
        this.detectDataLoss(validator, (Producer<byte[], byte[]>)mockProducer);
    }

    @Test
    public void testConcurrentInvocation() throws Exception {
        Semaphore condition = new Semaphore(1);
        Semaphore signal = new Semaphore(1);
        SoloTierTopicDataLossValidator validator = this.newInstance(Optional.of(condition), Optional.of(signal));
        Producer mockProducer = (Producer)Mockito.mock(Producer.class);
        condition.acquire();
        signal.acquire();
        AtomicReference response1 = new AtomicReference();
        Thread t1 = new Thread(() -> {
            try {
                response1.set(this.detectDataLoss(validator, (Producer<byte[], byte[]>)mockProducer));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        t1.start();
        signal.acquire();
        Exception e = (Exception)Assertions.assertThrows(RuntimeException.class, () -> this.detectDataLoss(validator, (Producer<byte[], byte[]>)mockProducer));
        Assertions.assertTrue((boolean)(e.getCause() instanceof UnsupportedOperationException));
        condition.release();
        t1.join();
        Assertions.assertTrue((response1.get() != null ? 1 : 0) != 0);
        Assertions.assertTrue((boolean)((TierTopicHeadDataLossDetectionResponse)response1.get()).dataLossReportPath().isEmpty());
        Assertions.assertEquals((Object)TierTopicHeadDataLossDetectionResponse.CompletionStatus.SUCCESS, (Object)((TierTopicHeadDataLossDetectionResponse)response1.get()).completionStatus());
        Assertions.assertTrue((boolean)((TierTopicHeadDataLossDetectionResponse)response1.get()).errorMessages().isEmpty());
        TierTopicHeadDataLossDetectionResponse response2 = this.detectDataLoss(validator, (Producer<byte[], byte[]>)mockProducer);
        Assertions.assertTrue((response2 != null ? 1 : 0) != 0);
        Assertions.assertTrue((boolean)response2.dataLossReportPath().isEmpty());
        Assertions.assertEquals((Object)TierTopicHeadDataLossDetectionResponse.CompletionStatus.SUCCESS, (Object)response2.completionStatus());
        Assertions.assertTrue((boolean)response2.errorMessages().isEmpty());
    }
}

