/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import kafka.controller.KafkaController;
import kafka.server.ReplicaManager;
import kafka.server.link.AbstractClusterLinkMetadataManagerTest;
import kafka.server.link.ClusterLinkMetadataManager;
import kafka.server.link.ClusterLinkMetadataManagerWithZkSupport;
import kafka.server.metadata.ZkMetadataCache;
import kafka.utils.Logging;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.Set;
import scala.collection.immutable.Seq;
import scala.jdk.CollectionConverters$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

@ScalaSignature(bytes="\u0006\u0005\u0005}a\u0001B\f\u0019\u0001}AQ\u0001\n\u0001\u0005\u0002\u0015Bqa\n\u0001C\u0002\u0013%\u0001\u0006\u0003\u0004/\u0001\u0001\u0006I!\u000b\u0005\b_\u0001\u0011\r\u0011\"\u00031\u0011\u00199\u0004\u0001)A\u0005c!9\u0001\b\u0001b\u0001\n\u0013I\u0004B\u0002!\u0001A\u0003%!\bC\u0004B\u0001\t\u0007I\u0011\u0002\"\t\r\u001d\u0003\u0001\u0015!\u0003D\u0011\u001dA\u0005A1A\u0005\n%CaA\u0016\u0001!\u0002\u0013Q\u0005bB,\u0001\u0005\u0004%I\u0001\u0017\u0005\u0007?\u0002\u0001\u000b\u0011B-\t\u000b\u0001\u0004A\u0011A1\t\u000ba\u0004A\u0011A=\t\u000by\u0004A\u0011A=\t\r\u0005\u001d\u0001\u0001\"\u0001z\u0011\u0019\tY\u0001\u0001C\u0001s\"1\u0011q\u0002\u0001\u0005\u0002eDa!a\u0005\u0001\t\u0003I\bBBA\f\u0001\u0011\u0005\u0011\u0010\u0003\u0004\u0002\u001c\u0001!\t!\u001f\u0002,\u00072,8\u000f^3s\u0019&t7.T3uC\u0012\fG/Y'b]\u0006<WM],ji\"T6nU;qa>\u0014H\u000fV3ti*\u0011\u0011DG\u0001\u0005Y&t7N\u0003\u0002\u001c9\u000511/\u001a:wKJT\u0011!H\u0001\u0006W\u000647.Y\u0002\u0001'\t\u0001\u0001\u0005\u0005\u0002\"E5\t\u0001$\u0003\u0002$1\t1\u0013IY:ue\u0006\u001cGo\u00117vgR,'\u000fT5oW6+G/\u00193bi\u0006l\u0015M\\1hKJ$Vm\u001d;\u0002\rqJg.\u001b;?)\u00051\u0003CA\u0011\u0001\u0003)\u0019wN\u001c;s_2dWM]\u000b\u0002SA\u0011!\u0006L\u0007\u0002W)\u0011q\u0005H\u0005\u0003[-\u0012qbS1gW\u0006\u001cuN\u001c;s_2dWM]\u0001\fG>tGO]8mY\u0016\u0014\b%A\u0007nKR\fG-\u0019;b\u0007\u0006\u001c\u0007.Z\u000b\u0002cA\u0011!'N\u0007\u0002g)\u0011AGG\u0001\t[\u0016$\u0018\rZ1uC&\u0011ag\r\u0002\u00105.lU\r^1eCR\f7)Y2iK\u0006qQ.\u001a;bI\u0006$\u0018mQ1dQ\u0016\u0004\u0013\u0001\u0003>l\u00072LWM\u001c;\u0016\u0003i\u0002\"a\u000f \u000e\u0003qR!!\u0010\u000f\u0002\u0005i\\\u0017BA 
=\u00055Y\u0015MZ6b5.\u001cE.[3oi\u0006I!p[\"mS\u0016tG\u000fI\u0001\u000fe\u0016\u0004H.[2b\u001b\u0006t\u0017mZ3s+\u0005\u0019\u0005C\u0001#F\u001b\u0005Q\u0012B\u0001$\u001b\u00059\u0011V\r\u001d7jG\u0006l\u0015M\\1hKJ\fqB]3qY&\u001c\u0017-T1oC\u001e,'\u000fI\u0001\u000bg\u0016\u0014h/\u001a:J]\u001a|W#\u0001&\u0011\u0005-#V\"\u0001'\u000b\u00055s\u0015AC1vi\"|'/\u001b>fe*\u00111d\u0014\u0006\u0003;AS!!\u0015*\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005\u0019\u0016aA8sO&\u0011Q\u000b\u0014\u0002\u0015\u0003V$\bn\u001c:ju\u0016\u00148+\u001a:wKJLeNZ8\u0002\u0017M,'O^3s\u0013:4w\u000eI\u0001\u000fEJ|7.\u001a:F]\u0012\u0004x.\u001b8u+\u0005I\u0006C\u0001.^\u001b\u0005Y&B\u0001/P\u0003\u0019\u0019w.\\7p]&\u0011al\u0017\u0002\t\u000b:$\u0007o\\5oi\u0006y!M]8lKJ,e\u000e\u001a9pS:$\b%A\u0003tKR,\u0006\u000f\u0006\u0002cQB\u00111MZ\u0007\u0002I*\tQ-A\u0003tG\u0006d\u0017-\u0003\u0002hI\n!QK\\5u\u0011\u0015Ig\u00021\u0001k\u0003\u0011IgNZ8\u0011\u0005-\u0014X\"\u00017\u000b\u00055t\u0017aA1qS*\u0011q\u000e]\u0001\bUV\u0004\u0018\u000e^3s\u0015\t\t(+A\u0003kk:LG/\u0003\u0002tY\nAA+Z:u\u0013:4w\u000e\u000b\u0002\u000fkB\u00111N^\u0005\u0003o2\u0014!BQ3g_J,W)Y2i\u0003!!X-\u0019:E_^tG#\u00012)\u0005=Y\bCA6}\u0013\tiHNA\u0005BMR,'/R1dQ\u0006QC/Z:u\u001b\u0016$\u0018\rZ1uCR{\u0007/[2De\u0016\fG/[8o/&$\bNR1jY\u0016$\u0017\t\u001e;f[B$\bf\u0001\t\u0002\u0002A\u00191.a\u0001\n\u0007\u0005\u0015AN\u0001\u0003UKN$\u0018!\r;fgRlU\r^1eCR\fGk\u001c9jG\u000e\u0013X-\u0019;j_:<\u0016\u000e\u001e5U_BL7-\u0012=jgR\u001cX\t_2faRLwN\u001c\u0015\u0004#\u0005\u0005\u0011a\t;fgR\u0004\u0016M\u001d;ji&|g.\u00127fGRLwN\\!oIJ+7/[4oCRLwN\u001c\u0015\u0004%\u0005\u0005\u0011!\n;fgR<U\r\u001e+pa&\u001c7i\u001c8gS\u001e4\u0015\r\u001c7t\u0005\u0006\u001c7\u000eV8[W\u000ec\u0017.\u001a8uQ\r\u0019\u0012\u0011A\u0001%i\u0016\u001cHoR3u)>\u0004\u0018nY\"p]\u001aLw-V:fgJ+\u0007\u000f\\5dC6\u000bg.Y4fe\"\u001aA#!\u0001\u0002eQ,7\u000f\u001e'j].\u001cun\u001c:eS:\fGo\u001c:XQ\u0016tG*\u001b8l\u0007>|'\u00
0fZ5oCR|'/S:O_R,e.\u00192mK\u0012D3!FA\u0001\u0003=\"Xm\u001d;MS:\\7i\\8sI&t\u0017\r^8s/\",g\u000eT5oW\u000e{wN\u001d3j]\u0006$xN]%t\u000b:\f'\r\\3eQ\r1\u0012\u0011\u0001")
/*
 * Tests for ClusterLinkMetadataManagerWithZkSupport, driven entirely through
 * Mockito mocks of its collaborators (controller, metadata cache, ZooKeeper
 * client, replica manager and authorizer server info).
 *
 * A plain block comment is used here (rather than Javadoc) because the
 * generated @ScalaSignature annotation sits directly above the class keyword.
 */
public class ClusterLinkMetadataManagerWithZkSupportTest
extends AbstractClusterLinkMetadataManagerTest {
    /** Topic name used in every metadata-cache stub and verification in this class. */
    private static final String METADATA_TOPIC = "_confluent-link-metadata";

    /** Display-name prefix of the tests that must start the manager themselves. */
    private static final String TOPIC_CREATION_TEST_PREFIX = "testMetadataTopicCreation";

    private final KafkaController controller = Mockito.mock(KafkaController.class);
    private final ZkMetadataCache metadataCache = Mockito.mock(ZkMetadataCache.class);
    private final KafkaZkClient zkClient = Mockito.mock(KafkaZkClient.class);
    private final ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
    private final AuthorizerServerInfo serverInfo = Mockito.mock(AuthorizerServerInfo.class);
    private final Endpoint brokerEndpoint = new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 1234);

    private KafkaController controller() {
        return this.controller;
    }

    private ZkMetadataCache metadataCache() {
        return this.metadataCache;
    }

    private KafkaZkClient zkClient() {
        return this.zkClient;
    }

    private ReplicaManager replicaManager() {
        return this.replicaManager;
    }

    private AuthorizerServerInfo serverInfo() {
        return this.serverInfo;
    }

    private Endpoint brokerEndpoint() {
        return this.brokerEndpoint;
    }

    /**
     * Builds the manager under test from the broker config, scheduler and mocks.
     * The fourth constructor argument is passed as {@code null} and the last one
     * as {@code None}, exactly as both code paths of {@link #setUp(TestInfo)} require.
     */
    private ClusterLinkMetadataManager newMetadataManager() {
        return (ClusterLinkMetadataManager)new ClusterLinkMetadataManagerWithZkSupport(
            this.brokerConfig(),
            this.scheduler(),
            this.metadataCache(),
            null,
            this.controller(),
            this.zkClient(),
            (Function0 & Serializable)() -> this.destAdmin(),
            this.replicaManager(),
            this.serverInfo(),
            (Option)None$.MODULE$);
    }

    /** Wraps {@code value} in a Scala {@code Some}, boxed the way Scala's {@code Option[Int]} expects. */
    private static Option<Object> someInt(int value) {
        return new Some<Object>(BoxesRunTime.boxToInteger(value));
    }

    /** Returns the configured partition count of the metadata topic as a primitive. */
    private int metadataTopicPartitionCount() {
        return Predef$.MODULE$.Integer2int(this.brokerConfig().clusterLinkMetadataTopicPartitions());
    }

    /**
     * Computes the metadata-topic partition for a link name: the murmur2 hash of
     * the name, masked to a non-negative value, modulo the configured partition count.
     * The bytes are taken as UTF-8 so the result does not depend on the JVM's
     * default charset.
     */
    private int metadataPartitionFor(String linkName) {
        int positiveHash = Utils.murmur2(linkName.getBytes(StandardCharsets.UTF_8)) & Integer.MAX_VALUE;
        return positiveHash % this.metadataTopicPartitionCount();
    }

    /**
     * Starts the scheduler, resets the mocks, installs the default stubs and
     * creates the manager under test.
     *
     * <p>Tests whose display name starts with {@code testMetadataTopicCreation}
     * receive a manager that has not been started and no partition-count stub,
     * so they can drive startup themselves. Every other test receives a started
     * manager with the metadata cache reporting 50 partitions for the metadata topic.
     *
     * @param info JUnit-supplied description of the test about to run
     */
    @BeforeEach
    public void setUp(TestInfo info) {
        this.scheduler().startup();
        Mockito.reset(this.destAdmin(), this.metadataCache(), this.zkClient(), this.replicaManager(), this.serverInfo());
        Mockito.when(this.serverInfo().interBrokerEndpoint()).thenReturn(this.brokerEndpoint());
        Mockito.when((Object)this.zkClient().getChildren(ArgumentMatchers.anyString())).thenReturn((Object)package$.MODULE$.Seq().empty());
        Mockito.when((Object)this.zkClient().getClusterLinks((scala.collection.immutable.Set)ArgumentMatchers.any())).thenReturn((Object)Predef$.MODULE$.Map().empty());

        boolean topicCreationTest = info.getDisplayName().startsWith(TOPIC_CREATION_TEST_PREFIX);
        if (!topicCreationTest) {
            // Stubbed before the manager is constructed, matching the original ordering.
            Mockito.when((Object)this.metadataCache().numPartitions(METADATA_TOPIC)).thenReturn((Object)someInt(50));
        }
        this.metadataManager_$eq(this.newMetadataManager());
        if (topicCreationTest) {
            return;
        }
        this.metadataManager().startup();
        this.waitAndCreateMetadataTopic(this.waitAndCreateMetadataTopic$default$1());
    }

    /**
     * Shuts down the manager (when one was created) and then the scheduler.
     * The scheduler shutdown and the reference reset run in a {@code finally}
     * block so a failing manager shutdown cannot leave the scheduler running.
     */
    @AfterEach
    public void tearDown() {
        try {
            if (this.metadataManager() != null) {
                this.metadataManager().shutdown();
            }
        } finally {
            this.metadataManager_$eq(null);
            this.scheduler().shutdown();
        }
    }

    /**
     * The first createTopics call fails with a TopicAuthorizationException and
     * the second succeeds; expects two createTopics calls and two partition-count
     * lookups on the metadata cache.
     */
    @Test
    public void testMetadataTopicCreationWithFailedAttempt() {
        Mockito.reset(this.metadataCache(), this.zkClient());
        Mockito.when(this.metadataCache().linkCoordinatorEnabled()).thenReturn(true);
        Mockito.when((Object)this.metadataCache().numPartitions(METADATA_TOPIC)).thenReturn((Object)None$.MODULE$);
        Mockito.reset(this.destAdmin());
        Mockito.when((Object)this.destAdmin().createTopics((Collection)ArgumentMatchers.any()))
            .thenReturn((Object)this.createMetadataTopicResult((Option<Throwable>)new Some((Object)new TopicAuthorizationException(""))))
            .thenReturn((Object)this.createMetadataTopicResult((Option<Throwable>)None$.MODULE$));

        this.metadataManager().startup();
        this.waitAndCreateMetadataTopic(this.waitAndCreateMetadataTopic$default$1());

        Mockito.verify(this.destAdmin(), Mockito.times(2)).createTopics((Collection)ArgumentMatchers.any());
        Mockito.verify(this.metadataCache(), Mockito.times(2)).numPartitions(METADATA_TOPIC);
    }

    /**
     * createTopics fails with a TopicExistsException while the metadata cache
     * reports no partitions on the first lookup and the configured count on the
     * second; expects a single createTopics call and two partition-count lookups.
     */
    @Test
    public void testMetadataTopicCreationWithTopicExistsException() {
        Mockito.reset(this.metadataCache(), this.zkClient());
        Mockito.when(this.metadataCache().linkCoordinatorEnabled()).thenReturn(true);
        int configuredPartitions = this.metadataTopicPartitionCount();
        Mockito.when((Object)this.metadataCache().numPartitions(METADATA_TOPIC))
            .thenReturn((Object)None$.MODULE$, (Object)someInt(configuredPartitions));
        Mockito.reset(this.destAdmin());
        Mockito.when((Object)this.destAdmin().createTopics((Collection)ArgumentMatchers.any()))
            .thenReturn((Object)this.createMetadataTopicResult((Option<Throwable>)new Some((Object)new TopicExistsException(""))));

        this.metadataManager().startup();
        this.waitAndCreateMetadataTopic(this.waitAndCreateMetadataTopic$default$1());

        Mockito.verify(this.destAdmin()).createTopics((Collection)ArgumentMatchers.any());
        Mockito.verify(this.metadataCache(), Mockito.times(2)).numPartitions(METADATA_TOPIC);
    }

    /**
     * Walks a sequence of onElection/onResignation calls for the partition that
     * owns "testLink" and checks isLinkCoordinator after each step.
     * NOTE(review): the numeric second argument is presumably a leader epoch —
     * confirm against ClusterLinkMetadataManager.
     */
    @Test
    public void testPartitionElectionAndResignation() {
        Mockito.when(this.metadataCache().linkCoordinatorEnabled()).thenReturn(true);
        String clusterLinkName = "testLink";
        int partition = this.metadataPartitionFor(clusterLinkName);

        this.metadataManager().onElection(partition, 10);
        Assertions.assertTrue(this.metadataManager().isLinkCoordinator(clusterLinkName), "Broker is not leader for cluster link");
        Assertions.assertFalse(this.metadataManager().isLinkCoordinator("testLink2"), "Broker is leader for cluster link");

        // Resignation carrying a lower value than the election (9 < 10) must not take effect.
        this.metadataManager().onResignation(partition, someInt(9));
        Assertions.assertTrue(this.metadataManager().isLinkCoordinator(clusterLinkName), "Broker is not leader for cluster link");

        // Resignation carrying a higher value (11) does take effect.
        this.metadataManager().onResignation(partition, someInt(11));
        Assertions.assertFalse(this.metadataManager().isLinkCoordinator(clusterLinkName), "Broker is leader for cluster link");

        // Election carrying a lower value than the last resignation (9 < 11) is ignored.
        this.metadataManager().onElection(partition, 9);
        Assertions.assertFalse(this.metadataManager().isLinkCoordinator(clusterLinkName), "Broker is leader for cluster link");

        this.metadataManager().onElection(partition, 12);
        Assertions.assertTrue(this.metadataManager().isLinkCoordinator(clusterLinkName), "Broker is not leader for cluster link");

        // Resignation without a value always takes effect.
        this.metadataManager().onResignation(partition, (Option)None$.MODULE$);
        Assertions.assertFalse(this.metadataManager().isLinkCoordinator(clusterLinkName), "Broker is leader for cluster link");
    }

    /**
     * With the replica manager returning no log config for partition 0, the
     * topic config must be the properties served by the ZooKeeper client.
     */
    @Test
    public void testGetTopicConfigFallsBackToZkClient() {
        Mockito.reset(this.zkClient(), this.replicaManager());
        Mockito.when(this.metadataCache().linkCoordinatorEnabled()).thenReturn(true);
        String topic = "topic";
        String messageTimestampTypeKey = "message.timestamp.type";
        String messageTimestampTypeValue = "CreateTime";
        Properties expected = new Properties();
        expected.setProperty(messageTimestampTypeKey, messageTimestampTypeValue);
        Mockito.when(this.zkClient().getEntityConfigs("topics", topic)).thenReturn(expected);
        Mockito.when((Object)this.replicaManager().getLogConfig(new TopicPartition(topic, 0))).thenReturn((Object)None$.MODULE$);

        Properties actual = this.metadataManager().getTopicConfig(topic);

        Assertions.assertEquals(1, actual.size());
        Assertions.assertEquals(messageTimestampTypeValue, actual.getProperty(messageTimestampTypeKey));
    }

    /**
     * With the replica manager returning a log config that holds two properties
     * but lists only one as overridden, the topic config must contain just that
     * overridden property.
     */
    @Test
    public void testGetTopicConfigUsesReplicaManager() {
        Mockito.reset(this.zkClient(), this.replicaManager());
        Mockito.when(this.metadataCache().linkCoordinatorEnabled()).thenReturn(true);
        String topic = "topic";
        String messageTimestampTypeKey = "message.timestamp.type";
        String messageTimestampTypeValue = "CreateTime";
        Properties allProperties = new Properties();
        allProperties.setProperty(messageTimestampTypeKey, messageTimestampTypeValue);
        allProperties.setProperty("min.insync.replicas", "1");
        // Only the timestamp type is declared as overridden; min.insync.replicas is not.
        LogConfig logConfig = new LogConfig(allProperties, Collections.singleton(messageTimestampTypeKey));
        Mockito.when((Object)this.replicaManager().getLogConfig(new TopicPartition(topic, 0))).thenReturn((Object)new Some((Object)logConfig));

        Properties actual = this.metadataManager().getTopicConfig(topic);

        Assertions.assertEquals(1, actual.size());
        Assertions.assertEquals(messageTimestampTypeValue, actual.getProperty(messageTimestampTypeKey));
    }

    /**
     * With the link coordinator disabled, linkCoordinator must return the alive
     * broker node of the ZK controller id reported by the metadata cache.
     */
    @Test
    public void testLinkCoordinatorWhenLinkCoordinatorIsNotEnabled() {
        Mockito.when(this.metadataCache().linkCoordinatorEnabled()).thenReturn(false);
        String linkName = "link-1";
        ListenerName listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT);
        int controllerId = 1;
        Mockito.when(this.metadataCache().getZkControllerId(false)).thenReturn(controllerId);
        Option<Node> expectedNode = new Some<Node>(new Node(controllerId, "localhost", 9092));
        Mockito.when((Object)this.metadataCache().getAliveBrokerNode(controllerId, listenerName)).thenReturn((Object)expectedNode);

        Option<?> actualNode = this.metadataManager().linkCoordinator(linkName, listenerName);

        Assertions.assertEquals(expectedNode, actualNode);
        Mockito.verify(this.metadataCache(), Mockito.times(1)).getZkControllerId(false);
        Mockito.verify(this.metadataCache(), Mockito.times(1)).getAliveBrokerNode(controllerId, listenerName);
    }

    /**
     * With the link coordinator enabled, linkCoordinator must return the leader
     * endpoint of the metadata-topic partition that the link name hashes to.
     */
    @Test
    public void testLinkCoordinatorWhenLinkCoordinatorIsEnabled() {
        Mockito.when(this.metadataCache().linkCoordinatorEnabled()).thenReturn(true);
        String linkName = "link-1";
        ListenerName listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT);
        int partition = this.metadataPartitionFor(linkName);
        int linkCoordinatorId = 1;
        Option<Node> expectedNode = new Some<Node>(new Node(linkCoordinatorId, "localhost", 9092));
        Mockito.when((Object)this.metadataCache().getPartitionLeaderEndpoint(METADATA_TOPIC, partition, listenerName)).thenReturn((Object)expectedNode);

        Option<?> actualNode = this.metadataManager().linkCoordinator(linkName, listenerName);

        Assertions.assertEquals(expectedNode, actualNode);
        Mockito.verify(this.metadataCache(), Mockito.times(1)).getPartitionLeaderEndpoint(METADATA_TOPIC, partition, listenerName);
    }
}

