/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.metadata;

import java.io.File;
import java.io.Serializable;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import kafka.coordinator.quota.QuotaCoordinator;
import kafka.coordinator.transaction.TransactionCoordinator;
import kafka.log.LogManager;
import kafka.security.CredentialProvider;
import kafka.server.BrokerServer;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.ReplicaManager;
import kafka.server.link.ClusterLinkDisabled;
import kafka.server.metadata.AclPublisher;
import kafka.server.metadata.BrokerMetadataPublisher;
import kafka.server.metadata.BrokerMetadataPublisher$;
import kafka.server.metadata.DelegationTokenPublisher;
import kafka.server.metadata.DynamicClientQuotaPublisher;
import kafka.server.metadata.DynamicConfigPublisher;
import kafka.server.metadata.KRaftMetadataCache;
import kafka.server.metadata.ScramPublisher;
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;
import kafka.tier.TierDeletedPartitionsCoordinator;
import kafka.tier.backupObjectLifecycle.BackupObjectLifecycleManagerCoordinator;
import kafka.tier.snapshot.TierTopicSnapshotCoordinator;
import kafka.utils.Logging;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataImageTest;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.loader.LogDeltaManifest;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import scala.;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.Seq;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.jdk.CollectionConverters$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.RichLong$;

@ScalaSignature(bytes="\u0006\u0005}4A!\u0004\b\u0001+!)A\u0004\u0001C\u0001;!9\u0001\u0005\u0001b\u0001\n\u0003\t\u0003B\u0002\u001e\u0001A\u0003%!\u0005C\u0003<\u0001\u0011\u0005A\bC\u0003N\u0001\u0011\u0005A\bC\u0003S\u0001\u0011\u0005A\bC\u0003X\u0001\u0011%\u0001\fC\u0004o\u0001\t\u0007I\u0011A8\t\ra\u0004\u0001\u0015!\u0003q\u0011\u0015I\b\u0001\"\u0001=\u0011\u0015Y\b\u0001\"\u0001=\u0011\u0015i\b\u0001\"\u0001=\u0005m\u0011%o\\6fe6+G/\u00193bi\u0006\u0004VO\u00197jg\",'\u000fV3ti*\u0011q\u0002E\u0001\t[\u0016$\u0018\rZ1uC*\u0011\u0011CE\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003M\tQa[1gW\u0006\u001c\u0001a\u0005\u0002\u0001-A\u0011qCG\u0007\u00021)\t\u0011$A\u0003tG\u0006d\u0017-\u0003\u0002\u001c1\t1\u0011I\\=SK\u001a\fa\u0001P5oSRtD#\u0001\u0010\u0011\u0005}\u0001Q\"\u0001\b\u0002\u001b\u0015D\u0018\u000e^#yG\u0016\u0004H/[8o+\u0005\u0011\u0003cA\u0012-]5\tAE\u0003\u0002&M\u00051\u0011\r^8nS\u000eT!a\n\u0015\u0002\u0015\r|gnY;se\u0016tGO\u0003\u0002*U\u0005!Q\u000f^5m\u0015\u0005Y\u0013\u0001\u00026bm\u0006L!!\f\u0013\u0003\u001f\u0005#x.\\5d%\u00164WM]3oG\u0016\u0004\"aL\u001c\u000f\u0005A*dBA\u00195\u001b\u0005\u0011$BA\u001a\u0015\u0003\u0019a$o\\8u}%\t\u0011$\u0003\u000271\u00059\u0001/Y2lC\u001e,\u0017B\u0001\u001d:\u0005%!\u0006N]8xC\ndWM\u0003\u000271\u0005qQ\r_5u\u000bb\u001cW\r\u001d;j_:\u0004\u0013!B:fiV\u0003H#A\u001f\u0011\u0005]q\u0014BA 
\u0019\u0005\u0011)f.\u001b;)\u0005\u0011\t\u0005C\u0001\"L\u001b\u0005\u0019%B\u0001#F\u0003\r\t\u0007/\u001b\u0006\u0003\r\u001e\u000bqA[;qSR,'O\u0003\u0002I\u0013\u0006)!.\u001e8ji*\t!*A\u0002pe\u001eL!\u0001T\"\u0003\u0015\t+gm\u001c:f\u000b\u0006\u001c\u0007.\u0001\u0005uK\u0006\u0014Hi\\<oQ\t)q\n\u0005\u0002C!&\u0011\u0011k\u0011\u0002\n\u0003\u001a$XM]#bG\"\f\u0011\u0003^3ti\u001e+G\u000fV8qS\u000e$U\r\u001c;bQ\t1A\u000b\u0005\u0002C+&\u0011ak\u0011\u0002\u0005)\u0016\u001cH/A\u000foK^lunY6Es:\fW.[2D_:4\u0017n\u001a)vE2L7\u000f[3s)\rIFL\u0019\t\u0003?iK!a\u0017\b\u0003-\u0011Kh.Y7jG\u000e{gNZ5h!V\u0014G.[:iKJDQ!X\u0004A\u0002y\u000baA\u0019:pW\u0016\u0014\bCA0a\u001b\u0005\u0001\u0012BA1\u0011\u00051\u0011%o\\6feN+'O^3s\u0011\u0015\u0019w\u00011\u0001e\u00031)'O]8s\u0011\u0006tG\r\\3s!\t)G.D\u0001g\u0015\t9\u0007.A\u0003gCVdGO\u0003\u0002\u0012S*\u00111C\u001b\u0006\u0003W&\u000ba!\u00199bG\",\u0017BA7g\u000511\u0015-\u001e7u\u0011\u0006tG\r\\3s\u0003%\u0019G.^:uKJLE-F\u0001q!\t\tXO\u0004\u0002sgB\u0011\u0011\u0007G\u0005\u0003ib\ta\u0001\u0015:fI\u00164\u0017B\u0001<x\u0005\u0019\u0019FO]5oO*\u0011A\u000fG\u0001\u000bG2,8\u000f^3s\u0013\u0012\u0004\u0013!\u000b;fgR\u0014V\r\\8bIV\u0003H-\u0019;fI\u001aKG.Z:XSRDw.\u001e;D_:4\u0017nZ\"iC:<W\r\u000b\u0002\u000b)\u0006\u0001C/Z:u\u000bb\u001cW\r\u001d;j_:Le.\u00169eCR,7i\\8sI&t\u0017\r^8sQ\tYA+\u0001\u0013uKN$h*Z<J[\u0006<W\rU;tQ\u0016$Gk\\$s_V\u00048i\\8sI&t\u0017\r^8sQ\taA\u000b")
public class BrokerMetadataPublisherTest {
    /**
     * Holds the exception recorded by the exit/halt procedures installed in {@code setUp()};
     * stays {@code null} when neither procedure was invoked.
     */
    private final AtomicReference<Throwable> exitException = new AtomicReference<>();

    /** Cluster id used by this test class; assigned in the constructor. */
    private final String clusterId;

    /**
     * Accessor for the holder that the exit/halt procedures write into.
     *
     * @return the shared reference; its content is {@code null} until something is recorded
     */
    public AtomicReference<Throwable> exitException() {
        return exitException;
    }

    /**
     * Installs exit and halt procedures that record a {@link RuntimeException} naming the
     * status code in {@link #exitException()}, so that {@code tearDown()} can surface it.
     */
    @BeforeEach
    public void setUp() {
        Exit.setExitProcedure((code, ignoredMessage) -> {
            RuntimeException recorded = new RuntimeException("Exit " + code);
            this.exitException().set(recorded);
        });
        Exit.setHaltProcedure((code, ignoredMessage) -> {
            RuntimeException recorded = new RuntimeException("Halt " + code);
            this.exitException().set(recorded);
        });
    }

    /**
     * Restores the default exit and halt procedures, then rethrows whatever the procedures
     * installed in {@code setUp()} recorded during the test.
     *
     * @throws Throwable the recorded exception, if an exit or halt was requested during the test
     */
    @AfterEach
    public void tearDown() throws Throwable {
        Exit.resetExitProcedure();
        Exit.resetHaltProcedure();
        Throwable exception = this.exitException().get();
        if (exception != null) {
            // Throwable is checked in Java, hence the throws clause on this lifecycle method.
            throw exception;
        }
    }

    @Test
    public void testGetTopicDelta() {
        Predef$.MODULE$.assert(BrokerMetadataPublisher$.MODULE$.getTopicDelta("not-a-topic", MetadataImageTest.IMAGE1, MetadataImageTest.DELTA1).isEmpty(), (Function0 & Serializable)() -> "Expected no delta for unknown topic");
        Predef$.MODULE$.assert(BrokerMetadataPublisher$.MODULE$.getTopicDelta("foo", MetadataImageTest.IMAGE1, MetadataImageTest.DELTA1).isEmpty(), (Function0 & Serializable)() -> "Expected no delta for deleted topic");
        Predef$.MODULE$.assert(BrokerMetadataPublisher$.MODULE$.getTopicDelta("bar", MetadataImageTest.IMAGE1, MetadataImageTest.DELTA1).isDefined(), (Function0 & Serializable)() -> "Expected to see delta for changed topic");
    }

    private DynamicConfigPublisher newMockDynamicConfigPublisher(BrokerServer broker, FaultHandler errorHandler) {
        return (DynamicConfigPublisher)Mockito.spy((Object)new DynamicConfigPublisher(broker.config(), errorHandler, broker.dynamicConfigHandlers().toMap((.less.colon.less)$less$colon$less$.MODULE$.refl()), "broker"));
    }

    /**
     * Accessor for the cluster id assigned in the constructor.
     *
     * @return the cluster id string
     */
    public String clusterId() {
        return clusterId;
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testReloadUpdatedFilesWithoutConfigChange() {
        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(new TestKitNodes.Builder().setNumBrokerNodes(1).setNumControllerNodes(1).build()).build();){
            cluster.format();
            cluster.startup();
            cluster.waitForReadyBrokers();
            BrokerServer broker = cluster.brokers().values().iterator().next();
            DynamicConfigPublisher publisher = this.newMockDynamicConfigPublisher(broker, (FaultHandler)cluster.nonFatalFaultHandler());
            AtomicInteger numTimesReloadCalled = new AtomicInteger(0);
            publisher.reloadUpdatedFilesWithoutConfigChange((Properties)ArgumentMatchers.any());
            Mockito.when((Object)BoxedUnit.UNIT).thenAnswer((Answer)new Answer<BoxedUnit>(null, numTimesReloadCalled){
                private final AtomicInteger numTimesReloadCalled$1;

                public void answer(InvocationOnMock invocation) {
                    this.numTimesReloadCalled$1.addAndGet(1);
                }
                {
                    this.numTimesReloadCalled$1 = numTimesReloadCalled$1;
                }
            });
            broker.brokerMetadataPublisher().dynamicConfigPublisher_$eq(publisher);
            try (Admin admin = Admin.create((Properties)cluster.clientProperties());){
                Assertions.assertEquals((int)0, (int)numTimesReloadCalled.get());
                admin.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), Collections.singleton(new AlterConfigOp(new ConfigEntry(KafkaConfig$.MODULE$.MaxConnectionsProp(), "123"), AlterConfigOp.OpType.SET)))).all().get();
                long l = 100L;
                long waitUntilTrue_waitTimeMs = 15000L;
                long waitUntilTrue_startTime = System.currentTimeMillis();
                while (!BrokerMetadataPublisherTest.$anonfun$testReloadUpdatedFilesWithoutConfigChange$1(numTimesReloadCalled)) {
                    void waitUntilTrue_pause;
                    if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                        Assertions.fail((String)"numTimesConfigured never reached desired value");
                    }
                    Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
                }
                admin.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(broker.config().nodeId())), Collections.singleton(new AlterConfigOp(new ConfigEntry(KafkaConfig$.MODULE$.MaxConnectionsProp(), "123"), AlterConfigOp.OpType.SET)))).all().get();
                long l2 = 100L;
                long waitUntilTrue_waitTimeMs2 = 15000L;
                long waitUntilTrue_startTime2 = System.currentTimeMillis();
                while (!BrokerMetadataPublisherTest.$anonfun$testReloadUpdatedFilesWithoutConfigChange$3(numTimesReloadCalled)) {
                    void waitUntilTrue_pause;
                    if (System.currentTimeMillis() > waitUntilTrue_startTime2 + waitUntilTrue_waitTimeMs2) {
                        Assertions.fail((String)"numTimesConfigured never reached desired value");
                    }
                    Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs2), (long)waitUntilTrue_pause));
                }
            }
        }
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testExceptionInUpdateCoordinator() {
        KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(new TestKitNodes.Builder().setNumBrokerNodes(1).setNumControllerNodes(1).setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV1).build()).build();
        try {
            cluster.format();
            cluster.startup();
            cluster.waitForReadyBrokers();
            BrokerServer broker = cluster.brokers().values().iterator().next();
            long l = 60000L;
            TestUtils$ retry_this = TestUtils$.MODULE$;
            long l2 = 1L;
            long retry_startTime = System.currentTimeMillis();
            while (true) {
                try {
                    BrokerMetadataPublisherTest.$anonfun$testExceptionInUpdateCoordinator$1(broker);
                }
                catch (AssertionError retry_e) {
                    void retry_maxWaitMs;
                    if (System.currentTimeMillis() - retry_startTime > retry_maxWaitMs) {
                        throw retry_e;
                    }
                    if (retry_this.logger().underlying().isInfoEnabled()) {
                        String msgWithLogIdent_msg = new StringBuilder(49).append("Attempt failed, sleeping for ").append(l2).append(", and then retrying.").toString();
                        Object var19_7 = null;
                        retry_this.logger().underlying().info(Logging.msgWithLogIdent$((Logging)retry_this, (String)msgWithLogIdent_msg));
                    }
                    Thread.sleep(l2);
                    l2 += package$.MODULE$.min(l2, 1000L);
                    continue;
                }
                break;
            }
            Object var7_4 = null;
            Object var12_9 = null;
            BrokerMetadataPublisher publisher = (BrokerMetadataPublisher)Mockito.spy((Object)broker.brokerMetadataPublisher());
            ((BrokerMetadataPublisher)Mockito.doThrow((Throwable[])new Throwable[]{new RuntimeException("injected failure")}).when((Object)publisher)).updateCoordinator((MetadataImage)ArgumentMatchers.any(), (MetadataDelta)ArgumentMatchers.any(), (String)ArgumentMatchers.any(), (Function2)ArgumentMatchers.any(), (Function2)ArgumentMatchers.any());
            broker.sharedServer().loader().removeAndClosePublisher((MetadataPublisher)broker.brokerMetadataPublisher()).get(1L, TimeUnit.MINUTES);
            broker.metadataPublishers().remove(broker.brokerMetadataPublisher());
            broker.sharedServer().loader().installPublishers(CollectionConverters$.MODULE$.SeqHasAsJava((Seq)new .colon.colon((Object)publisher, (List)Nil$.MODULE$)).asJava()).get(1L, TimeUnit.MINUTES);
            try (Admin admin = Admin.create((Properties)cluster.clientProperties());){
                admin.createTopics(Collections.singletonList(new NewTopic("foo", 1, (short)1))).all().get();
            }
            long l3 = 60000L;
            TestUtils$ retry_this2 = TestUtils$.MODULE$;
            long l4 = 1L;
            long retry_startTime2 = System.currentTimeMillis();
            while (true) {
                try {
                    BrokerMetadataPublisherTest.$anonfun$testExceptionInUpdateCoordinator$2(cluster);
                }
                catch (AssertionError retry_e) {
                    void retry_maxWaitMs;
                    if (System.currentTimeMillis() - retry_startTime2 > retry_maxWaitMs) {
                        throw retry_e;
                    }
                    if (retry_this2.logger().underlying().isInfoEnabled()) {
                        String msgWithLogIdent_msg = new StringBuilder(49).append("Attempt failed, sleeping for ").append(l4).append(", and then retrying.").toString();
                        Object var20_19 = null;
                        retry_this2.logger().underlying().info(Logging.msgWithLogIdent$((Logging)retry_this2, (String)msgWithLogIdent_msg));
                    }
                    Thread.sleep(l4);
                    l4 += package$.MODULE$.min(l4, 1000L);
                    continue;
                }
                break;
            }
            Object var13_14 = null;
            Object var18_18 = null;
        }
        finally {
            cluster.nonFatalFaultHandler().setIgnore(true);
            cluster.close();
        }
    }

    /**
     * Builds a {@link BrokerMetadataPublisher} from mocked collaborators, publishes an empty
     * image with a delta built on it, and verifies that the mocked {@link GroupCoordinator}
     * received {@code onNewMetadataImage(image, delta)}.
     */
    @Test
    public void testNewImagePushedToGroupCoordinator() {
        // Option.<T>empty() replaces the casts from None$ to Option<T>, which Java rejects as
        // inconvertible types; the value passed is the same empty option.
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(
            0, "", true, true, TestUtils$.MODULE$.RandomPort(),
            Option.<SecurityProtocol>empty(), Option.<File>empty(), Option.<Properties>empty(),
            true, false, TestUtils$.MODULE$.RandomPort(),
            false, TestUtils$.MODULE$.RandomPort(),
            false, TestUtils$.MODULE$.RandomPort(),
            Option.<String>empty(), 1, false, 1, (short) 1, false));
        KRaftMetadataCache metadataCache = new KRaftMetadataCache(0);
        LogManager logManager = Mockito.mock(LogManager.class);
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
        GroupCoordinator groupCoordinator = Mockito.mock(GroupCoordinator.class);
        FaultHandler faultHandler = Mockito.mock(FaultHandler.class);
        BrokerMetadataPublisher metadataPublisher = new BrokerMetadataPublisher(
            this.clusterId(),
            config,
            metadataCache,
            logManager,
            replicaManager,
            groupCoordinator,
            Mockito.mock(TransactionCoordinator.class),
            Mockito.mock(DynamicConfigPublisher.class),
            Mockito.mock(DynamicClientQuotaPublisher.class),
            Mockito.mock(ScramPublisher.class),
            Mockito.mock(DelegationTokenPublisher.class),
            Mockito.mock(AclPublisher.class),
            Mockito.mock(CredentialProvider.class),
            faultHandler,
            faultHandler,
            new Some<>(new ClusterLinkDisabled.LinkManager()),
            new Some<>(Mockito.mock(TierDeletedPartitionsCoordinator.class)),
            new Some<>(Mockito.mock(BackupObjectLifecycleManagerCoordinator.class)),
            new Some<>(Mockito.mock(TierTopicSnapshotCoordinator.class)),
            new Some<>(Mockito.mock(QuotaCoordinator.class)));
        MetadataImage image = MetadataImage.EMPTY;
        MetadataDelta delta = new MetadataDelta.Builder().setImage(image).build();
        LoaderManifest manifest = new LogDeltaManifest.Builder()
            .provenance(MetadataProvenance.EMPTY)
            .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
            .numBatches(1)
            .elapsedNs(100L)
            .numBytes(42L)
            .build();
        metadataPublisher.onMetadataUpdate(delta, image, manifest);
        Mockito.verify(groupCoordinator).onNewMetadataImage(image, delta);
    }

    /**
     * Predicate for the first wait in {@code testReloadUpdatedFilesWithoutConfigChange}.
     *
     * @return {@code true} when the counter currently reads 0
     */
    public static final /* synthetic */ boolean $anonfun$testReloadUpdatedFilesWithoutConfigChange$1(AtomicInteger numTimesReloadCalled$1) {
        final int observedCalls = numTimesReloadCalled$1.get();
        return observedCalls == 0;
    }

    /**
     * Supplies the failure message for the first wait in
     * {@code testReloadUpdatedFilesWithoutConfigChange}.
     *
     * @return the fixed failure message
     */
    public static final /* synthetic */ String $anonfun$testReloadUpdatedFilesWithoutConfigChange$2() {
        final String failureMessage = "numTimesConfigured never reached desired value";
        return failureMessage;
    }

    /**
     * Predicate for the second wait in {@code testReloadUpdatedFilesWithoutConfigChange}.
     *
     * @return {@code true} when the counter currently reads 1
     */
    public static final /* synthetic */ boolean $anonfun$testReloadUpdatedFilesWithoutConfigChange$3(AtomicInteger numTimesReloadCalled$1) {
        final int observedCalls = numTimesReloadCalled$1.get();
        return observedCalls == 1;
    }

    /**
     * Supplies the failure message for the second wait in
     * {@code testReloadUpdatedFilesWithoutConfigChange}.
     *
     * @return the fixed failure message
     */
    public static final /* synthetic */ String $anonfun$testReloadUpdatedFilesWithoutConfigChange$4() {
        final String failureMessage = "numTimesConfigured never reached desired value";
        return failureMessage;
    }

    /**
     * Retry body for {@code testExceptionInUpdateCoordinator}: asserts that the broker's
     * metadata publisher is non-null.
     */
    public static final /* synthetic */ void $anonfun$testExceptionInUpdateCoordinator$1(BrokerServer broker$1) {
        BrokerMetadataPublisher currentPublisher = broker$1.brokerMetadataPublisher();
        Assertions.assertNotNull(currentPublisher);
    }

    /**
     * Retry body for {@code testExceptionInUpdateCoordinator}: asserts that the first exception
     * seen by the cluster's non-fatal fault handler has a message containing
     * "injected failure". A missing exception or a null message is treated as "(none)".
     */
    public static final /* synthetic */ void $anonfun$testExceptionInUpdateCoordinator$2(KafkaClusterTestKit cluster$1) {
        // Plain null checks replace the raw Function1 lambda, whose parameter had no static
        // type to resolve getMessage() against.
        Throwable firstFault = cluster$1.nonFatalFaultHandler().firstException();
        String faultMessage = firstFault == null ? null : firstFault.getMessage();
        String effectiveMessage = faultMessage == null ? "(none)" : faultMessage;
        Assertions.assertTrue(effectiveMessage.contains("injected failure"));
    }

    /** Creates the test instance and fixes the cluster id to {@code "Test"}. */
    public BrokerMetadataPublisherTest() {
        clusterId = "Test";
    }
}

