/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import kafka.controller.KafkaController;
import kafka.log.LogConfig;
import kafka.metrics.KafkaMetricsGroup;
import kafka.server.ConfigType$;
import kafka.server.ReplicaManager;
import kafka.server.link.AbstractClusterLinkMetadataManagerTest;
import kafka.server.link.ClusterLinkMetadataManager;
import kafka.server.link.ClusterLinkMetadataManagerWithZkSupport;
import kafka.server.metadata.ZkMetadataCache;
import kafka.utils.Logging;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001=4A!\u0005\n\u00013!)a\u0004\u0001C\u0001?!9\u0011\u0005\u0001b\u0001\n\u0013\u0011\u0003B\u0002\u0015\u0001A\u0003%1\u0005C\u0004*\u0001\t\u0007I\u0011\u0002\u0016\t\rE\u0002\u0001\u0015!\u0003,\u0011\u001d\u0011\u0004A1A\u0005\nMBaA\u000f\u0001!\u0002\u0013!\u0004bB\u001e\u0001\u0005\u0004%I\u0001\u0010\u0005\u0007\u0003\u0002\u0001\u000b\u0011B\u001f\t\u000b\t\u0003A\u0011A\"\t\u000bq\u0003A\u0011A/\t\u000b\t\u0004A\u0011A/\t\u000b\u001d\u0004A\u0011A/\t\u000b%\u0004A\u0011A/\t\u000b-\u0004A\u0011A/\t\u000b5\u0004A\u0011A/\u0003W\rcWo\u001d;fe2Kgn['fi\u0006$\u0017\r^1NC:\fw-\u001a:XSRD'l[*vaB|'\u000f\u001e+fgRT!a\u0005\u000b\u0002\t1Lgn\u001b\u0006\u0003+Y\taa]3sm\u0016\u0014(\"A\f\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001A\u0007\t\u00037qi\u0011AE\u0005\u0003;I\u0011a%\u00112tiJ\f7\r^\"mkN$XM\u001d'j].lU\r^1eCR\fW*\u00198bO\u0016\u0014H+Z:u\u0003\u0019a\u0014N\\5u}Q\t\u0001\u0005\u0005\u0002\u001c\u0001\u0005Q1m\u001c8ue>dG.\u001a:\u0016\u0003\r\u0002\"\u0001\n\u0014\u000e\u0003\u0015R!!\t\f\n\u0005\u001d*#aD&bM.\f7i\u001c8ue>dG.\u001a:\u0002\u0017\r|g\u000e\u001e:pY2,'\u000fI\u0001\u000e[\u0016$\u0018\rZ1uC\u000e\u000b7\r[3\u0016\u0003-\u0002\"\u0001L\u0018\u000e\u00035R!A\f\u000b\u0002\u00115,G/\u00193bi\u0006L!\u0001M\u0017\u0003\u001fi[W*\u001a;bI\u0006$\u0018mQ1dQ\u0016\fa\"\\3uC\u0012\fG/Y\"bG\",\u0007%\u0001\u0005{W\u000ec\u0017.\u001a8u+\u0005!\u0004CA\u001b9\u001b\u00051$BA\u001c\u0017\u0003\tQ8.\u0003\u0002:m\ti1*\u00194lCj[7\t\\5f]R\f\u0011B_6DY&,g\u000e\u001e\u0011\u0002\u001dI,\u0007\u000f\\5dC6\u000bg.Y4feV\tQ\b\u0005\u0002?\u007f5\tA#\u0003\u0002A)\tq!+\u001a9mS\u000e\fW*\u00198bO\u0016\u0014\u0018a\u0004:fa2L7-Y'b]\u0006<WM\u001d\u0011\u0002\u000bM,G/\u00169\u0015\u0005\u0011S\u0005CA#I\u001b\u00051%\"A$\u0002\u000bM\u001c\u0017\r\\1\n\u0005%3%\u0001B+oSRDQa\u0013\u0006A\u00021\u000bA!\u001b8g_B\u0011QJV\u0007\u0002\u001d*\u0011q\nU\u0001\u0004CBL'BA)S\u0003\u001dQW\u000f]5uKJT!a\u0015+\u0002\u000b),h.\u001b;\u000b\u0003U\u000b1a\u001c:h\u0013\t9fJ\u0001\u0005UKN$\u0018J\u001c4pQ\tQ\u0011\f\u0005\u0002N5&\u00111L\u0014\u0002\u000b\u0005\u00164wN]3FC\u000eD\u0017\u0001\u0003;fCJ$un\u001e8\u0015\u0003\u0011C#aC0\u0011\u00055\u0003\u0017BA1O\u0005%\te\r^3s\u000b\u0006\u001c\u0007.\u0001\u0016uKN$X*\u001a;bI\u0006$\u0018\rV8qS\u000e\u001c%/Z1uS>tw+\u001b;i\r\u0006LG.\u001a3BiR,W\u000e\u001d;)\u00051!\u0007CA'f\u0013\t1gJ\u0001\u0003UKN$\u0018!\r;fgRlU\r^1eCR\fGk\u001c9jG\u000e\u0013X-\u0019;j_:<\u0016\u000e\u001e5U_BL7-\u0012=jgR\u001cX\t_2faRLwN\u001c\u0015\u0003\u001b\u0011\f1\u0005^3tiB\u000b'\u000f^5uS>tW\t\\3di&|g.\u00118e%\u0016\u001c\u0018n\u001a8bi&|g\u000e\u000b\u0002\u000fI\u0006)C/Z:u\u000f\u0016$Hk\u001c9jG\u000e{gNZ5h\r\u0006dGn\u001d\"bG.$vNW6DY&,g\u000e\u001e\u0015\u0003\u001f\u0011\fA\u0005^3ti\u001e+G\u000fV8qS\u000e\u001cuN\u001c4jOV\u001bXm\u001d*fa2L7-Y'b]\u0006<WM\u001d\u0015\u0003!\u0011\u0004")
public class ClusterLinkMetadataManagerWithZkSupportTest
extends AbstractClusterLinkMetadataManagerTest {
    private final KafkaController controller = (KafkaController)Mockito.mock(KafkaController.class);
    private final ZkMetadataCache metadataCache = (ZkMetadataCache)Mockito.mock(ZkMetadataCache.class);
    private final KafkaZkClient zkClient = (KafkaZkClient)Mockito.mock(KafkaZkClient.class);
    private final ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);

    private KafkaController controller() {
        return this.controller;
    }

    private ZkMetadataCache metadataCache() {
        return this.metadataCache;
    }

    private KafkaZkClient zkClient() {
        return this.zkClient;
    }

    private ReplicaManager replicaManager() {
        return this.replicaManager;
    }

    @BeforeEach
    public void setUp(TestInfo info) {
        Mockito.reset((Object[])new Object[]{this.destAdmin(), this.metadataCache(), this.zkClient(), this.replicaManager()});
        Mockito.when((Object)this.zkClient().getChildren(ArgumentMatchers.anyString())).thenReturn((Object)Nil$.MODULE$);
        Mockito.when((Object)this.zkClient().getClusterLinks((Set)ArgumentMatchers.any())).thenReturn((Object)Predef$.MODULE$.Map().empty());
        if (info.getDisplayName().startsWith("testMetadataTopicCreation")) {
            this.metadataManager_$eq((ClusterLinkMetadataManager)new ClusterLinkMetadataManagerWithZkSupport(this.brokerConfig(), this.scheduler(), this.metadataCache(), this.controller(), this.zkClient(), (Function0 & Serializable & scala.Serializable)() -> this.destAdmin(), this.replicaManager()));
            return;
        }
        Mockito.when((Object)this.metadataCache().numPartitions("_confluent-link-metadata")).thenReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)50)));
        this.metadataManager_$eq((ClusterLinkMetadataManager)new ClusterLinkMetadataManagerWithZkSupport(this.brokerConfig(), this.scheduler(), this.metadataCache(), this.controller(), this.zkClient(), (Function0 & Serializable & scala.Serializable)() -> this.destAdmin(), this.replicaManager()));
        this.metadataManager().startup();
        this.waitAndCreateMetadataTopic();
    }

    @AfterEach
    public void tearDown() {
        if (this.metadataManager() != null) {
            this.metadataManager().shutdown();
        }
        this.metadataManager_$eq(null);
    }

    @Test
    public void testMetadataTopicCreationWithFailedAttempt() {
        Mockito.reset((Object[])new Logging[]{this.metadataCache(), this.zkClient()});
        Mockito.when((Object)this.metadataCache().numPartitions("_confluent-link-metadata")).thenReturn((Object)None$.MODULE$);
        Mockito.reset((Object[])new Admin[]{this.destAdmin()});
        Mockito.when((Object)this.destAdmin().createTopics((Collection)ArgumentMatchers.any())).thenReturn((Object)this.createMetadataTopicResult((Option<Throwable>)new Some((Object)new TopicAuthorizationException("")))).thenReturn((Object)this.createMetadataTopicResult((Option<Throwable>)None$.MODULE$));
        this.metadataManager().startup();
        this.waitAndCreateMetadataTopic();
        ((Admin)Mockito.verify((Object)this.destAdmin(), (VerificationMode)Mockito.times((int)2))).createTopics((Collection)ArgumentMatchers.any());
        ((ZkMetadataCache)Mockito.verify((Object)this.metadataCache(), (VerificationMode)Mockito.times((int)2))).numPartitions("_confluent-link-metadata");
    }

    @Test
    public void testMetadataTopicCreationWithTopicExistsException() {
        Mockito.reset((Object[])new Logging[]{this.metadataCache(), this.zkClient()});
        Mockito.when((Object)this.metadataCache().numPartitions("_confluent-link-metadata")).thenReturn((Object)None$.MODULE$, (Object[])new Option[]{new Some((Object)BoxesRunTime.boxToInteger((int)Predef$.MODULE$.Integer2int(this.brokerConfig().clusterLinkMetadataTopicPartitions())))});
        Mockito.reset((Object[])new Admin[]{this.destAdmin()});
        Mockito.when((Object)this.destAdmin().createTopics((Collection)ArgumentMatchers.any())).thenReturn((Object)this.createMetadataTopicResult((Option<Throwable>)new Some((Object)new TopicExistsException(""))));
        this.metadataManager().startup();
        this.waitAndCreateMetadataTopic();
        ((Admin)Mockito.verify((Object)this.destAdmin())).createTopics((Collection)ArgumentMatchers.any());
        ((ZkMetadataCache)Mockito.verify((Object)this.metadataCache(), (VerificationMode)Mockito.times((int)2))).numPartitions("_confluent-link-metadata");
    }

    @Test
    public void testPartitionElectionAndResignation() {
        String clusterLinkName = "testLink";
        int partition = Utils.toPositive((int)Utils.murmur2((byte[])clusterLinkName.getBytes())) % Predef$.MODULE$.Integer2int(this.brokerConfig().clusterLinkMetadataTopicPartitions());
        this.metadataManager().onElection(partition, 10);
        ClusterLinkMetadataManager qual$1 = this.metadataManager();
        boolean x$2 = qual$1.isLinkCoordinator$default$2();
        Assertions.assertTrue((boolean)qual$1.isLinkCoordinator(clusterLinkName, x$2), (String)"Broker is not leader for cluster link");
        ClusterLinkMetadataManager qual$2 = this.metadataManager();
        String x$3 = "testLink2";
        boolean x$4 = qual$2.isLinkCoordinator$default$2();
        Assertions.assertFalse((boolean)qual$2.isLinkCoordinator(x$3, x$4), (String)"Broker is leader for cluster link");
        this.metadataManager().onResignation(partition, (Option)new Some((Object)BoxesRunTime.boxToInteger((int)9)));
        ClusterLinkMetadataManager qual$3 = this.metadataManager();
        boolean x$6 = qual$3.isLinkCoordinator$default$2();
        Assertions.assertTrue((boolean)qual$3.isLinkCoordinator(clusterLinkName, x$6), (String)"Broker is not leader for cluster link");
        this.metadataManager().onResignation(partition, (Option)new Some((Object)BoxesRunTime.boxToInteger((int)11)));
        ClusterLinkMetadataManager qual$4 = this.metadataManager();
        boolean x$8 = qual$4.isLinkCoordinator$default$2();
        Assertions.assertFalse((boolean)qual$4.isLinkCoordinator(clusterLinkName, x$8), (String)"Broker is leader for cluster link");
        this.metadataManager().onElection(partition, 9);
        ClusterLinkMetadataManager qual$5 = this.metadataManager();
        boolean x$10 = qual$5.isLinkCoordinator$default$2();
        Assertions.assertFalse((boolean)qual$5.isLinkCoordinator(clusterLinkName, x$10), (String)"Broker is leader for cluster link");
        this.metadataManager().onElection(partition, 12);
        ClusterLinkMetadataManager qual$6 = this.metadataManager();
        boolean x$12 = qual$6.isLinkCoordinator$default$2();
        Assertions.assertTrue((boolean)qual$6.isLinkCoordinator(clusterLinkName, x$12), (String)"Broker is not leader for cluster link");
        this.metadataManager().onResignation(partition, (Option)None$.MODULE$);
        ClusterLinkMetadataManager qual$7 = this.metadataManager();
        boolean x$14 = qual$7.isLinkCoordinator$default$2();
        Assertions.assertFalse((boolean)qual$7.isLinkCoordinator(clusterLinkName, x$14), (String)"Broker is leader for cluster link");
    }

    @Test
    public void testGetTopicConfigFallsBackToZkClient() {
        Mockito.reset((Object[])new KafkaMetricsGroup[]{this.zkClient(), this.replicaManager()});
        String topic = "topic";
        Properties expected = new Properties();
        String messageTimestampTypeKey = "message.timestamp.type";
        String messageTimestampTypeValue = "CreateTime";
        expected.put(messageTimestampTypeKey, messageTimestampTypeValue);
        Mockito.when((Object)this.zkClient().getEntityConfigs(ConfigType$.MODULE$.Topic(), topic)).thenReturn((Object)expected);
        Mockito.when((Object)this.replicaManager().getLogConfig(new TopicPartition(topic, 0))).thenReturn((Object)None$.MODULE$);
        Properties actual = this.metadataManager().getTopicConfig(topic);
        Assertions.assertEquals((int)1, (int)actual.size());
        Assertions.assertEquals((Object)messageTimestampTypeValue, (Object)actual.getProperty(messageTimestampTypeKey));
    }

    @Test
    public void testGetTopicConfigUsesReplicaManager() {
        Mockito.reset((Object[])new KafkaMetricsGroup[]{this.zkClient(), this.replicaManager()});
        String topic = "topic";
        Properties expected = new Properties();
        String messageTimestampTypeKey = "message.timestamp.type";
        String messageTimestampTypeValue = "CreateTime";
        expected.put(messageTimestampTypeKey, messageTimestampTypeValue);
        Properties allProperties = new Properties();
        allProperties.putAll((Map<?, ?>)expected);
        allProperties.put("min.insync.replicas", "1");
        LogConfig logConfig = new LogConfig((Map)allProperties, (Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{messageTimestampTypeKey})));
        Mockito.when((Object)this.replicaManager().getLogConfig(new TopicPartition(topic, 0))).thenReturn((Object)new Some((Object)logConfig));
        Properties actual = this.metadataManager().getTopicConfig(topic);
        Assertions.assertEquals((int)1, (int)actual.size());
        Assertions.assertEquals((Object)messageTimestampTypeValue, (Object)actual.getProperty(messageTimestampTypeKey));
    }
}

