/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.link.integration;

import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.UserMetadata;
import io.confluent.kafka.multitenant.integration.test.IntegrationTestHarness;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.server.plugins.policy.AlterConfigPolicy;
import io.confluent.kafka.server.plugins.policy.CreateClusterLinkPolicy;
import io.confluent.kafka.server.plugins.policy.CreateTopicPolicy;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.link.ClusterLinkClientManager;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkFactory;
import kafka.utils.TestInfoUtils;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.AlterMirrorOp;
import org.apache.kafka.clients.admin.AlterMirrorsOptions;
import org.apache.kafka.clients.admin.ClusterLinkListing;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.CreateClusterLinksOptions;
import org.apache.kafka.clients.admin.CreateClusterLinksResult;
import org.apache.kafka.clients.admin.DeleteClusterLinksOptions;
import org.apache.kafka.clients.admin.DescribeMirrorsOptions;
import org.apache.kafka.clients.admin.ListClusterLinksOptions;
import org.apache.kafka.clients.admin.MirrorTopicDescription;
import org.apache.kafka.clients.admin.NewClusterLink;
import org.apache.kafka.clients.admin.NewMirrorTopic;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestSslUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.Option;

@Tag(value="integration")
public class MultiTenantClusterLinkTest {
    public static final String SSL_KAFKA_CN = "kafka";
    private MultiTenantCluster sourceCluster;
    private MultiTenantCluster destCluster;
    private final String linkName = "tenantLink";
    private final String topic = "linkedTopic";
    private int numPartitions = 2;
    private int nextMessageIndex = 0;
    private TestInfo testInfo;
    private boolean testAclSync = true;

    @BeforeEach
    public void setUp(TestInfo testInfo) throws Exception {
        Assumptions.assumeFalse((boolean)TestInfoUtils.isKRaft((TestInfo)testInfo), (String)"ClusterLinking and KRaft Integration tests are disabled for CP 7.2");
        this.sourceCluster = new MultiTenantCluster(testInfo);
        this.destCluster = new MultiTenantCluster(testInfo);
        this.testInfo = testInfo;
        this.testAclSync = !TestInfoUtils.isKRaft((TestInfo)testInfo);
    }

    @AfterEach
    public void tearDown() throws Exception {
        if (this.sourceCluster != null && this.destCluster != null) {
            this.sourceCluster.shutdown();
            this.destCluster.shutdown();
        }
    }

    private void setUpClusters(boolean useSourceInitiatedLink, boolean useSslForSourceListener) throws Exception {
        this.setUpClusters(useSourceInitiatedLink, useSslForSourceListener, 1000);
    }

    private void setUpClusters(boolean useSourceInitiatedLink, boolean useSslForSourceListener, int destAclLimit) throws Exception {
        this.sourceCluster.useSourceInitiatedLink = useSourceInitiatedLink;
        this.destCluster.useSourceInitiatedLink = useSourceInitiatedLink;
        this.sourceCluster.startCluster(this.brokerProps(), "sourceLogicalCluster", 1, useSslForSourceListener);
        Properties props = this.brokerProps();
        props.put("confluent.max.acls.per.tenant", String.valueOf(destAclLimit));
        this.destCluster.startCluster(props, "destLogicalCluster", 11, false);
        this.addAcls((Admin)this.destCluster.admin, this.destCluster.user);
        this.addAcls((Admin)this.sourceCluster.admin, this.sourceCluster.user);
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"zk", "kraft"})
    public void testMultiTenantClusterLink(String quorum) throws Throwable {
        this.setUpClusters(false, false);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, Collections.emptyMap(), this.testAclSync);
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.linkIdExists("tenantLink"), true, "Link was not created");
        UUID linkId = this.destCluster.linkId("tenantLink");
        this.createSourceTopic();
        this.createMirrorTopic(this.destCluster.admin, "");
        this.verifyTopicListing("");
        this.verifyTopicMirroring("");
        this.addSourcePartitionsAndVerifyMirror(4, "");
        this.changeSourceTopicConfigAndVerifyMirror("");
        this.verifyAclAndOffsetMigration("");
        this.verifyTopicMirroring("");
        this.verifyTenantMetrics(linkId, 1001, "");
        this.verifyMetricsGroups(linkId);
        this.stopMirroring(this.destCluster.admin, "linkedTopic");
        TestUtils.waitForCondition(() -> this.destCluster.mirrorState("linkedTopic") == MirrorTopicDescription.State.STOPPED, (String)"Mirror not stopped");
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"zk", "kraft"})
    public void testMultiTenantClusterLinkWithClusterLinkPrefix(String quorum) throws Throwable {
        String clusterLinkPrefix = "src_";
        this.setUpClusters(false, false);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, Collections.singletonMap(ClusterLinkConfig.ClusterLinkPrefixProp(), clusterLinkPrefix), false);
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.linkIdExists("tenantLink"), true, "Link was not created");
        UUID linkId = this.destCluster.linkId("tenantLink");
        this.createSourceTopic();
        this.createMirrorTopic(this.destCluster.admin, clusterLinkPrefix);
        this.verifyTopicListing(clusterLinkPrefix);
        this.verifyTopicMirroring(clusterLinkPrefix);
        this.addSourcePartitionsAndVerifyMirror(4, clusterLinkPrefix);
        this.changeSourceTopicConfigAndVerifyMirror(clusterLinkPrefix);
        this.verifyTopicMirroring(clusterLinkPrefix);
        this.verifyTenantMetrics(linkId, 1001, clusterLinkPrefix);
        this.verifyMetricsGroups(linkId);
        this.stopMirroring(this.destCluster.admin, clusterLinkPrefix + "linkedTopic");
        TestUtils.waitForCondition(() -> this.destCluster.mirrorState(clusterLinkPrefix + "linkedTopic") == MirrorTopicDescription.State.STOPPED, (String)"Mirror not stopped");
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"zk", "kraft"})
    public void testMultiTenantClusterLinkNonUpdatableConfigPolicyViolationTest(String quorum) throws Throwable {
        this.setUpClusters(false, false);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, Collections.emptyMap(), false);
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.linkIdExists("tenantLink"), true, "Link was not created");
        this.createSourceTopic();
        this.createMirrorTopic(this.destCluster.admin, "");
        this.verifyTopicListing("");
        this.verifyTopicMirroring("");
        final ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink");
        HashMap<ConfigResource, Collection<AlterConfigOp>> alterConfigResource = new HashMap<ConfigResource, Collection<AlterConfigOp>>(){
            {
                this.put(configResource, Collections.singletonList(new AlterConfigOp(new ConfigEntry(KafkaConfig.ReplicaFetchMaxBytesProp(), "5242882"), AlterConfigOp.OpType.SET)));
            }
        };
        AlterConfigsResult alterConfigsResult = this.destCluster.admin.incrementalAlterConfigs((Map)alterConfigResource);
        TestUtils.assertFutureThrows((Future)alterConfigsResult.all(), PolicyViolationException.class);
        this.stopMirroring(this.destCluster.admin, "linkedTopic");
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft", "zk"})
    public void testClusterLinkSecurityUpdate(String quorum) throws Throwable {
        this.setUpClusters(false, false);
        if (TestInfoUtils.isKRaft((TestInfo)this.testInfo)) {
            this.destCluster.createDestClusterLinkWithoutOverridingMetadataMaxAge(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, Collections.emptyMap(), this.testAclSync);
        } else {
            this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, Collections.emptyMap(), this.testAclSync);
        }
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.linkIdExists("tenantLink"), true, "Link was not created");
        this.createSourceTopic();
        this.createMirrorTopic(this.destCluster.admin, "");
        this.verifyTopicListing("");
        this.verifyTopicMirroring("");
        this.verifyAclAndOffsetMigration("");
        LogicalClusterUser newUser = this.sourceCluster.createLinkUser(2001);
        String newJaasConfig = newUser.saslJaasConfig();
        this.destCluster.alterClusterLink(this.destCluster.admin, "tenantLink", "sasl.jaas.config", newJaasConfig);
        this.sourceCluster.deleteUser(this.sourceCluster.linkUser, true);
        this.sourceCluster.linkUser = newUser;
        this.verifyTopicMirroring("");
        this.verifyAclAndOffsetMigration("");
        this.verifySocketBufferSizeUpdate();
        if (!TestInfoUtils.isKRaft((TestInfo)this.testInfo)) {
            this.addSourcePartitionsAndVerifyMirror(4, "");
        }
        this.changeSourceTopicConfigAndVerifyMirror("");
        this.verifyTopicMirroring("");
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"zk"})
    public void testSourceInitiatedLink(String quorum) throws Throwable {
        this.setUpClusters(true, false);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, -1, Collections.emptyMap(), this.testAclSync);
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.linkIdExists("tenantLink"), true, "Link was not created");
        this.sourceCluster.createSourceClusterLink(this.sourceCluster.admin, "tenantLink", this.destCluster, 1003);
        MultiTenantClusterLinkTest.waitFor(() -> this.sourceCluster.linkIdExists("tenantLink"), true, "Link was not created");
        this.createSourceTopic();
        this.createMirrorTopic(this.destCluster.admin, "");
        this.verifyTopicListing("");
        this.verifyTopicMirroring("");
        this.verifyMetricsGroups(this.destCluster.linkId("tenantLink"));
        this.verifyTenantMetrics(this.destCluster.linkId("tenantLink"), 1003, "");
        this.verifyAclAndOffsetMigration("");
        LogicalClusterUser newUser = this.sourceCluster.createLinkUser(2001);
        String newJaasConfig = newUser.saslJaasConfig();
        this.sourceCluster.alterClusterLink(this.sourceCluster.admin, "tenantLink", ClusterLinkConfig.LocalPrefix() + "sasl.jaas.config", newJaasConfig);
        this.sourceCluster.deleteUser(this.sourceCluster.linkUser, true);
        this.sourceCluster.linkUser = newUser;
        this.verifyTopicMirroring("");
        this.verifyAclAndOffsetMigration("");
        this.verifySocketBufferSizeUpdate();
        this.addSourcePartitionsAndVerifyMirror(4, "");
        this.changeSourceTopicConfigAndVerifyMirror("");
        this.verifyTopicMirroring("");
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft", "zk"})
    public void testMaxClusterLink(String quorum) throws Throwable {
        this.setUpClusters(false, false);
        KafkaPrincipal brokerPrincipal = new KafkaPrincipal("User", "broker");
        this.destCluster.addClusterAcls(brokerPrincipal, "All");
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link1", this.sourceCluster, 1001, Collections.emptyMap(), false);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link2", this.sourceCluster, 1002, Collections.emptyMap(), false);
        this.destCluster.deleteClusterLink(this.destCluster.admin, "link1");
        this.destCluster.deleteClusterLink(this.destCluster.admin, "link2");
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link3", this.sourceCluster, 1003, Collections.emptyMap(), false);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link4", this.sourceCluster, 1004, Collections.emptyMap(), false);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link5", this.sourceCluster, 1005, Collections.emptyMap(), false);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link6", this.sourceCluster, 1006, Collections.emptyMap(), false);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link7", this.sourceCluster, 1007, Collections.emptyMap(), false);
        CreateClusterLinksResult createClusterLinksResult = this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "link8", this.sourceCluster, 1008, "EXTERNAL", Collections.emptyMap(), true);
        String expectedErrorMsg = TestInfoUtils.isKRaft((TestInfo)this.testInfo) ? "This cluster already has the maximum number of DESTINATION cluster links (5). You can request a higher limit through Confluent Support." : "Unable to validate cluster link due to error: This cluster already has the maximum number of destination cluster links (5). You can request a higher limit through Confluent Support.";
        TestUtils.assertFutureThrows((Future)createClusterLinksResult.all(), PolicyViolationException.class, (String)expectedErrorMsg);
        if (!TestInfoUtils.isKRaft((TestInfo)this.testInfo)) {
            this.changeMaxClusterLinks(6, -1);
            this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "link9", this.sourceCluster, 1009, "EXTERNAL", Collections.emptyMap(), true);
        }
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"zk"})
    public void testTenantAclSyncLimit(String quorum) throws Throwable {
        int destAclLimit = 30;
        this.setUpClusters(false, false, destAclLimit);
        int initialAcls = 4;
        this.waitForAcls(this.sourceCluster, initialAcls, null);
        this.waitForAcls(this.destCluster, initialAcls, null);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link1", this.sourceCluster, 1001, Collections.emptyMap());
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link2", this.sourceCluster, 1002, Collections.emptyMap());
        Uuid linkId1 = Utils.toKafkaUuid((UUID)this.destCluster.linkId("link1"));
        Uuid linkId2 = Utils.toKafkaUuid((UUID)this.destCluster.linkId("link2"));
        int sourceAcls = 12;
        this.waitForAcls(this.sourceCluster, sourceAcls, null);
        this.waitForAcls(this.destCluster, sourceAcls + initialAcls, null);
        this.waitForAcls(this.destCluster, sourceAcls, linkId1);
        this.waitForAcls(this.destCluster, sourceAcls, linkId2);
        TestUtils.waitForCondition(() -> this.aclAddFailedMetric() == 0.0, (String)("ACL update failed before reaching limit " + this.aclAddFailedMetric()));
        while (sourceAcls + initialAcls < destAclLimit - 2) {
            this.addSourceAcl("topic" + sourceAcls++);
            this.waitForAcls(this.sourceCluster, sourceAcls, null);
            this.waitForAcls(this.destCluster, sourceAcls + initialAcls, null);
            this.waitForAcls(this.destCluster, sourceAcls, linkId1);
            this.waitForAcls(this.destCluster, sourceAcls, linkId2);
        }
        for (int i = 0; i < 5; ++i) {
            this.addSourceAcl("topic" + sourceAcls++);
        }
        this.waitForAcls(this.sourceCluster, sourceAcls, null);
        TestUtils.waitForCondition(() -> this.aclAddFailedMetric() > 0.0, (String)"ACL failed metric not updated");
        Assertions.assertTrue((this.aclCount(this.destCluster, null) <= destAclLimit ? 1 : 0) != 0, (String)("Too many dest ACLs " + this.aclCount(this.destCluster, null)));
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"zk"})
    public void testMaxSourceInitiatedClusterLink(String quorum) throws Throwable {
        this.setUpClusters(true, false);
        KafkaPrincipal brokerPrincipal = new KafkaPrincipal("User", "broker");
        this.destCluster.addClusterAcls(brokerPrincipal, "All");
        this.sourceCluster.addClusterAcls(brokerPrincipal, "All");
        this.changeMaxClusterLinks(10, 2);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link1", this.sourceCluster, -1, Collections.emptyMap());
        this.sourceCluster.createSourceClusterLink(this.sourceCluster.admin, "link1", this.destCluster, 2001);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link2", this.sourceCluster, -1, Collections.emptyMap());
        this.sourceCluster.createSourceClusterLink(this.sourceCluster.admin, "link2", this.destCluster, 2002);
        this.sourceCluster.deleteClusterLink(this.sourceCluster.admin, "link1");
        this.sourceCluster.deleteClusterLink(this.sourceCluster.admin, "link2");
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link3", this.sourceCluster, -1, Collections.emptyMap());
        this.sourceCluster.createSourceClusterLink(this.sourceCluster.admin, "link3", this.destCluster, 2003);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link4", this.sourceCluster, -1, Collections.emptyMap());
        this.sourceCluster.createSourceClusterLink(this.sourceCluster.admin, "link4", this.destCluster, 2004);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link5", this.sourceCluster, -1, Collections.emptyMap());
        CreateClusterLinksResult createClusterLinksResult = this.sourceCluster.createSourceClusterLinkResult(this.sourceCluster.admin, "link5", this.destCluster, 2005);
        TestUtils.assertFutureThrows((Future)createClusterLinksResult.all(), PolicyViolationException.class, (String)String.format("Unable to validate cluster link due to error: This cluster already has the maximum number of source cluster links (%d). You can request a higher limit through Confluent Support.", 2));
        this.changeMaxClusterLinks(-1, 3);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link6", this.sourceCluster, -1, Collections.emptyMap());
        this.destCluster.createSourceClusterLink(this.sourceCluster.admin, "link6", this.destCluster, 2026);
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft", "zk"})
    public void testSslClusterLink(String quorum) throws Throwable {
        this.setUpClusters(false, true);
        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, "INTERNAL", Collections.emptyMap(), true).all().get(15L, TimeUnit.SECONDS);
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.linkIdExists("tenantLink"), true, "Link was not created");
        this.destCluster.linkId("tenantLink");
        NewTopic sourceTopic = new NewTopic("linkedTopic", Optional.empty(), Optional.of((short)1));
        this.sourceCluster.admin.createTopics(Collections.singleton(sourceTopic));
        MultiTenantClusterLinkTest.waitFor(() -> {
            try {
                return ((Map)this.sourceCluster.admin.listTopics().namesToListings().get()).containsKey("linkedTopic");
            }
            catch (Exception e) {
                return false;
            }
        }, true, "Failed to list topic");
        this.createMirrorTopic(this.destCluster.admin, "");
        int messageCount = 10;
        Properties producerProps = KafkaTestUtils.producerProps(this.sourceCluster.physicalCluster.kafkaCluster().bootstrapServers("INTERNAL"), SecurityProtocol.SSL, "", "");
        producerProps.putAll((Map<?, ?>)this.sourceCluster.sslConfigs);
        try (KafkaProducer sourceProducer = new KafkaProducer(producerProps);){
            KafkaTestUtils.sendRecords((KafkaProducer<String, String>)sourceProducer, "linkedTopic", 0, messageCount);
            KafkaTestUtils.consumeRecords(this.destCluster.getOrCreateConsumer("destGroup"), "linkedTopic", 0, messageCount);
        }
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"zk", "kraft"})
    public void testInternalIpAndPort(String quorum) throws Throwable {
        this.setUpClusters(false, false);
        CreateClusterLinksResult createClusterLinksResult = this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, "EXTERNAL", Collections.singletonMap("bootstrap.servers", "10.0.0.3:9071"), false);
        String errMsgFmt = TestInfoUtils.isKRaft((TestInfo)this.testInfo) ? "Invalid bootstrap addresses or ports that cannot be used for cluster links on Confluent Cloud: [%s/%s:9071]" : "Unable to validate cluster link due to error: Invalid bootstrap addresses or ports that cannot be used for cluster links on Confluent Cloud: [%s/%s:9071]";
        TestUtils.assertFutureThrows((Future)createClusterLinksResult.all(), PolicyViolationException.class, (String)String.format(errMsgFmt, "", "10.0.0.3"));
        createClusterLinksResult = this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1002, "EXTERNAL", Collections.singletonMap("bootstrap.servers", "127.0.0.1:9071"), false);
        TestUtils.assertFutureThrows((Future)createClusterLinksResult.all(), PolicyViolationException.class, (String)String.format(errMsgFmt, "", "127.0.0.1"));
        createClusterLinksResult = this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1003, "EXTERNAL", Collections.singletonMap("bootstrap.servers", "localhost:9071"), false);
        TestUtils.assertFutureThrows((Future)createClusterLinksResult.all(), PolicyViolationException.class, (String)String.format(errMsgFmt, "localhost", "127.0.0.1"));
        createClusterLinksResult = this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1004, "EXTERNAL", Collections.singletonMap("bootstrap.servers", " 11.0.0.3:9071 , 10.0.0.3:9071,10.0.0.4:9071"), false);
        String errMsgTwo = TestInfoUtils.isKRaft((TestInfo)this.testInfo) ? "Invalid bootstrap addresses or ports that cannot be used for cluster links on Confluent Cloud: [/10.0.0.3:9071, /10.0.0.4:9071]" : "Unable to validate cluster link due to error: Invalid bootstrap addresses or ports that cannot be used for cluster links on Confluent Cloud: [/10.0.0.3:9071, /10.0.0.4:9071]";
        TestUtils.assertFutureThrows((Future)createClusterLinksResult.all(), PolicyViolationException.class, (String)errMsgTwo);
        this.destCluster.createDestClusterLinkWithoutOverridingMetadataMaxAge(this.destCluster.admin, "tenantLink", this.sourceCluster, 1005, Collections.emptyMap(), false);
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.linkIdExists("tenantLink"), true, "Link was not created");
        AlterConfigsResult alterConfigsResult = this.destCluster.admin.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink"), Collections.singleton(new AlterConfigOp(new ConfigEntry("bootstrap.servers", "10.0.0.3:9071"), AlterConfigOp.OpType.SET))));
        TestUtils.assertFutureThrows((Future)alterConfigsResult.all(), PolicyViolationException.class, (String)"Invalid bootstrap addresses or ports that cannot be used for cluster links on Confluent Cloud: [/10.0.0.3:9071]");
    }

    private Properties brokerProps() {
        Properties props = new Properties();
        props.put("confluent.cluster.link.enable", "true");
        props.put(KafkaConfig.AuthorizerClassNameProp(), MultiTenantAuthorizer.class.getName());
        props.put("confluent.plugins.topic.policy.replication.factor", "1");
        props.put(KafkaConfig.AutoCreateTopicsEnableProp(), "false");
        props.put(KafkaConfig.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        props.put(KafkaConfig.PasswordEncoderSecretProp(), "multi-tenant-cluster-link-secret");
        props.put(KafkaConfig.CreateTopicPolicyClassNameProp(), CreateTopicPolicy.class.getName());
        props.put(KafkaConfig.AlterConfigPolicyClassNameProp(), AlterConfigPolicy.class.getName());
        props.put(KafkaConfig.CreateClusterLinkPolicyClassNameProp(), CreateClusterLinkPolicy.class.getName());
        if (TestInfoUtils.isKRaft((TestInfo)this.testInfo)) {
            props.put("confluent.cluster.link.metadata.topic.enable", "true");
            props.put("confluent.cluster.link.metadata.topic.replication.factor", "1");
            props.put("confluent.cluster.link.metadata.topic.partitions", "3");
        }
        return props;
    }

    private void createSourceTopic() {
        this.sourceCluster.physicalCluster.kafkaCluster().createTopic(this.sourceCluster.user.tenantPrefix() + "linkedTopic", this.numPartitions, 1);
    }

    private void createMirrorTopic(ConfluentAdmin admin, String clusterLinkPrefix) throws Exception {
        NewTopic newTopic = new NewTopic(clusterLinkPrefix + "linkedTopic", Optional.empty(), Optional.of((short)1)).mirror(Optional.of(new NewMirrorTopic("tenantLink", "linkedTopic")));
        admin.createTopics(Collections.singleton(newTopic)).all().get();
    }

    private void stopMirroring(ConfluentAdmin admin, String topic) throws Throwable {
        admin.alterMirrors(Collections.singletonMap(topic, AlterMirrorOp.FAILOVER), new AlterMirrorsOptions()).all().get(15L, TimeUnit.SECONDS);
    }

    private void verifyTopicListing(String clusterLinkPrefix) throws Exception {
        ListClusterLinksOptions options = new ListClusterLinksOptions().includeTopics(true);
        Collection listings = (Collection)this.destCluster.admin.listClusterLinks(options).result().get();
        Assertions.assertEquals((int)1, (int)listings.size());
        ClusterLinkListing listing = (ClusterLinkListing)listings.iterator().next();
        Assertions.assertEquals((Object)"tenantLink", (Object)listing.linkName());
        Assertions.assertTrue((boolean)listing.topics().isPresent());
        Collection topics = (Collection)listing.topics().get();
        Assertions.assertEquals((int)1, (int)topics.size());
        Assertions.assertEquals((Object)(clusterLinkPrefix + "linkedTopic"), topics.iterator().next());
    }

    private void verifyTopicMirroring(String clusterLinkPrefix) throws Throwable {
        int firstMessageIndex = this.nextMessageIndex;
        int messageCount = 10;
        this.nextMessageIndex += messageCount;
        KafkaTestUtils.sendRecords(this.sourceCluster.getOrCreateProducer(), "linkedTopic", firstMessageIndex, messageCount);
        KafkaTestUtils.consumeRecords(this.destCluster.getOrCreateConsumer("destGroup"), clusterLinkPrefix + "linkedTopic", firstMessageIndex, messageCount);
    }

    private void verifyAclAndOffsetMigration(String clusterLinkPrefix) throws Throwable {
        Set<AclBinding> acls = this.addAcls((Admin)this.sourceCluster.admin, this.sourceCluster.user);
        this.addBrokerAclsForOffsetMigration(clusterLinkPrefix);
        String group = "linkedGroup";
        Map<TopicPartition, OffsetAndMetadata> offsets = this.commitOffsets((Admin)this.sourceCluster.admin, group);
        if (!clusterLinkPrefix.isEmpty()) {
            offsets = offsets.entrySet().stream().collect(Collectors.toMap(e -> new TopicPartition(clusterLinkPrefix + ((TopicPartition)e.getKey()).topic(), ((TopicPartition)e.getKey()).partition()), e -> (OffsetAndMetadata)e.getValue()));
        }
        UUID linkId = this.destCluster.linkId("tenantLink");
        Uuid clusterLinkId = new Uuid(linkId.getMostSignificantBits(), linkId.getLeastSignificantBits());
        Set linkAcls = acls.stream().map(acl -> SecurityUtils.aclWithClusterLinkIds((AclBinding)acl, Collections.singleton(clusterLinkId))).collect(Collectors.toSet());
        if (this.testAclSync) {
            MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.describeAcls(this.sourceCluster.user), linkAcls, "Acls not migrated");
        }
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.committedOffsets(group), offsets, "Consumer offsets not migrated");
    }

    private Map<String, String> linkMetricTags(String tenant, String request) {
        HashMap<String, String> tags = new HashMap<String, String>();
        tags.put("tenant", tenant);
        tags.put("link-name", "tenantLink");
        tags.put("request", request);
        return tags;
    }

    private Map<String, String> linkIdMetricTags(UUID linkId, String tenant, String request) {
        HashMap<String, String> tags = new HashMap<String, String>();
        tags.put("tenant", tenant);
        tags.put("link-id", Utils.toKafkaUuid((UUID)linkId).toString());
        tags.put("request", request);
        return tags;
    }

    private void verifyTenantMetrics(UUID linkId, int linkUserId, String clusterLinkPrefix) {
        String destMetricsGroup = "cluster-link-dest-tenant-metrics";
        HashMap<String, String> sourceTags = new HashMap<String, String>();
        sourceTags.put("tenant", "sourceLogicalCluster");
        sourceTags.put("user", String.valueOf(linkUserId));
        sourceTags.put("request", "Fetch");
        Assertions.assertFalse((boolean)this.metricsFound(this.sourceCluster, "tenant-metrics", sourceTags));
        Map<String, String> destTags = this.linkMetricTags("destLogicalCluster", "Fetch");
        double responseTimeNs = this.metricValue(this.destCluster, destMetricsGroup, "response-time-ns-max", destTags);
        Assertions.assertTrue((responseTimeNs > 0.0 && responseTimeNs < (double)TimeUnit.SECONDS.toNanos(15L) ? 1 : 0) != 0, (String)("Invalid response time metric: " + responseTimeNs));
        HashMap<String, String> linkTags = new HashMap<String, String>();
        linkTags.put("tenant", "destLogicalCluster");
        linkTags.put("link-name", "tenantLink");
        Assertions.assertEquals((double)1.0, (double)this.metricValue(this.destCluster, "cluster-link-metrics", "link-count", linkTags), (double)0.001);
        linkTags.put("state", "Mirror");
        Assertions.assertEquals((double)1.0, (double)this.metricValue(this.destCluster, "cluster-link-metrics", "mirror-topic-count", linkTags), (double)0.001);
        linkTags.put("state", "FailedMirror");
        Assertions.assertEquals((double)0.0, (double)this.metricValue(this.destCluster, "cluster-link-metrics", "mirror-topic-count", linkTags), (double)0.001);
        linkTags.put("state", "PausedMirror");
        Assertions.assertEquals((double)0.0, (double)this.metricValue(this.destCluster, "cluster-link-metrics", "mirror-topic-count", linkTags), (double)0.001);
        linkTags.put("state", "PendingStoppedMirror");
        Assertions.assertEquals((double)0.0, (double)this.metricValue(this.destCluster, "cluster-link-metrics", "mirror-topic-count", linkTags), (double)0.001);
        linkTags.put("state", "StoppedMirror");
        Assertions.assertEquals((double)0.0, (double)this.metricValue(this.destCluster, "cluster-link-metrics", "mirror-topic-count", linkTags), (double)0.001);
        linkTags.remove("state");
        Assertions.assertEquals((double)this.numPartitions, (double)this.metricValue(this.destCluster, "cluster-link-metrics", "mirror-partition-count", linkTags), (double)0.001);
        linkTags.put("topic", clusterLinkPrefix + "linkedTopic");
        Assertions.assertEquals((double)0.0, (double)this.metricValue(this.destCluster, "cluster-link-metrics", "mirror-topic-lag", linkTags), (double)0.0);
        Assertions.assertTrue((this.metricValue(this.destCluster, "cluster-link-metrics", "mirror-topic-byte-total", linkTags) > 0.0 ? 1 : 0) != 0, (String)"Invalid mirror topic throughput");
        linkTags.remove("topic");
        String sourceMetricsGroup = "cluster-link-source-metrics";
        Map<String, String> sourceLinkTags = this.linkIdMetricTags(linkId, "sourceLogicalCluster", "Fetch");
        this.assertRange("requests", this.metricValue(this.sourceCluster, sourceMetricsGroup, "request-total", sourceLinkTags), this.metricValue(this.destCluster, destMetricsGroup, "request-total", destTags), 10.0);
        this.assertRange("request-bytes", this.metricValue(this.sourceCluster, sourceMetricsGroup, "request-byte-total", sourceLinkTags), this.metricValue(this.destCluster, destMetricsGroup, "request-byte-total", destTags), 2000.0);
        this.assertRange("response-bytes", this.metricValue(this.destCluster, destMetricsGroup, "response-byte-total", destTags), this.metricValue(this.sourceCluster, sourceMetricsGroup, "response-byte-total", sourceLinkTags), 2000.0);
        responseTimeNs = this.metricValue(this.sourceCluster, sourceMetricsGroup, "response-time-ns-max", sourceLinkTags);
        Assertions.assertTrue((responseTimeNs > 0.0 && responseTimeNs < (double)TimeUnit.SECONDS.toNanos(15L) ? 1 : 0) != 0, (String)("Invalid source link response time metric: " + responseTimeNs));
    }

    private void verifyMetricsGroups(UUID linkId) {
        double sourceValue;
        Map<String, String> destTags = this.linkMetricTags("destLogicalCluster", "Metadata");
        Map<String, String> sourceTags = this.linkMetricTags("sourceLogicalCluster", "Metadata");
        Assertions.assertEquals((double)0.0, (double)this.metricValue(this.sourceCluster, "cluster-link-dest-tenant-metrics", "request-total", sourceTags, false), (double)0.001);
        Assertions.assertEquals((double)0.0, (double)this.metricValue(this.destCluster, "cluster-link-source-tenant-metrics", "request-total", destTags, false), (double)0.001);
        double destValue = this.metricValue(this.destCluster, "cluster-link-dest-tenant-metrics", "request-total", destTags);
        Assertions.assertTrue((destValue > 0.0 ? 1 : 0) != 0, (String)("Dest metric not updated: " + destValue));
        if (this.sourceCluster.useSourceInitiatedLink) {
            sourceValue = this.metricValue(this.sourceCluster, "cluster-link-source-tenant-metrics", "request-total", sourceTags);
            Assertions.assertTrue((sourceValue > 0.0 ? 1 : 0) != 0, (String)("Source metric not updated: " + sourceValue));
        } else {
            Assertions.assertEquals((double)0.0, (double)this.metricValue(this.sourceCluster, "cluster-link-source-tenant-metrics", "request-total", sourceTags, false), (double)0.001);
        }
        destTags = this.linkIdMetricTags(linkId, "destLogicalCluster", "Metadata");
        sourceTags = this.linkIdMetricTags(linkId, "sourceLogicalCluster", "Metadata");
        sourceValue = this.metricValue(this.sourceCluster, "cluster-link-source-metrics", "request-total", sourceTags);
        Assertions.assertTrue((sourceValue > 0.0 ? 1 : 0) != 0, (String)("Source metric not updated: " + sourceValue));
        Assertions.assertEquals((double)0.0, (double)this.metricValue(this.destCluster, "cluster-link-source-metrics", "request-total", destTags, false), (double)0.001);
        HashMap<String, String> tags = new HashMap<String, String>();
        tags.put("link-id", linkId.toString());
        tags.put("tenant", "sourceLogicalCluster");
        tags.put("mode", "source");
        int numSourceBrokers = this.sourceCluster.physicalCluster.kafkaCluster().kafkaBrokers().size();
        Assertions.assertEquals((int)numSourceBrokers, (int)((int)this.totalMetricValue(this.sourceCluster, "cluster-link-metrics", "active-link-count", tags)));
        tags.put("tenant", "destLogicalCluster");
        tags.put("mode", "destination");
        int numDestBrokers = this.destCluster.physicalCluster.kafkaCluster().kafkaBrokers().size();
        Assertions.assertEquals((int)numDestBrokers, (int)((int)this.totalMetricValue(this.destCluster, "cluster-link-metrics", "active-link-count", tags)));
    }

    private void verifySocketBufferSizeUpdate() throws Exception {
        long invalidSize = 0x200000L;
        ConfigResource linkResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink");
        ExecutionException e1 = (ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> {
            Void cfr_ignored_0 = (Void)this.destCluster.admin.incrementalAlterConfigs(Collections.singletonMap(linkResource, Collections.singleton(new AlterConfigOp(new ConfigEntry(KafkaConfig.ReplicaSocketReceiveBufferBytesProp(), String.valueOf(invalidSize)), AlterConfigOp.OpType.SET)))).all().get(15L, TimeUnit.SECONDS);
        });
        Assertions.assertEquals(PolicyViolationException.class, e1.getCause().getClass());
        ExecutionException e2 = (ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> {
            Void cfr_ignored_0 = (Void)this.destCluster.admin.incrementalAlterConfigs(Collections.singletonMap(linkResource, Collections.singleton(new AlterConfigOp(new ConfigEntry(KafkaConfig.ReplicaSocketReceiveBufferBytesProp(), String.valueOf(1024)), AlterConfigOp.OpType.SET)))).all().get(15L, TimeUnit.SECONDS);
        });
        Assertions.assertEquals(PolicyViolationException.class, e2.getCause().getClass());
        long validSize = 131072L;
        this.destCluster.admin.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink"), Collections.singleton(new AlterConfigOp(new ConfigEntry(KafkaConfig.ReplicaSocketReceiveBufferBytesProp(), String.valueOf(validSize)), AlterConfigOp.OpType.SET)))).all().get(15L, TimeUnit.SECONDS);
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.linkConfig("tenantLink", KafkaConfig.ReplicaSocketReceiveBufferBytesProp()), String.valueOf(validSize), "Link config not updated");
    }

    private void assertRange(String name, double receiverValue, double senderValue, double maxDiff) {
        String message = String.format("Metric values for '%s' (%f, %f) not within expected range %f", name, receiverValue, senderValue, maxDiff);
        Assertions.assertTrue((senderValue - receiverValue <= maxDiff && receiverValue - senderValue < 0.001 ? 1 : 0) != 0, (String)message);
    }

    private boolean metricsFound(MultiTenantCluster cluster, String group, Map<String, String> tags) {
        for (KafkaBroker kafkaBroker : cluster.physicalCluster.kafkaCluster().kafkaBrokers()) {
            for (Map.Entry entry : kafkaBroker.metrics().metrics().entrySet()) {
                MetricName metricName = (MetricName)entry.getKey();
                if (!metricName.group().equals(group) || !this.tagsMatched(metricName, tags)) continue;
                return true;
            }
        }
        return false;
    }

    private double metricValue(MultiTenantCluster cluster, String group, String name, Map<String, String> tags) {
        return this.metricValue(cluster, group, name, tags, true);
    }

    private double metricValue(MultiTenantCluster cluster, String group, String name, Map<String, String> tags, boolean failIfNotFound) {
        double value = 0.0;
        boolean found = false;
        for (KafkaBroker kafkaBroker : cluster.physicalCluster.kafkaCluster().kafkaBrokers()) {
            for (Map.Entry entry : kafkaBroker.metrics().metrics().entrySet()) {
                if (!this.isMatchingMetric((MetricName)entry.getKey(), name, group, tags)) continue;
                found = true;
                value += ((Double)((KafkaMetric)entry.getValue()).metricValue()).doubleValue();
            }
        }
        if (failIfNotFound) {
            Assertions.assertTrue((boolean)found, (String)("Metric not found " + name));
        }
        return value;
    }

    private double totalMetricValue(MultiTenantCluster cluster, String group, String name, Map<String, String> tags) {
        double value = 0.0;
        for (KafkaBroker kafkaBroker : cluster.physicalCluster.kafkaCluster().kafkaBrokers()) {
            for (Map.Entry entry : kafkaBroker.metrics().metrics().entrySet()) {
                if (!this.isMatchingMetric((MetricName)entry.getKey(), name, group, tags)) continue;
                value += ((Double)((KafkaMetric)entry.getValue()).metricValue()).doubleValue();
            }
        }
        return value;
    }

    private boolean isMatchingMetric(MetricName metricName, String name, String group, Map<String, String> tags) {
        if (!metricName.name().equals(name)) {
            return false;
        }
        if (!metricName.group().equals(group)) {
            return false;
        }
        return this.tagsMatched(metricName, tags);
    }

    private boolean tagsMatched(MetricName metricName, Map<String, String> tags) {
        Map metricTags = metricName.tags();
        for (Map.Entry<String, String> entry : tags.entrySet()) {
            if (entry.getValue().equals(metricTags.get(entry.getKey()))) continue;
            return false;
        }
        return true;
    }

    private void addSourcePartitionsAndVerifyMirror(int newPartitionCount, String clusterLinkPrefix) throws Exception {
        ((KafkaFuture)this.sourceCluster.admin.createPartitions(Collections.singletonMap("linkedTopic", NewPartitions.increaseTo((int)newPartitionCount))).values().get("linkedTopic")).get(15L, TimeUnit.SECONDS);
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.partitionsForTopic(clusterLinkPrefix + "linkedTopic"), newPartitionCount, "Topic partitions not updated");
        this.numPartitions = newPartitionCount;
    }

    private void changeSourceTopicConfigAndVerifyMirror(String clusterLinkPrefix) throws Exception {
        this.sourceCluster.admin.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, "linkedTopic"), Collections.singleton(new AlterConfigOp(new ConfigEntry("max.message.bytes", "123456"), AlterConfigOp.OpType.SET)))).all().get(15L, TimeUnit.SECONDS);
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.topicConfig(clusterLinkPrefix + "linkedTopic", "max.message.bytes"), "123456", "Topic configs not migrated");
    }

    private void changeMaxClusterLinks(int maxDestClusterLinks, int maxSourceClusterLinks) throws Exception {
        if (maxDestClusterLinks >= 0) {
            this.destCluster.internalAdminClient().incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), Collections.singletonList(new AlterConfigOp(new ConfigEntry("confluent.plugins.cluster.link.policy.max.destination.links.per.tenant", String.valueOf(maxDestClusterLinks)), AlterConfigOp.OpType.SET)))).all().get();
        }
        if (maxSourceClusterLinks >= 0) {
            this.sourceCluster.internalAdminClient().incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), Collections.singletonList(new AlterConfigOp(new ConfigEntry("confluent.plugins.cluster.link.policy.max.source.links.per.tenant", String.valueOf(maxSourceClusterLinks)), AlterConfigOp.OpType.SET)))).all().get();
        }
    }

    private Set<AclBinding> addAcls(Admin adminClient, LogicalClusterUser user) throws Exception {
        String principal = user.unprefixedKafkaPrincipal().toString();
        AclBinding topicAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "linked", PatternType.PREFIXED), new AccessControlEntry(principal, "*", AclOperation.ALL, AclPermissionType.ALLOW));
        AclBinding topicAclWithPrefix = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "src_linked", PatternType.PREFIXED), new AccessControlEntry(principal, "*", AclOperation.ALL, AclPermissionType.ALLOW));
        AclBinding clusterAcl = new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL), new AccessControlEntry(principal, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
        AclBinding groupAcl = new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL), new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW));
        Set acls = Utils.mkSet((Object[])new AclBinding[]{topicAcl, groupAcl, clusterAcl, topicAclWithPrefix});
        adminClient.createAcls((Collection)acls).all().get(15L, TimeUnit.SECONDS);
        return acls;
    }

    private void addSourceAcl(String topic) throws Exception {
        String principal = this.sourceCluster.user.unprefixedKafkaPrincipal().toString();
        AclBinding topicAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, topic, PatternType.PREFIXED), new AccessControlEntry(principal, "*", AclOperation.ALL, AclPermissionType.ALLOW));
        this.sourceCluster.admin.createAcls(Collections.singleton(topicAcl)).all().get(15L, TimeUnit.SECONDS);
    }

    private void waitForAcls(MultiTenantCluster cluster, int count, Uuid linkId) throws Exception {
        String clusterType = cluster == this.sourceCluster ? "Source" : "Destination";
        TestUtils.waitForCondition(() -> this.aclCount(cluster, linkId) == count, () -> String.format("%s acls not created, expected %d got %d", clusterType, count, this.aclCount(cluster, linkId)));
    }

    private int aclCount(MultiTenantCluster cluster, Uuid linkId) {
        try {
            AclBindingFilter filter = AclBindingFilter.ANY;
            if (linkId != null) {
                filter = SecurityUtils.aclFilterWithClusterLinkIds((AclBindingFilter)filter, Collections.singleton(linkId));
            }
            return ((Collection)cluster.admin.describeAcls(filter).values().get(15L, TimeUnit.SECONDS)).size();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private double aclAddFailedMetric() {
        return this.totalMetricValue(this.destCluster, "cluster-link-metrics", "acls-add-failed-total", Collections.emptyMap());
    }

    private void addBrokerAclsForOffsetMigration(String clusterLinkPrefix) {
        String withLinkPrefix = this.destCluster.user.tenantPrefix() + clusterLinkPrefix + "linked";
        this.destCluster.physicalCluster.newAclCommand().consumeAclArgs(PhysicalCluster.BROKER_PRINCIPAL, withLinkPrefix, withLinkPrefix, PatternType.PREFIXED).execute();
    }

    private Map<TopicPartition, OffsetAndMetadata> commitOffsets(Admin adminClient, String group) throws Exception {
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        LogManager logManager = this.sourceCluster.physicalCluster.kafkaCluster().kafkaBrokers().get(0).logManager();
        for (int i = 0; i < this.numPartitions; ++i) {
            TopicPartition tp = new TopicPartition("linkedTopic", i);
            TopicPartition prefixedPartition = new TopicPartition(this.sourceCluster.user.tenantPrefix() + "linkedTopic", i);
            offsets.put(tp, new OffsetAndMetadata(((AbstractLog)logManager.getLog(prefixedPartition, false).get()).localLogEndOffset()));
        }
        adminClient.alterConsumerGroupOffsets(group, offsets).all().get();
        return offsets;
    }

    private static <T> void waitFor(Supplier<T> actual, T expected, String error) throws Exception {
        TestUtils.waitForCondition(() -> expected.equals(actual.get()), () -> error + " : expected=" + expected + ", actual=" + actual.get());
    }

    private static class MultiTenantCluster
    extends IntegrationTestHarness {
        private final Map<String, String> sslConfigs = new HashMap<String, String>();
        private PhysicalCluster physicalCluster;
        private LogicalCluster logicalCluster;
        private LogicalClusterUser user;
        private LogicalClusterUser linkUser;
        private ConfluentAdmin admin;
        private boolean useSourceInitiatedLink;
        KafkaProducer<String, String> producer;
        KafkaConsumer<String, String> consumer;

        public MultiTenantCluster(TestInfo testInfo) {
            super(testInfo);
        }

        void startCluster(Properties brokerOverrideProps, String logicalClusterId, int userId, boolean useSslForInternalListener) throws Exception {
            if (useSslForInternalListener) {
                Properties allBrokerOverrideProps = new Properties();
                this.createSslStores();
                allBrokerOverrideProps.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp(), "INTERNAL:SSL,EXTERNAL:SASL_PLAINTEXT");
                allBrokerOverrideProps.putAll(this.sslConfigs);
                allBrokerOverrideProps.putAll((Map<?, ?>)brokerOverrideProps);
                this.physicalCluster = this.start(allBrokerOverrideProps, brokerOverrideProps, true, Collections.singleton(String.format("User:O=A server,CN=%s", MultiTenantClusterLinkTest.SSL_KAFKA_CN)), Optional.of(Time.SYSTEM), cluster -> {});
            } else {
                this.physicalCluster = this.start(brokerOverrideProps, brokerOverrideProps, true, Optional.of(Time.SYSTEM), cluster -> {});
            }
            this.logicalCluster = this.physicalCluster.createLogicalCluster(logicalClusterId, 100, userId);
            this.user = this.logicalCluster.user(userId);
            if (useSslForInternalListener) {
                Properties clientProps = new Properties();
                for (Map.Entry<String, String> kv : this.clientConfigs("INTERNAL").entrySet()) {
                    clientProps.put(kv.getKey(), kv.getValue());
                }
                this.admin = (ConfluentAdmin)super.createAdminClient(this.physicalCluster.kafkaCluster().bootstrapServers("INTERNAL"), SecurityProtocol.SSL, null, null, clientProps);
            } else {
                this.admin = (ConfluentAdmin)super.createAdminClient(this.logicalCluster.adminUser());
            }
        }

        Admin internalAdminClient() {
            return this.physicalCluster.superAdminClient();
        }

        LogicalClusterUser createLinkUser(int newUserId) throws Exception {
            UserMetadata newUser = this.physicalCluster.getOrCreateUser(newUserId, false);
            LogicalClusterUser linkUser = this.logicalCluster.addUser(newUser);
            this.addLinkAcls(linkUser);
            if (this.useSourceInitiatedLink) {
                this.addReverseConnectionAcls(linkUser);
            }
            return linkUser;
        }

        private void createSslStores() throws Exception {
            CertStores certStores = new CertStores.Builder(true).cn(MultiTenantClusterLinkTest.SSL_KAFKA_CN).addHostName("localhost").build();
            Properties props = new Properties();
            BiConsumer<String, Object> copyEntry = (k, v) -> {
                if (v instanceof Password) {
                    props.setProperty((String)k, ((Password)v).value());
                } else if (v instanceof List) {
                    List listOfString = (List)v;
                    props.setProperty((String)k, String.join((CharSequence)",", listOfString));
                } else if (v != null) {
                    props.setProperty((String)k, (String)v);
                }
            };
            certStores.keyStoreProps().forEach(copyEntry);
            certStores.trustStoreProps().forEach(copyEntry);
            TestSslUtils.convertToPemWithoutFiles((Properties)props);
            props.forEach((BiConsumer<? super Object, ? super Object>)((BiConsumer<Object, Object>)(k, v) -> this.sslConfigs.put((String)k, (String)v)));
            this.sslConfigs.put("ssl.client.auth", "required");
        }

        private boolean usesSslForInternalListener() {
            return !this.sslConfigs.isEmpty();
        }

        void deleteUser(LogicalClusterUser user, boolean deleteAcls) {
            this.logicalCluster.removeUser(user.userMetadata.userId());
            if (deleteAcls) {
                this.deleteAcls(user);
            }
            if (this.producer != null) {
                this.producer.close();
                this.producer = null;
            }
            if (this.consumer != null) {
                this.consumer.close();
                this.consumer = null;
            }
        }

        private Map<String, String> clientConfigs(String listenerName) {
            Properties props = listenerName.equals("EXTERNAL") ? KafkaTestUtils.securityProps(this.physicalCluster.kafkaCluster().bootstrapServers(listenerName), SecurityProtocol.SASL_PLAINTEXT, ScramMechanism.SCRAM_SHA_256.mechanismName(), this.linkUser.saslJaasConfig()) : (this.usesSslForInternalListener() ? KafkaTestUtils.securityProps(this.physicalCluster.kafkaCluster().bootstrapServers(listenerName), SecurityProtocol.SSL, null, null) : KafkaTestUtils.securityProps(this.physicalCluster.kafkaCluster().bootstrapServers(listenerName), SecurityProtocol.PLAINTEXT, "", ""));
            HashMap<String, String> clientConfigs = new HashMap<String, String>();
            props.stringPropertyNames().forEach(name -> clientConfigs.put((String)name, props.getProperty((String)name)));
            clientConfigs.putAll(this.sslConfigs);
            return clientConfigs;
        }

        CreateClusterLinksResult createDestClusterLinkResult(ConfluentAdmin admin, String linkName, MultiTenantCluster sourceCluster, int linkUserId, String sourceListener, Map<String, String> overrides, boolean validateLink) throws Throwable {
            return this.createDestClusterLinkResult(admin, linkName, sourceCluster, linkUserId, sourceListener, overrides, validateLink, true);
        }

        CreateClusterLinksResult createDestClusterLinkResult(ConfluentAdmin admin, String linkName, MultiTenantCluster sourceCluster, int linkUserId, String sourceListener, Map<String, String> overrides, boolean validateLink, boolean enableAclSync) throws Throwable {
            HashMap<String, String> linkConfigs = new HashMap<String, String>();
            if (!this.useSourceInitiatedLink) {
                sourceCluster.linkUser = sourceCluster.createLinkUser(linkUserId);
                linkConfigs.putAll(sourceCluster.clientConfigs(sourceListener));
            } else {
                linkConfigs.put(ClusterLinkConfig.LinkModeProp(), "DESTINATION");
                linkConfigs.put(ClusterLinkConfig.ConnectionModeProp(), "INBOUND");
            }
            linkConfigs.put("request.timeout.ms", "10000");
            linkConfigs.put(ClusterLinkConfig.TopicConfigSyncMsProp(), "1000");
            if (enableAclSync) {
                String allAclsFilter = "{ \"aclFilters\": [{ \"resourceFilter\": { \"resourceType\": \"any\", \"patternType\": \"any\" },\"accessFilter\": { \"operation\": \"any\", \"permissionType\": \"any\" }}]}";
                linkConfigs.put(ClusterLinkConfig.AclFiltersProp(), allAclsFilter);
                linkConfigs.put(ClusterLinkConfig.AclSyncEnableProp(), "true");
                linkConfigs.put(ClusterLinkConfig.AclSyncMsProp(), "2000");
            }
            String allGroupsFilter = "{ \"groupFilters\": [{ \"name\": \"*\", \"patternType\": \"literal\", \"filterType\": \"include\" }]}";
            linkConfigs.put(ClusterLinkConfig.ConsumerOffsetGroupFiltersProp(), allGroupsFilter);
            linkConfigs.put(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
            linkConfigs.put(ClusterLinkConfig.ConsumerOffsetSyncMsProp(), "2000");
            String sourceClusterId = sourceListener.equals("EXTERNAL") ? sourceCluster.logicalCluster.logicalClusterId() : sourceCluster.physicalCluster.kafkaCluster().kafkaBrokers().get(0).clusterId();
            linkConfigs.putAll(overrides);
            NewClusterLink newClusterLink = new NewClusterLink(linkName, sourceClusterId, linkConfigs);
            CreateClusterLinksOptions options = new CreateClusterLinksOptions().validateOnly(false).validateLink(validateLink);
            return admin.createClusterLinks(Collections.singleton(newClusterLink), options);
        }

        void createDestClusterLinkWithoutOverridingMetadataMaxAge(ConfluentAdmin admin, String linkName, MultiTenantCluster sourceCluster, int linkUserId, Map<String, String> overrides, Boolean enableSyncAcl) throws Throwable {
            this.createDestClusterLinkResult(admin, linkName, sourceCluster, linkUserId, "EXTERNAL", overrides, true, enableSyncAcl).all().get();
        }

        void createDestClusterLink(ConfluentAdmin admin, String linkName, MultiTenantCluster sourceCluster, int linkUserId, Map<String, String> overrides) throws Throwable {
            this.createDestClusterLink(admin, linkName, sourceCluster, linkUserId, overrides, true);
        }

        void createDestClusterLink(ConfluentAdmin admin, String linkName, MultiTenantCluster sourceCluster, int linkUserId, Map<String, String> overrides, Boolean enableSyncAcl) throws Throwable {
            this.createDestClusterLinkResult(admin, linkName, sourceCluster, linkUserId, "EXTERNAL", overrides, true, enableSyncAcl).all().get();
            this.setInternalClusterLinkConfigs(linkName, Collections.singletonMap("metadata.max.age.ms", "1000"));
        }

        CreateClusterLinksResult createSourceClusterLinkResult(ConfluentAdmin admin, String linkName, MultiTenantCluster destCluster, int linkUserId) throws Throwable {
            Assertions.assertTrue((boolean)this.useSourceInitiatedLink);
            destCluster.linkUser = destCluster.createLinkUser(linkUserId + 1000);
            this.linkUser = this.createLinkUser(linkUserId);
            Properties destClientProps = KafkaTestUtils.securityProps(destCluster.physicalCluster.bootstrapServers(), SecurityProtocol.SASL_PLAINTEXT, ScramMechanism.SCRAM_SHA_256.mechanismName(), destCluster.linkUser.saslJaasConfig());
            Properties sourceClientProps = KafkaTestUtils.securityProps(this.physicalCluster.bootstrapServers(), SecurityProtocol.SASL_PLAINTEXT, ScramMechanism.SCRAM_SHA_256.mechanismName(), this.linkUser.saslJaasConfig());
            HashMap<String, String> linkConfigs = new HashMap<String, String>();
            linkConfigs.put(ClusterLinkConfig.LinkModeProp(), "SOURCE");
            linkConfigs.put(ClusterLinkConfig.ConnectionModeProp(), "OUTBOUND");
            linkConfigs.put(ClusterLinkConfig.LocalListenerNameProp(), "EXTERNAL");
            destClientProps.stringPropertyNames().forEach(name -> linkConfigs.put((String)name, destClientProps.getProperty((String)name)));
            sourceClientProps.stringPropertyNames().forEach(name -> linkConfigs.put(ClusterLinkConfig.LocalPrefix() + name, sourceClientProps.getProperty((String)name)));
            linkConfigs.put("request.timeout.ms", "10000");
            linkConfigs.put(ClusterLinkConfig.TopicConfigSyncMsProp(), "1000");
            NewClusterLink newClusterLink = new NewClusterLink(linkName, destCluster.logicalCluster.logicalClusterId(), linkConfigs);
            CreateClusterLinksOptions options = new CreateClusterLinksOptions().validateOnly(false).validateLink(true);
            return admin.createClusterLinks(Collections.singleton(newClusterLink), options);
        }

        void createSourceClusterLink(ConfluentAdmin admin, String linkName, MultiTenantCluster destCluster, int linkUserId) throws Throwable {
            this.createSourceClusterLinkResult(admin, linkName, destCluster, linkUserId).all().get();
        }

        void deleteClusterLink(ConfluentAdmin admin, String linkName) throws Throwable {
            DeleteClusterLinksOptions options = new DeleteClusterLinksOptions();
            admin.deleteClusterLinks(Collections.singleton(linkName), options).all().get();
        }

        void setInternalClusterLinkConfigs(String linkName, Map<String, String> configs) throws Exception {
            String prefixedLinkName = this.user.tenantPrefix() + linkName;
            List<ClusterLinkFactory.ClientManager> clientManagers = this.waitForClientManagers(prefixedLinkName);
            List<Admin> admins = this.waitForAdmins(clientManagers, Collections.emptyList());
            ConfigResource resource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, prefixedLinkName);
            ArrayList<AlterConfigOp> ops = new ArrayList<AlterConfigOp>(configs.size());
            for (Map.Entry<String, String> entry : configs.entrySet()) {
                ops.add(new AlterConfigOp(new ConfigEntry(entry.getKey(), entry.getValue()), AlterConfigOp.OpType.SET));
            }
            AdminClient adminClient = this.physicalCluster.superAdminClient();
            adminClient.incrementalAlterConfigs(Collections.singletonMap(resource, ops)).all().get();
            for (ClusterLinkFactory.ClientManager clientManager : clientManagers) {
                for (Map.Entry<String, String> e : configs.entrySet()) {
                    MultiTenantClusterLinkTest.waitFor(() -> this.linkConfig(clientManager, prefixedLinkName, (String)e.getKey()), e.getValue(), "Link config not propagated: " + e.getKey());
                }
            }
            this.waitForAdmins(clientManagers, admins);
        }

        void alterClusterLink(ConfluentAdmin admin, String linkName, String configName, String configValue) throws Exception {
            admin.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.CLUSTER_LINK, linkName), Collections.singleton(new AlterConfigOp(new ConfigEntry(configName, configValue), AlterConfigOp.OpType.SET)))).all().get(15L, TimeUnit.SECONDS);
            MultiTenantClusterLinkTest.waitFor(() -> this.linkConfig(linkName, configName), configValue, "Link config not updated");
        }

        private List<ClusterLinkFactory.ClientManager> waitForClientManagers(String linkName) throws Exception {
            TestUtils.waitForCondition(() -> {
                for (KafkaBroker kafkaBroker : this.physicalCluster.kafkaCluster().kafkaBrokers()) {
                    if (this.clusterLinkClientManager(kafkaBroker, linkName).isPresent()) continue;
                    return false;
                }
                return true;
            }, (String)"Cluster link client managers not created");
            return this.physicalCluster.kafkaCluster().kafkaBrokers().stream().map(kafkaBroker -> this.clusterLinkClientManager((KafkaBroker)kafkaBroker, linkName).get()).collect(Collectors.toList());
        }

        private Optional<ClusterLinkFactory.ClientManager> clusterLinkClientManager(KafkaServer server, String linkName) {
            ClusterLinkFactory.LinkManager linkManager = server.clusterLinkManager();
            Option link = linkManager.listClusterLinks().find(l -> l.linkName().equals(linkName));
            if (link.isEmpty()) {
                return Optional.empty();
            }
            return Optional.ofNullable(linkManager.clientManager(((ClusterLinkData)link.get()).linkId()).getOrElse(() -> null));
        }

        private Optional<ClusterLinkFactory.ClientManager> clusterLinkClientManager(KafkaBroker kafkaBroker, String linkName) {
            ClusterLinkFactory.LinkManager linkManager = kafkaBroker.clusterLinkManager();
            Option link = linkManager.listClusterLinks().find(l -> l.linkName().equals(linkName));
            if (link.isEmpty()) {
                return Optional.empty();
            }
            return Optional.ofNullable(linkManager.clientManager(((ClusterLinkData)link.get()).linkId()).getOrElse(() -> null));
        }

        private String linkConfig(ClusterLinkFactory.ClientManager clientManager, String linkName, String configName) {
            return (String)clientManager.currentConfig().originalsStrings().get(configName);
        }

        private List<Admin> waitForAdmins(List<ClusterLinkFactory.ClientManager> clientManagers, List<Admin> oldAdmins) throws Exception {
            ArrayList<Admin> admins = new ArrayList<Admin>();
            TestUtils.waitForCondition(() -> {
                for (ClusterLinkFactory.ClientManager clientManager : clientManagers) {
                    admins.clear();
                    ConfluentAdmin admin = ((ClusterLinkClientManager)clientManager).getAdmin();
                    if (admin == null) {
                        return false;
                    }
                    if (oldAdmins.stream().anyMatch(arg_0 -> MultiTenantCluster.lambda$null$15((Admin)admin, arg_0))) {
                        return false;
                    }
                    admins.add((Admin)admin);
                }
                return true;
            }, (String)"Admin clients not created");
            return admins;
        }

        KafkaProducer<String, String> getOrCreateProducer() {
            if (this.producer == null) {
                this.producer = this.createProducer(this.user, SecurityProtocol.SASL_PLAINTEXT);
            }
            return this.producer;
        }

        KafkaConsumer<String, String> getOrCreateConsumer(String groupId) {
            if (this.consumer == null) {
                this.consumer = this.createConsumer(this.user, groupId, SecurityProtocol.SASL_PLAINTEXT);
            }
            return this.consumer;
        }

        Set<AclBinding> describeAcls(LogicalClusterUser user) {
            try {
                return new HashSet<AclBinding>((Collection)this.admin.describeAcls(this.aclBindingFilter(user)).values().get(15L, TimeUnit.SECONDS));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        void deleteAcls(LogicalClusterUser user) {
            try {
                this.admin.deleteAcls(Collections.singleton(this.aclBindingFilter(user))).all().get(15L, TimeUnit.SECONDS);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        private AclBindingFilter aclBindingFilter(LogicalClusterUser user) {
            AccessControlEntryFilter aceFilter = new AccessControlEntryFilter(user.unprefixedKafkaPrincipal().toString(), null, AclOperation.ANY, AclPermissionType.ANY);
            return new AclBindingFilter(ResourcePatternFilter.ANY, aceFilter);
        }

        private void addClusterAcls(KafkaPrincipal principal, String op) {
            this.physicalCluster.newAclCommand().clusterAclArgs(principal, op).execute();
        }

        private void addLinkAcls(LogicalClusterUser user) throws Exception {
            String principal = user.unprefixedKafkaPrincipal().toString();
            AclBinding topicAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "linked", PatternType.PREFIXED), new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW));
            AclBinding topicConfigAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "linked", PatternType.PREFIXED), new AccessControlEntry(principal, "*", AclOperation.DESCRIBE_CONFIGS, AclPermissionType.ALLOW));
            AclBinding clusterAcl = new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL), new AccessControlEntry(principal, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
            AclBinding groupAcl = new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL), new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW));
            Set acls = Utils.mkSet((Object[])new AclBinding[]{topicAcl, topicConfigAcl, groupAcl, clusterAcl});
            this.admin.createAcls((Collection)acls).all().get(15L, TimeUnit.SECONDS);
        }

        private void addReverseConnectionAcls(LogicalClusterUser user) throws Exception {
            String principal = user.unprefixedKafkaPrincipal().toString();
            AclBinding clusterAcl = new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL), new AccessControlEntry(principal, "*", AclOperation.ALTER, AclPermissionType.ALLOW));
            Set acls = Utils.mkSet((Object[])new AclBinding[]{clusterAcl});
            this.admin.createAcls((Collection)acls).all().get(15L, TimeUnit.SECONDS);
        }

        int partitionsForTopic(String topic) {
            try {
                TopicDescription topicDesc = (TopicDescription)((KafkaFuture)this.admin.describeTopics(Collections.singleton(topic)).topicNameValues().get(topic)).get(15L, TimeUnit.SECONDS);
                return topicDesc.partitions().size();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        String topicConfig(String topic, String name) {
            try {
                ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
                Config config = (Config)((KafkaFuture)this.admin.describeConfigs(Collections.singleton(resource)).values().get(resource)).get(15L, TimeUnit.SECONDS);
                return config.entries().stream().filter(e -> e.name().equals(name)).findFirst().map(ConfigEntry::value).get();
            }
            catch (Exception e2) {
                throw new RuntimeException(e2);
            }
        }

        String linkConfig(String linkName, String configName) {
            ClusterLinkFactory.LinkManager linkManager = this.linkManager();
            UUID linkId = this.linkId(linkName);
            ClusterLinkFactory.ConnectionManager connManager = (ClusterLinkFactory.ConnectionManager)linkManager.connectionManager(linkId).get();
            return (String)connManager.currentConfig().originalsStrings().get(configName);
        }

        ClusterLinkFactory.LinkManager linkManager() {
            return this.physicalCluster.kafkaCluster().kafkaBrokers().get(0).clusterLinkManager();
        }

        UUID linkId(String linkName) {
            ClusterLinkFactory.LinkManager linkManager = this.linkManager();
            return ((ClusterLinkData)linkManager.listClusterLinks().find(d -> d.linkName().equals(this.user.tenantPrefix() + linkName)).get()).linkId();
        }

        boolean linkIdExists(String linkName) {
            ClusterLinkFactory.LinkManager linkManager = this.linkManager();
            return linkManager.listClusterLinks().find(d -> d.linkName().equals(this.user.tenantPrefix() + linkName)).isDefined();
        }

        Map<TopicPartition, OffsetAndMetadata> committedOffsets(String group) {
            try {
                return (Map)this.admin.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        MirrorTopicDescription.State mirrorState(String topic) {
            try {
                MirrorTopicDescription mirror = (MirrorTopicDescription)((KafkaFuture)this.admin.describeMirrors(Collections.singleton(topic), new DescribeMirrorsOptions()).result().get(topic)).get(15L, TimeUnit.SECONDS);
                return mirror.state();
            }
            catch (Exception e) {
                return null;
            }
        }

        private static /* synthetic */ boolean lambda$null$15(Admin admin, Admin oldAdmin) {
            return oldAdmin == admin;
        }
    }
}

