package io.confluent.kafka.link.integration;

import com.google.common.collect.Lists;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.Metric;
import io.confluent.kafka.multitenant.MultiTenantInterceptor;
import io.confluent.kafka.multitenant.MultiTenantRequestContextTest;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.multitenant.integration.test.FileBasedPlainSaslAuthHostNameValidationIntegrationTest;
import io.confluent.kafka.multitenant.integration.test.IntegrationTestHarness;
import io.confluent.kafka.multitenant.metrics.TenantMetricsTestUtils;
import io.confluent.kafka.multitenant.quota.QuotaConfig;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.security.auth.plain.DynamicPlainLoginCallbackHandler;
import io.confluent.kafka.server.plugins.auth.oauth.OAuthUtils;
import io.confluent.kafka.server.plugins.policy.AlterConfigPolicy;
import io.confluent.kafka.server.plugins.policy.ClusterLinkPolicyConfig;
import io.confluent.kafka.server.plugins.policy.CreateClusterLinkPolicy;
import io.confluent.kafka.server.plugins.policy.CreateTopicPolicy;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.kafka.test.utils.SecurityTestUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.security.auth.login.LoginException;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.ClusterLinkQuotas;
import kafka.server.ClusterLinkRequestQuotaManager;
import kafka.server.ClusterLinkTenantContext;
import kafka.server.ClusterLinkTenantRequestQuota;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.LinkRequestQuotaUsageType;
import kafka.server.LinkRequestQuotaUsageType$Fetcher$;
import kafka.server.QuotaType;
import kafka.server.QuotaType$ClusterLinkReplication$;
import kafka.server.QuotaType$ClusterLinkRequest$;
import kafka.server.link.ClusterLinkCCloudToCCloudChannelBuilder;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkDestClientManager;
import kafka.server.link.ClusterLinkFactory;
import kafka.server.link.ClusterLinkFetcherManager;
import kafka.server.link.ClusterLinkManager;
import kafka.server.link.ClusterLinkMetadataManager;
import kafka.server.link.FetcherThreadPoolMode$Link$;
import kafka.server.link.MirrorTopicConfigSyncRules;
import kafka.utils.TestInfoUtils;
import kafka.utils.TestUtils;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterMirrorOp;
import org.apache.kafka.clients.admin.AlterMirrorsOptions;
import org.apache.kafka.clients.admin.ClusterLinkDescription;
import org.apache.kafka.clients.admin.ClusterLinkListing;
import org.apache.kafka.clients.admin.ClusterLinkTaskError;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.CreateClusterLinksOptions;
import org.apache.kafka.clients.admin.CreateClusterLinksResult;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.DeleteClusterLinksOptions;
import org.apache.kafka.clients.admin.DescribeClusterLinksOptions;
import org.apache.kafka.clients.admin.DescribeMirrorsOptions;
import org.apache.kafka.clients.admin.ListClusterLinksOptions;
import org.apache.kafka.clients.admin.MirrorTopicDescription;
import org.apache.kafka.clients.admin.NewClusterLink;
import org.apache.kafka.clients.admin.NewMirrorTopic;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.PartitionResult;
import org.apache.kafka.clients.admin.ReplicaStatusOptions;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.ClusterLinkError;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MirrorTopicError;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.replica.ReplicaStatus;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.SaslClientCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.authorizer.AuthorizerConfig;
import org.apache.kafka.server.interceptor.ConfluentCloudBrokerInterceptor;
import org.apache.kafka.server.link.ClusterLinkMetricsUtils;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.test.TestSslUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.Seq;

@Tags({@Tag("integration"), @Tag("bazel:shard_count:6")})
/* loaded from: input_file:io/confluent/kafka/link/integration/MultiTenantClusterLinkTest.class */
public class MultiTenantClusterLinkTest {
    private static final String SYNC_ALL_ACL_FILTER = "{ \"aclFilters\": [{ \"resourceFilter\": { \"resourceType\": \"any\", \"patternType\": \"any\" },\"accessFilter\": { \"operation\": \"any\", \"permissionType\": \"any\" }}]}";
    public static final String SSL_KAFKA_CN = "kafka";
    private static final Logger log = LoggerFactory.getLogger(MultiTenantClusterLinkTest.class);
    private MultiTenantCluster sourceCluster;
    private MultiTenantCluster destCluster;
    private TestInfo testInfo;
    private static final String DEVELOPER1 = "app1-developer";
    private static final String APP1_TOPIC = "app1-topic";
    private static final String APP1_CONSUMER_GROUP = "app1-consumer-group";
    private final String sourceLogicalCluster = "lkc-source";
    private final String destLogicalCluster = "lkc-dest";
    private final String linkName = "tenantLink";
    private final String topic = "linkedTopic";
    private int numPartitions = 2;
    private int nextMessageIndex = 0;
    private final String localAclsFilter = SYNC_ALL_ACL_FILTER;
    private final String subsetAclsFilter = "{ \"aclFilters\": [{ \"resourceFilter\": { \"resourceType\": \"topic\", \"name\": \"app1-topic\", \"patternType\": \"literal\" },\"accessFilter\": { \"principal\": \"User:app1-developer\", \"operation\": \"write\", \"host\": \"*\", \"permissionType\": \"allow\", \"clusterLinkIds\" : [\"" + Uuid.ZERO_UUID + "\"]  }}]}";
    private final KafkaPrincipal principal = new KafkaPrincipal("User", DEVELOPER1);
    private final Collection<AclBinding> allExpectedAcls = Arrays.asList(new AclBinding(new ResourcePattern(ResourceType.TOPIC, APP1_TOPIC, PatternType.LITERAL), new AccessControlEntry(this.principal.toString(), "*", AclOperation.WRITE, AclPermissionType.ALLOW)), new AclBinding(new ResourcePattern(ResourceType.TOPIC, APP1_TOPIC, PatternType.LITERAL), new AccessControlEntry(this.principal.toString(), "*", AclOperation.READ, AclPermissionType.ALLOW)), new AclBinding(new ResourcePattern(ResourceType.GROUP, APP1_CONSUMER_GROUP, PatternType.LITERAL), new AccessControlEntry(this.principal.toString(), "*", AclOperation.ALL, AclPermissionType.ALLOW)));
    private final Collection<AclBinding> localExpectedAcls = Arrays.asList(new AclBinding(new ResourcePattern(ResourceType.TOPIC, APP1_TOPIC, PatternType.LITERAL), new AccessControlEntry(this.principal.toString(), "*", AclOperation.WRITE, AclPermissionType.ALLOW)), new AclBinding(new ResourcePattern(ResourceType.GROUP, APP1_CONSUMER_GROUP, PatternType.LITERAL), new AccessControlEntry(this.principal.toString(), "*", AclOperation.ALL, AclPermissionType.ALLOW)));
    private final Collection<AclBinding> subsetExpectedAcls = Collections.singletonList(new AclBinding(new ResourcePattern(ResourceType.TOPIC, APP1_TOPIC, PatternType.LITERAL), new AccessControlEntry(this.principal.toString(), "*", AclOperation.WRITE, AclPermissionType.ALLOW)));

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.confluent.kafka.link.integration.MultiTenantClusterLinkTest$3, reason: invalid class name */
    /* loaded from: input_file:io/confluent/kafka/link/integration/MultiTenantClusterLinkTest$3.class */
    public static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$clients$admin$ClusterLinkDescription$LinkMode = new int[ClusterLinkDescription.LinkMode.values().length];

        static {
            try {
                $SwitchMap$org$apache$kafka$clients$admin$ClusterLinkDescription$LinkMode[ClusterLinkDescription.LinkMode.DESTINATION.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$clients$admin$ClusterLinkDescription$LinkMode[ClusterLinkDescription.LinkMode.SOURCE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* loaded from: input_file:io/confluent/kafka/link/integration/MultiTenantClusterLinkTest$DummyOAuthLoginModule.class */
    public static class DummyOAuthLoginModule extends OAuthBearerLoginModule {
        public boolean login() throws LoginException {
            return true;
        }

        public boolean logout() {
            return true;
        }

        public boolean commit() {
            return true;
        }

        public boolean abort() {
            return true;
        }
    }

    /* loaded from: input_file:io/confluent/kafka/link/integration/MultiTenantClusterLinkTest$MultiTenantCluster.class */
    public static class MultiTenantCluster extends IntegrationTestHarness {
        private final Map<String, String> sslConfigs;
        private PhysicalCluster physicalCluster;
        private LogicalCluster logicalCluster;
        private LogicalClusterUser user;
        private LogicalClusterUser linkUser;
        private ConfluentAdmin admin;
        private boolean useSourceInitiatedLink;
        KafkaProducer<String, String> producer;
        KafkaConsumer<String, String> consumer;

        public MultiTenantCluster(TestInfo testInfo) {
            super(testInfo);
            this.sslConfigs = new HashMap();
        }

        void startCluster(Properties properties, String str, int i, boolean z) throws Exception {
            startCluster(properties, properties, str, i, z, Optional.empty());
        }

        void startCluster(Properties properties, Properties properties2, String str, int i, boolean z, Optional<String> optional) throws Exception {
            startCluster(properties, properties2, str, i, z, optional, false, MultiTenantRequestContextTest.LOCALHOST, "required");
        }

        void startCluster(Properties properties, Properties properties2, String str, int i, boolean z, Optional<String> optional, boolean z2, String str2, String str3) throws Exception {
            if (z || z2) {
                Properties properties3 = new Properties();
                createSslStores(str2, str3);
                properties3.putAll(this.sslConfigs);
                if (z) {
                    properties3.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp(), "INTERNAL:SSL,EXTERNAL:SASL_PLAINTEXT");
                }
                properties3.putAll(properties);
                this.physicalCluster = start(properties3, properties2, true, Collections.singleton(String.format("User:O=A server,CN=%s", MultiTenantClusterLinkTest.SSL_KAFKA_CN)), Optional.of(Time.SYSTEM), physicalCluster -> {
                });
            } else {
                this.physicalCluster = start(properties, properties2, true, Optional.of(Time.SYSTEM), physicalCluster2 -> {
                });
            }
            this.logicalCluster = this.physicalCluster.createLogicalCluster(str, 100, Integer.valueOf(i));
            this.user = this.logicalCluster.user(i);
            if (!z) {
                if (optional.isPresent()) {
                    this.admin = super.createAdminClient(this.logicalCluster.adminUser(), optional.get());
                    return;
                } else {
                    this.admin = super.createAdminClient(this.logicalCluster.adminUser());
                    return;
                }
            }
            Properties properties4 = new Properties();
            for (Map.Entry<String, String> entry : clientConfigs("INTERNAL").entrySet()) {
                properties4.put(entry.getKey(), entry.getValue());
            }
            this.admin = super.createAdminClient(this.physicalCluster.kafkaCluster().bootstrapServers("INTERNAL"), SecurityProtocol.SSL, null, null, properties4);
        }

        Admin internalAdminClient() {
            return this.physicalCluster.superAdminClient();
        }

        ConfluentAdmin createConfluentAdmin(LogicalCluster logicalCluster) {
            return super.createAdminClient(logicalCluster.adminUser());
        }

        LogicalClusterUser createLinkUser(int i) throws Exception {
            LogicalClusterUser addUser = this.logicalCluster.addUser(this.physicalCluster.getOrCreateUser(i, false));
            Set<AclBinding> linkAcls = linkAcls(addUser);
            if (this.useSourceInitiatedLink) {
                linkAcls.addAll(reverseConnectionAclsForLinkUser(addUser));
            }
            addLinkAcls(linkAcls);
            waitForAclsToExistOnAllBrokers(this.physicalCluster, linkAcls, addUser.tenantPrefix());
            return addUser;
        }

        private static void waitForAclsToExistOnAllBrokers(PhysicalCluster physicalCluster, Set<AclBinding> set, String str) {
            TestUtils.waitUntilTrue(() -> {
                return Boolean.valueOf(physicalCluster.kafkaCluster().kafkaBrokers().stream().allMatch(kafkaBroker -> {
                    if (kafkaBroker.authorizer().isEmpty()) {
                        return false;
                    }
                    return removeTenantPrefix(Lists.newArrayList(((Authorizer) kafkaBroker.authorizer().get()).acls(AclBindingFilter.ANY)), str).containsAll(set);
                }));
            }, () -> {
                return "Failed to validate all ACLs exist on all brokers";
            }, 15000L, 100L);
        }

        private static Collection<AclBinding> removeTenantPrefix(Collection<AclBinding> collection, String str) {
            return (Collection) collection.stream().map(aclBinding -> {
                AccessControlEntry entry = aclBinding.entry();
                if (!entry.principal().contains(str)) {
                    return new AclBinding(aclBinding.pattern(), aclBinding.entry());
                }
                String replaceFirst = entry.principal().replaceFirst(str, "").replaceFirst("TenantUser", "User");
                ResourcePattern pattern = aclBinding.pattern();
                return new AclBinding((pattern.name().equals(str) && pattern.patternType().equals(PatternType.PREFIXED)) ? new ResourcePattern(pattern.resourceType(), "*", PatternType.LITERAL) : new ResourcePattern(pattern.resourceType(), pattern.name().replaceFirst(str, ""), pattern.patternType()), new AccessControlEntry(replaceFirst, entry.host(), entry.operation(), entry.permissionType(), entry.clusterLinkIds()));
            }).collect(Collectors.toList());
        }

        private void createSslStores(String str, String str2) throws Exception {
            CertStores build = new CertStores.Builder(true).cn(MultiTenantClusterLinkTest.SSL_KAFKA_CN).addHostName(str).build();
            Properties properties = new Properties();
            BiConsumer biConsumer = (str3, obj) -> {
                if (obj instanceof Password) {
                    properties.setProperty(str3, ((Password) obj).value());
                } else if (obj instanceof List) {
                    properties.setProperty(str3, String.join(",", (List) obj));
                } else if (obj != null) {
                    properties.setProperty(str3, (String) obj);
                }
            };
            build.keyStoreProps().forEach(biConsumer);
            build.trustStoreProps().forEach(biConsumer);
            TestSslUtils.convertToPemWithoutFiles(properties);
            properties.forEach((obj2, obj3) -> {
                this.sslConfigs.put((String) obj2, (String) obj3);
            });
            this.sslConfigs.put("ssl.client.auth", str2);
        }

        private boolean usesSslForInternalListener() {
            return !this.sslConfigs.isEmpty();
        }

        void deleteUser(LogicalClusterUser logicalClusterUser, boolean z) {
            this.logicalCluster.removeUser(logicalClusterUser.userMetadata.userId());
            if (z) {
                deleteAcls(logicalClusterUser);
            }
            if (this.producer != null) {
                this.producer.close();
                this.producer = null;
            }
            if (this.consumer != null) {
                this.consumer.close();
                this.consumer = null;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Map<String, String> clientConfigs(String str) {
            return clientConfigs(str, true);
        }

        private Map<String, String> clientConfigs(String str, boolean z) {
            Properties securityProps = str.equals("EXTERNAL") ? KafkaTestUtils.securityProps(this.physicalCluster.kafkaCluster().bootstrapServers(str), SecurityProtocol.SASL_PLAINTEXT, ScramMechanism.SCRAM_SHA_256.mechanismName(), this.linkUser.saslJaasConfig()) : usesSslForInternalListener() ? KafkaTestUtils.securityProps(this.physicalCluster.kafkaCluster().bootstrapServers(str), SecurityProtocol.SSL, null, null) : KafkaTestUtils.securityProps(this.physicalCluster.kafkaCluster().bootstrapServers(str), SecurityProtocol.PLAINTEXT, null, "");
            HashMap hashMap = new HashMap();
            Properties properties = securityProps;
            securityProps.stringPropertyNames().forEach(str2 -> {
            });
            if (z) {
                hashMap.putAll(this.sslConfigs);
            }
            return hashMap;
        }

        CreateClusterLinksResult createDestClusterLinkResult(ConfluentAdmin confluentAdmin, String str, MultiTenantCluster multiTenantCluster, int i, String str2, Map<String, String> map, boolean z) throws Throwable {
            return createDestClusterLinkResult(confluentAdmin, str, multiTenantCluster, i, str2, map, z, true);
        }

        CreateClusterLinksResult createDestClusterLinkResult(ConfluentAdmin confluentAdmin, String str, MultiTenantCluster multiTenantCluster, int i, String str2, Map<String, String> map, boolean z, boolean z2) throws Throwable {
            return z2 ? createDestClusterLinkResult(confluentAdmin, str, multiTenantCluster, i, str2, map, z, MultiTenantClusterLinkTest.SYNC_ALL_ACL_FILTER) : createDestClusterLinkResult(confluentAdmin, str, multiTenantCluster, i, str2, map, z, "");
        }

        CreateClusterLinksResult createDestClusterLinkResult(ConfluentAdmin confluentAdmin, String str, MultiTenantCluster multiTenantCluster, int i, String str2, Map<String, String> map, boolean z, String str3) throws Throwable {
            return createDestClusterLinkResult(confluentAdmin, str, multiTenantCluster, i, str2, true, map, z, str3, null);
        }

        CreateClusterLinksResult createDestClusterLinkResult(ConfluentAdmin confluentAdmin, String str, MultiTenantCluster multiTenantCluster, int i, String str2, boolean z, Map<String, String> map, boolean z2, String str3, Uuid uuid) throws Throwable {
            HashMap hashMap = new HashMap();
            if (this.useSourceInitiatedLink) {
                hashMap.put(ClusterLinkConfig.LinkModeProp(), "DESTINATION");
                hashMap.put(ClusterLinkConfig.ConnectionModeProp(), "INBOUND");
            } else {
                multiTenantCluster.linkUser = multiTenantCluster.createLinkUser(i);
                if (z) {
                    hashMap.putAll(multiTenantCluster.clientConfigs(str2));
                } else {
                    hashMap.putAll(multiTenantCluster.clientConfigs(str2, false));
                }
            }
            hashMap.put("request.timeout.ms", "10000");
            hashMap.put(ClusterLinkConfig.TopicConfigSyncMsProp(), "1000");
            if (!str3.isEmpty()) {
                hashMap.put(ClusterLinkConfig.AclFiltersProp(), str3);
                hashMap.put(ClusterLinkConfig.AclSyncEnableProp(), "true");
                hashMap.put(ClusterLinkConfig.AclSyncMsProp(), "2000");
            }
            hashMap.put(ClusterLinkConfig.ConsumerOffsetGroupFiltersProp(), "{ \"groupFilters\": [{ \"name\": \"*\", \"patternType\": \"literal\", \"filterType\": \"include\" }]}");
            hashMap.put(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
            hashMap.put(ClusterLinkConfig.ConsumerOffsetSyncMsProp(), "2000");
            String logicalClusterId = str2.equals("EXTERNAL") ? multiTenantCluster.logicalCluster.logicalClusterId() : multiTenantCluster.physicalCluster.kafkaCluster().kafkaBrokers().get(0).clusterId();
            hashMap.putAll(map);
            return confluentAdmin.createClusterLinks(Collections.singleton(new NewClusterLink(str, logicalClusterId, hashMap, uuid)), new CreateClusterLinksOptions().validateOnly(false).validateLink(z2));
        }

        void createDestClusterLink(ConfluentAdmin confluentAdmin, String str, MultiTenantCluster multiTenantCluster, int i, Map<String, String> map) throws Throwable {
            createDestClusterLink(confluentAdmin, str, multiTenantCluster, i, map, true);
        }

        void createDestClusterLink(ConfluentAdmin confluentAdmin, String str, MultiTenantCluster multiTenantCluster, int i, Map<String, String> map, Boolean bool) throws Throwable {
            createDestClusterLinkResult(confluentAdmin, str, multiTenantCluster, i, "EXTERNAL", map, true, bool.booleanValue()).all().get();
            setInternalClusterLinkConfigs(str, Collections.singletonMap("metadata.max.age.ms", "1000"));
        }

        CreateClusterLinksResult createSourceClusterLinkResult(ConfluentAdmin confluentAdmin, String str, MultiTenantCluster multiTenantCluster, int i, Map<String, String> map) throws Throwable {
            return createSourceClusterLinkResult(confluentAdmin, str, multiTenantCluster, i, map, null, SecurityProtocol.SASL_PLAINTEXT);
        }

        CreateClusterLinksResult createSourceClusterLinkResult(ConfluentAdmin confluentAdmin, String str, MultiTenantCluster multiTenantCluster, int i, Map<String, String> map, String str2, SecurityProtocol securityProtocol) throws Throwable {
            Assertions.assertTrue(this.useSourceInitiatedLink);
            multiTenantCluster.linkUser = multiTenantCluster.createLinkUser(i + 1000);
            this.linkUser = createLinkUser(i);
            Properties securityProps = KafkaTestUtils.securityProps(multiTenantCluster.physicalCluster.bootstrapServers(), SecurityProtocol.SASL_PLAINTEXT, ScramMechanism.SCRAM_SHA_256.mechanismName(), multiTenantCluster.linkUser.saslJaasConfig());
            Properties securityProps2 = KafkaTestUtils.securityProps(str2 == null ? this.physicalCluster.bootstrapServers() : this.physicalCluster.bootstrapServers(str2), securityProtocol, ScramMechanism.SCRAM_SHA_256.mechanismName(), this.linkUser.saslJaasConfig());
            securityProps2.put("ssl.endpoint.identification.algorithm", "https");
            HashMap hashMap = new HashMap();
            hashMap.put(ClusterLinkConfig.LinkModeProp(), "SOURCE");
            hashMap.put(ClusterLinkConfig.ConnectionModeProp(), "OUTBOUND");
            securityProps.stringPropertyNames().forEach(str3 -> {
            });
            securityProps2.stringPropertyNames().forEach(str4 -> {
            });
            hashMap.put("request.timeout.ms", "10000");
            hashMap.put(ClusterLinkConfig.TopicConfigSyncMsProp(), "1000");
            hashMap.putAll(map);
            return confluentAdmin.createClusterLinks(Collections.singleton(new NewClusterLink(str, multiTenantCluster.logicalCluster.logicalClusterId(), hashMap)), new CreateClusterLinksOptions().validateOnly(false).validateLink(true));
        }

        void createSourceClusterLink(ConfluentAdmin confluentAdmin, String str, MultiTenantCluster multiTenantCluster, int i) throws Throwable {
            createSourceClusterLinkResult(confluentAdmin, str, multiTenantCluster, i, Collections.emptyMap()).all().get();
        }

        void deleteClusterLink(ConfluentAdmin confluentAdmin, String str) throws Throwable {
            confluentAdmin.deleteClusterLinks(Collections.singleton(str), new DeleteClusterLinksOptions()).all().get();
        }

        void setInternalClusterLinkConfigs(String str, Map<String, String> map) throws Exception {
            String str2 = this.user.tenantPrefix() + str;
            List<ClusterLinkFactory.ClientManager> waitForClientManagers = waitForClientManagers(str2);
            List<Admin> waitForAdmins = waitForAdmins(waitForClientManagers, Collections.emptyList());
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, str2);
            ArrayList arrayList = new ArrayList(map.size());
            for (Map.Entry<String, String> entry : map.entrySet()) {
                arrayList.add(new AlterConfigOp(new ConfigEntry(entry.getKey(), entry.getValue()), AlterConfigOp.OpType.SET));
            }
            this.physicalCluster.superAdminClient().incrementalAlterConfigs(Collections.singletonMap(configResource, arrayList)).all().get();
            for (ClusterLinkFactory.ClientManager clientManager : waitForClientManagers) {
                for (Map.Entry<String, String> entry2 : map.entrySet()) {
                    MultiTenantClusterLinkTest.waitFor(() -> {
                        return linkConfig(clientManager, str2, (String) entry2.getKey());
                    }, entry2.getValue(), "Link config not propagated: " + entry2.getKey());
                }
            }
            waitForAdmins(waitForClientManagers, waitForAdmins);
        }

        void alterClusterLink(ConfluentAdmin confluentAdmin, String str, String str2, String str3) throws Exception {
            alterClusterLink(confluentAdmin, str, str2, str3, true);
        }

        void alterClusterLink(ConfluentAdmin confluentAdmin, String str, String str2, String str3, boolean z) throws Exception {
            confluentAdmin.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.CLUSTER_LINK, str), Collections.singleton(new AlterConfigOp(new ConfigEntry(str2, str3), AlterConfigOp.OpType.SET)))).all().get(15L, TimeUnit.SECONDS);
            if (z) {
                MultiTenantClusterLinkTest.waitFor(() -> {
                    return linkConfig(str, str2);
                }, str3, "Link config not updated");
            }
        }

        private List<ClusterLinkFactory.ClientManager> waitForClientManagers(String str) throws Exception {
            org.apache.kafka.test.TestUtils.waitForCondition(() -> {
                Iterator<KafkaBroker> it = this.physicalCluster.kafkaCluster().kafkaBrokers().iterator();
                while (it.hasNext()) {
                    if (!clusterLinkClientManager(it.next(), str).isPresent()) {
                        return false;
                    }
                }
                return true;
            }, "Cluster link client managers not created");
            return (List) this.physicalCluster.kafkaCluster().kafkaBrokers().stream().map(kafkaBroker -> {
                return clusterLinkClientManager(kafkaBroker, str).get();
            }).collect(Collectors.toList());
        }

        private Optional<ClusterLinkFactory.ClientManager> clusterLinkClientManager(KafkaServer kafkaServer, String str) {
            ClusterLinkFactory.LinkManager clusterLinkManager = kafkaServer.clusterLinkManager();
            Option find = clusterLinkManager.listClusterLinks().find(clusterLinkData -> {
                return Boolean.valueOf(clusterLinkData.linkName().equals(str));
            });
            return find.isEmpty() ? Optional.empty() : Optional.ofNullable(clusterLinkManager.clientManager(((ClusterLinkData) find.get()).linkId()).getOrElse(() -> {
                return null;
            }));
        }

        private Optional<ClusterLinkFactory.ClientManager> clusterLinkClientManager(KafkaBroker kafkaBroker, String str) {
            ClusterLinkFactory.LinkManager clusterLinkManager = kafkaBroker.clusterLinkManager();
            Option find = clusterLinkManager.listClusterLinks().find(clusterLinkData -> {
                return Boolean.valueOf(clusterLinkData.linkName().equals(str));
            });
            return find.isEmpty() ? Optional.empty() : Optional.ofNullable(clusterLinkManager.clientManager(((ClusterLinkData) find.get()).linkId()).getOrElse(() -> {
                return null;
            }));
        }

        private String linkConfig(ClusterLinkFactory.ClientManager clientManager, String str, String str2) {
            return (String) clientManager.currentConfig().originalsStrings().get(str2);
        }

        private List<Admin> waitForAdmins(List<ClusterLinkFactory.ClientManager> list, List<Admin> list2) throws Exception {
            ArrayList arrayList = new ArrayList();
            org.apache.kafka.test.TestUtils.waitForCondition(() -> {
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    ClusterLinkDestClientManager clusterLinkDestClientManager = (ClusterLinkFactory.ClientManager) it.next();
                    arrayList.clear();
                    ConfluentAdmin admin = clusterLinkDestClientManager.getAdmin();
                    if (admin == null || list2.stream().anyMatch(admin2 -> {
                        return admin2 == admin;
                    })) {
                        return false;
                    }
                    arrayList.add(admin);
                }
                return true;
            }, "Admin clients not created");
            return arrayList;
        }

        KafkaProducer<String, String> getOrCreateProducer() {
            return getOrCreateProducer(Optional.empty());
        }

        KafkaProducer<String, String> getOrCreateProducer(Optional<String> optional) {
            if (this.producer == null) {
                this.producer = createProducer(this.user, SecurityProtocol.SASL_PLAINTEXT, optional);
            }
            return this.producer;
        }

        KafkaConsumer<String, String> getOrCreateConsumer(String str) {
            return getOrCreateConsumer(str, Optional.empty());
        }

        KafkaConsumer<String, String> getOrCreateConsumer(String str, Optional<String> optional) {
            if (this.consumer == null) {
                this.consumer = createConsumer(this.user, str, SecurityProtocol.SASL_PLAINTEXT, optional, new Properties());
            }
            return this.consumer;
        }

        Set<AclBinding> describeAcls(LogicalClusterUser logicalClusterUser) {
            try {
                return new HashSet((Collection) this.admin.describeAcls(aclBindingFilter(logicalClusterUser)).values().get(15L, TimeUnit.SECONDS));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        void deleteAcls(LogicalClusterUser logicalClusterUser) {
            try {
                this.admin.deleteAcls(Collections.singleton(aclBindingFilter(logicalClusterUser))).all().get(15L, TimeUnit.SECONDS);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        private AclBindingFilter aclBindingFilter(LogicalClusterUser logicalClusterUser) {
            return new AclBindingFilter(ResourcePatternFilter.ANY, new AccessControlEntryFilter(logicalClusterUser.unprefixedKafkaPrincipal().toString(), (String) null, AclOperation.ANY, AclPermissionType.ANY));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void addClusterAcls(KafkaPrincipal kafkaPrincipal, String str) {
            this.physicalCluster.newAclCommand().clusterAclArgs(kafkaPrincipal, str).execute();
        }

        private void addLinkAcls(Set<AclBinding> set) throws Exception {
            this.admin.createAcls(set).all().get(15L, TimeUnit.SECONDS);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void deleteLinkAcls(LogicalClusterUser logicalClusterUser) throws Exception {
            this.admin.deleteAcls((Collection) linkAcls(logicalClusterUser).stream().map((v0) -> {
                return v0.toFilter();
            }).collect(Collectors.toList())).all().get(15L, TimeUnit.SECONDS);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Set<AclBinding> linkAcls(LogicalClusterUser logicalClusterUser) {
            String kafkaPrincipal = logicalClusterUser.unprefixedKafkaPrincipal().toString();
            return Utils.mkSet(new AclBinding[]{new AclBinding(new ResourcePattern(ResourceType.TOPIC, "linked", PatternType.PREFIXED), new AccessControlEntry(kafkaPrincipal, "*", AclOperation.READ, AclPermissionType.ALLOW)), new AclBinding(new ResourcePattern(ResourceType.TOPIC, "linked", PatternType.PREFIXED), new AccessControlEntry(kafkaPrincipal, "*", AclOperation.DESCRIBE_CONFIGS, AclPermissionType.ALLOW)), new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL), new AccessControlEntry(kafkaPrincipal, "*", AclOperation.READ, AclPermissionType.ALLOW)), new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL), new AccessControlEntry(kafkaPrincipal, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)), new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL), new AccessControlEntry(kafkaPrincipal, "*", AclOperation.DESCRIBE_CONFIGS, AclPermissionType.ALLOW))});
        }

        private Set<AclBinding> reverseConnectionAclsForLinkUser(LogicalClusterUser logicalClusterUser) throws Exception {
            return Utils.mkSet(new AclBinding[]{new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL), new AccessControlEntry(logicalClusterUser.unprefixedKafkaPrincipal().toString(), "*", AclOperation.ALTER, AclPermissionType.ALLOW))});
        }

        int partitionsForTopic(String str) {
            try {
                return ((TopicDescription) ((KafkaFuture) this.admin.describeTopics(Collections.singleton(str)).topicNameValues().get(str)).get(15L, TimeUnit.SECONDS)).partitions().size();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        String topicConfig(String str, String str2) {
            try {
                ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, str);
                return (String) ((Config) ((KafkaFuture) this.admin.describeConfigs(Collections.singleton(configResource)).values().get(configResource)).get(15L, TimeUnit.SECONDS)).entries().stream().filter(configEntry -> {
                    return configEntry.name().equals(str2);
                }).findFirst().map((v0) -> {
                    return v0.value();
                }).get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        String linkConfig(String str, String str2) {
            return (String) ((ClusterLinkFactory.ConnectionManager) linkManager().connectionManager(linkId(str)).get()).currentConfig().originalsStrings().get(str2);
        }

        ClusterLinkFactory.LinkManager linkManager() {
            return this.physicalCluster.kafkaCluster().kafkaBrokers().get(0).clusterLinkManager();
        }

        Uuid linkId(String str) {
            return ((ClusterLinkData) linkManager().listClusterLinks().find(clusterLinkData -> {
                return Boolean.valueOf(clusterLinkData.linkName().equals(this.user.tenantPrefix() + str));
            }).get()).linkId();
        }

        boolean linkIdExists(String str) {
            return linkManager().listClusterLinks().find(clusterLinkData -> {
                return Boolean.valueOf(clusterLinkData.linkName().equals(this.user.tenantPrefix() + str));
            }).isDefined();
        }

        ClusterLinkDescription.LinkState linkState(String str) {
            try {
                for (ClusterLinkListing clusterLinkListing : (Collection) this.admin.listClusterLinks(new ListClusterLinksOptions().includeTopics(false)).result().get()) {
                    if (clusterLinkListing.linkName().equals(str)) {
                        return clusterLinkListing.linkState();
                    }
                }
                return ClusterLinkDescription.LinkState.UNKNOWN;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        Map<TopicPartition, OffsetAndMetadata> committedOffsets(String str) {
            try {
                return (Map) this.admin.listConsumerGroupOffsets(str).partitionsToOffsetAndMetadata().get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        MirrorTopicDescription mirrorDescription(String str, boolean z) {
            try {
                return (MirrorTopicDescription) ((KafkaFuture) this.admin.describeMirrors(Collections.singleton(str), new DescribeMirrorsOptions().includeStateTransitionErrors(z)).result().get(str)).get(15L, TimeUnit.SECONDS);
            } catch (Exception e) {
                return null;
            }
        }
    }

    public static Stream allCombinations() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{"zk", "true"}), Arguments.of(new Object[]{"zk", "false"}), Arguments.of(new Object[]{"kraft", "true"})});
    }

    @BeforeEach
    public void setUp(TestInfo testInfo) throws Exception {
        this.sourceCluster = new MultiTenantCluster(testInfo);
        this.destCluster = new MultiTenantCluster(testInfo);
        this.testInfo = testInfo;
    }

    @AfterEach
    public void tearDown() throws Exception {
        try {
            this.sourceCluster.shutdown();
            this.destCluster.shutdown();
            SecurityTestUtils.clearSecurityConfigs();
            KafkaTestUtils.verifyThreadCleanup();
        } catch (Throwable th) {
            SecurityTestUtils.clearSecurityConfigs();
            KafkaTestUtils.verifyThreadCleanup();
            throw th;
        }
    }

    private void setUpClusters(boolean z, boolean z2, boolean z3) throws Exception {
        setUpClusters(z, z2, z3, 1000);
    }

    private void setUpClusters(boolean z, boolean z2, boolean z3, int i) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("confluent.max.acls.per.tenant", String.valueOf(i));
        setUpClusters(z, z2, z3, Collections.emptyMap(), hashMap);
    }

    private void setUpClusters(boolean z, boolean z2, boolean z3, Map<String, String> map, Map<String, String> map2) throws Exception {
        setupSourceCluster(z, z2, z3, map, Optional.empty());
        setupDestCluster(z, map2);
    }

    private void setupSourceCluster(boolean z, boolean z2, boolean z3, Map<String, String> map, Optional<String> optional) throws Exception {
        setupSourceCluster(z, z2, z3, map, map, optional);
    }

    private void setupSourceCluster(boolean z, boolean z2, boolean z3, Map<String, String> map, Map<String, String> map2, Optional<String> optional) throws Exception {
        setupSourceCluster(z, z2, z3, map, map2, optional, false, MultiTenantRequestContextTest.LOCALHOST, "required");
    }

    private void setupSourceCluster(boolean z, boolean z2, boolean z3, Map<String, String> map, Map<String, String> map2, Optional<String> optional, boolean z4, String str, String str2) throws Exception {
        this.sourceCluster.useSourceInitiatedLink = z;
        Properties brokerProps = brokerProps(z3);
        brokerProps.putAll(map);
        Properties brokerProps2 = brokerProps(z3);
        brokerProps2.putAll(map2);
        this.sourceCluster.startCluster(brokerProps, brokerProps2, "lkc-source", 1, z2, optional, z4, str, str2);
        addAcls(this.sourceCluster.admin, this.sourceCluster.user, new String[0]);
    }

    private void setupDestCluster(boolean z, Map<String, String> map) throws Exception {
        this.destCluster.useSourceInitiatedLink = z;
        Properties brokerProps = brokerProps();
        brokerProps.putAll(map);
        this.destCluster.startCluster(brokerProps, "lkc-dest", 11, false);
        addAcls(this.destCluster.admin, this.destCluster.user, new String[0]);
    }

    private boolean testRunsWithLinkCoordinator() {
        return this.testInfo.getDisplayName().contains("coordinator=true");
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMultiTenantClusterLinkAddSourcePartitionsAndChangeSourceConfig(String str, boolean z) throws Throwable {
        testBasicEndToEndClusterLinking("", false, true, true, false);
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMultiTenantClusterLinkSyncAcls(String str, boolean z) throws Throwable {
        testBasicEndToEndClusterLinking("", true, false, false, false);
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMultiTenantClusterLinkMetricReduction(String str, boolean z) throws Throwable {
        testBasicEndToEndClusterLinking("", false, false, false, true);
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMultiTenantClusterLinkMirrorStateTransitionErrors(String str, boolean z) throws Throwable {
        setUpClusters(false, false, false);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, Collections.emptyMap());
        HashMap hashMap = new HashMap();
        hashMap.put(ClusterLinkConfig.AvailabilityCheckMsProp(), "100");
        hashMap.put(ClusterLinkConfig.AvailabilityCheckConsecutiveFailureThresholdProp(), "1");
        hashMap.put("request.timeout.ms", "1000");
        hashMap.put("default.api.timeout.ms", "1000");
        this.destCluster.setInternalClusterLinkConfigs("tenantLink", hashMap);
        waitFor(() -> {
            return Boolean.valueOf(this.destCluster.linkIdExists("tenantLink"));
        }, true, "Link was not created");
        createSourceTopic();
        createMirrorTopicWaitForSuccess(this.destCluster.admin);
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            return this.destCluster.mirrorDescription("linkedTopic", true).state() == MirrorTopicDescription.State.ACTIVE;
        }, "Mirror topic not active");
        this.sourceCluster.shutdownBrokers();
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            return this.destCluster.mirrorDescription("linkedTopic", true).state() == MirrorTopicDescription.State.SOURCE_UNAVAILABLE;
        }, "Mirror topic not source unavailable");
        this.destCluster.admin.alterMirrors(Collections.singletonMap("linkedTopic", AlterMirrorOp.PROMOTE), new AlterMirrorsOptions()).all().get(15L, TimeUnit.SECONDS);
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            List mirrorStateTransitionErrors = this.destCluster.mirrorDescription("linkedTopic", true).mirrorStateTransitionErrors();
            return mirrorStateTransitionErrors.size() == 1 && ((ClusterLinkTaskError) mirrorStateTransitionErrors.get(0)).equals(new ClusterLinkTaskError(ClusterLinkTaskError.ClusterLinkTaskErrorCode.INTERNAL_ERROR, "Failed to describe topic configs for an unknown reason."));
        }, "Mirror topic is an error state");
        stopMirroring(this.destCluster, "linkedTopic");
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            MirrorTopicDescription mirrorDescription = this.destCluster.mirrorDescription("linkedTopic", true);
            return mirrorDescription.state() == MirrorTopicDescription.State.STOPPED && mirrorDescription.mirrorStateTransitionErrors().isEmpty();
        }, "Mirror topic error state not cleared");
        this.destCluster.deleteClusterLink(this.destCluster.admin, "tenantLink");
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMaxMessageSize(String str, boolean z) throws Throwable {
        setupSourceCluster(false, false, false, Collections.emptyMap(), Optional.empty());
        HashMap hashMap = new HashMap();
        hashMap.put("confluent.plugins.topic.policy.max.message.bytes.max", String.valueOf(1000));
        hashMap.put(KafkaConfig.CreateClusterLinkPolicyClassNameProp(), CreateClusterLinkPolicy.class.getName());
        setupDestCluster(false, hashMap);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, Collections.emptyMap(), false);
        waitFor(() -> {
            return Boolean.valueOf(this.destCluster.linkIdExists("tenantLink"));
        }, true, "Link was not created");
        this.numPartitions = 1;
        Properties properties = new Properties();
        properties.setProperty("max.message.bytes", String.valueOf(100000));
        this.sourceCluster.physicalCluster.kafkaCluster().createTopic(this.sourceCluster.user.tenantPrefix() + "linkedTopic", this.numPartitions, 1, properties);
        Optional<String> empty = Optional.empty();
        KafkaTestUtils.sendRecords(this.sourceCluster.getOrCreateProducer(empty), "linkedTopic", this.nextMessageIndex, 10);
        this.nextMessageIndex += 10;
        Assertions.assertEquals("100000", this.sourceCluster.topicConfig("linkedTopic", "max.message.bytes"));
        Assertions.assertEquals(PolicyViolationException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            createMirrorTopic(this.destCluster.admin, "");
        })).getCause().getClass());
        this.sourceCluster.producer.send(new ProducerRecord("linkedTopic", String.valueOf(this.nextMessageIndex), org.apache.kafka.test.TestUtils.randomString(1000 + 100))).get(15L, TimeUnit.SECONDS);
        this.nextMessageIndex++;
        KafkaTestUtils.sendRecords(this.sourceCluster.getOrCreateProducer(empty), "linkedTopic", this.nextMessageIndex, 10);
        this.nextMessageIndex += 10;
        this.sourceCluster.admin.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, "linkedTopic"), Collections.singleton(new AlterConfigOp(new ConfigEntry("max.message.bytes", "1000"), AlterConfigOp.OpType.SET)))).all().get(15L, TimeUnit.SECONDS);
        KafkaTestUtils.sendRecords(this.sourceCluster.getOrCreateProducer(empty), "linkedTopic", this.nextMessageIndex, 10);
        this.nextMessageIndex += 10;
        createMirrorTopicWaitForSuccess(this.destCluster.admin, "");
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            return this.destCluster.mirrorDescription("linkedTopic", false).mirrorTopicError() == MirrorTopicError.RECORD_TOO_LARGE;
        }, "Mirror topic not failed");
        Assertions.assertEquals(ClusterAuthorizationException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
        })).getCause().getClass());
        ConfluentAdmin internalAdminClient = this.destCluster.internalAdminClient();
        internalAdminClient.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "lkc-dest_tenantLink"), Collections.singleton(new AlterConfigOp(new ConfigEntry(ClusterLinkConfig.MaxMessageSizeProp(), "10000"), AlterConfigOp.OpType.SET)))).all().get(15L, TimeUnit.SECONDS);
        internalAdminClient.alterMirrors(Collections.singletonMap("lkc-dest_linkedTopic", AlterMirrorOp.REPAIR), new AlterMirrorsOptions()).all().get(15L, TimeUnit.SECONDS);
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            return this.destCluster.mirrorDescription("linkedTopic", false).state() == MirrorTopicDescription.State.ACTIVE;
        }, "Mirror topic not repaired");
        KafkaTestUtils.sendRecords(this.sourceCluster.getOrCreateProducer(empty), "linkedTopic", this.nextMessageIndex, 10);
        this.nextMessageIndex += 10;
        waitForMirror();
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testBidirectionalClusterLink(String str, boolean z) throws Throwable {
        setUpClusters(false, false, false);
        addAcls(this.destCluster.admin, this.destCluster.user, "west_");
        addAcls(this.sourceCluster.admin, this.sourceCluster.user, "east_");
        HashMap hashMap = new HashMap();
        hashMap.put(ClusterLinkConfig.ClusterLinkPrefixProp(), "west_");
        hashMap.put(ClusterLinkConfig.LinkModeProp(), ClusterLinkDescription.LinkMode.BIDIRECTIONAL.name());
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, hashMap, false);
        waitFor(() -> {
            return Boolean.valueOf(this.destCluster.linkIdExists("tenantLink"));
        }, true, "First link was not created");
        Uuid linkId = this.destCluster.linkId("tenantLink");
        hashMap.put(ClusterLinkConfig.ClusterLinkPrefixProp(), "east_");
        verifyMismatchedLinkIdFailure(hashMap);
        this.sourceCluster.createDestClusterLinkResult(this.sourceCluster.admin, "tenantLink", this.destCluster, 1003, "EXTERNAL", false, hashMap, true, "", linkId);
        waitFor(() -> {
            return Boolean.valueOf(this.sourceCluster.linkIdExists("tenantLink"));
        }, true, "Second link was not created");
        Assertions.assertEquals(linkId, this.sourceCluster.linkId("tenantLink"));
        createTopic(this.sourceCluster);
        createTopic(this.destCluster);
        createMirrorTopicWaitForSuccess(this.destCluster.admin, "west_");
        createMirrorTopicWaitForSuccess(this.sourceCluster.admin, "east_");
        verifyTopicListing("west_", this.destCluster);
        verifyTopicListing("east_", this.sourceCluster);
        verifyTopicMirroring("west_", this.sourceCluster, this.destCluster);
        verifyTopicMirroring("east_", this.destCluster, this.sourceCluster);
        stopMirroring(this.destCluster, "west_linkedTopic");
        Assertions.assertEquals(0, this.destCluster.mirrorDescription("west_linkedTopic", true).mirrorStateTransitionErrors().size());
        stopMirroring(this.sourceCluster, "east_linkedTopic");
        Assertions.assertEquals(0, this.sourceCluster.mirrorDescription("east_linkedTopic", true).mirrorStateTransitionErrors().size());
        this.destCluster.deleteClusterLink(this.destCluster.admin, "tenantLink");
        this.sourceCluster.deleteClusterLink(this.sourceCluster.admin, "tenantLink");
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testReverseAndSwap(String str, boolean z) throws Throwable {
        setUpClusters(false, false, false);
        HashMap hashMap = new HashMap();
        hashMap.put(ClusterLinkConfig.LinkModeProp(), ClusterLinkDescription.LinkMode.BIDIRECTIONAL.name());
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, hashMap, false);
        waitFor(() -> {
            return Boolean.valueOf(this.destCluster.linkIdExists("tenantLink"));
        }, true, "First link was not created");
        Uuid linkId = this.destCluster.linkId("tenantLink");
        verifyMismatchedLinkIdFailure(hashMap);
        this.sourceCluster.createDestClusterLinkResult(this.sourceCluster.admin, "tenantLink", this.destCluster, 1003, "EXTERNAL", false, hashMap, true, "", linkId);
        waitFor(() -> {
            return Boolean.valueOf(this.sourceCluster.linkIdExists("tenantLink"));
        }, true, "Second link was not created");
        Assertions.assertEquals(linkId, this.sourceCluster.linkId("tenantLink"));
        addAcls(this.sourceCluster.admin, this.sourceCluster.linkUser, new String[0]);
        addAcls(this.destCluster.admin, this.destCluster.linkUser, new String[0]);
        String kafkaPrincipal = this.sourceCluster.linkUser.unprefixedKafkaPrincipal().toString();
        String kafkaPrincipal2 = this.destCluster.linkUser.unprefixedKafkaPrincipal().toString();
        AclBinding aclBinding = new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL), new AccessControlEntry(kafkaPrincipal, "*", AclOperation.ALTER, AclPermissionType.ALLOW));
        AclBinding aclBinding2 = new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL), new AccessControlEntry(kafkaPrincipal2, "*", AclOperation.ALTER, AclPermissionType.ALLOW));
        this.sourceCluster.admin.createAcls(Collections.singleton(aclBinding)).all().get(15L, TimeUnit.SECONDS);
        this.destCluster.admin.createAcls(Collections.singleton(aclBinding2)).all().get(15L, TimeUnit.SECONDS);
        createTopic(this.sourceCluster);
        createMirrorTopicWaitForSuccess(this.destCluster.admin, "");
        verifyTopicMirroring("", this.sourceCluster, this.destCluster);
        reverseAndSwap(this.destCluster, "linkedTopic", true);
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            return this.destCluster.mirrorDescription("linkedTopic", true).state() == MirrorTopicDescription.State.STOPPED;
        }, "Local mirror not stopped");
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            return this.sourceCluster.mirrorDescription("linkedTopic", true).state() == MirrorTopicDescription.State.ACTIVE;
        }, "Remote mirror not started");
        reverseAndSwap(this.sourceCluster, "linkedTopic", false);
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            return this.destCluster.mirrorDescription("linkedTopic", true).state() == MirrorTopicDescription.State.PAUSED;
        }, "Local mirror not paused");
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            return this.sourceCluster.mirrorDescription("linkedTopic", true).state() == MirrorTopicDescription.State.STOPPED;
        }, "Remote mirror not stopped");
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testReverseAndSwapUnsupportedWithPrefixedLinks(String str, boolean z) throws Throwable {
        setUpClusters(false, false, false);
        HashMap hashMap = new HashMap();
        hashMap.put(ClusterLinkConfig.LinkModeProp(), ClusterLinkDescription.LinkMode.BIDIRECTIONAL.name());
        hashMap.put(ClusterLinkConfig.ClusterLinkPrefixProp(), "src_");
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, hashMap, false);
        waitFor(() -> {
            return Boolean.valueOf(this.destCluster.linkIdExists("tenantLink"));
        }, true, "First link was not created");
        Uuid linkId = this.destCluster.linkId("tenantLink");
        verifyMismatchedLinkIdFailure(hashMap);
        this.sourceCluster.createDestClusterLinkResult(this.sourceCluster.admin, "tenantLink", this.destCluster, 1003, "EXTERNAL", false, hashMap, true, "", linkId);
        waitFor(() -> {
            return Boolean.valueOf(this.sourceCluster.linkIdExists("tenantLink"));
        }, true, "Second link was not created");
        Assertions.assertEquals(linkId, this.sourceCluster.linkId("tenantLink"));
        addAcls(this.sourceCluster.admin, this.sourceCluster.linkUser, new String[0]);
        addAcls(this.destCluster.admin, this.destCluster.linkUser, new String[0]);
        createTopic(this.sourceCluster);
        createMirrorTopicWaitForSuccess(this.destCluster.admin, "src_");
        verifyTopicMirroring("src_", this.sourceCluster, this.destCluster);
        try {
            reverseAndSwap(this.destCluster, "src_linkedTopic", true);
        } catch (Throwable th) {
            Assertions.assertEquals(InvalidRequestException.class, th.getCause().getClass());
        }
        try {
            reverseAndSwap(this.destCluster, "src_linkedTopic", false);
        } catch (Throwable th2) {
            Assertions.assertEquals(InvalidRequestException.class, th2.getCause().getClass());
        }
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMultiTenantClusterLinkSyncAclsSemantics(String str, boolean z) throws Throwable {
        this.sourceCluster.useSourceInitiatedLink = false;
        this.destCluster.useSourceInitiatedLink = false;
        this.sourceCluster.startCluster(brokerProps(), "lkc-source", 1, false);
        Properties brokerProps = brokerProps();
        brokerProps.put("confluent.max.acls.per.tenant", String.valueOf(1000));
        this.destCluster.startCluster(brokerProps, "lkc-dest", 11, false);
        KafkaPrincipal kafkaPrincipal = new KafkaPrincipal("User", "Alice");
        KafkaPrincipal kafkaPrincipal2 = new KafkaPrincipal("User", "Bob");
        KafkaPrincipal kafkaPrincipal3 = new KafkaPrincipal("User", "sourceOperator");
        KafkaPrincipal kafkaPrincipal4 = new KafkaPrincipal("User", "destOperator");
        ResourcePattern resourcePattern = new ResourcePattern(ResourceType.TOPIC, "foo", PatternType.LITERAL);
        ResourcePattern resourcePattern2 = new ResourcePattern(ResourceType.TOPIC, "bar", PatternType.PREFIXED);
        ResourcePattern resourcePattern3 = new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL);
        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, "EXTERNAL", Collections.emptyMap(), true, aclFilters(kafkaPrincipal, kafkaPrincipal2)).all().get();
        waitFor(() -> {
            return Boolean.valueOf(this.destCluster.linkIdExists("tenantLink"));
        }, true, "Link was not created");
        Uuid linkId = this.destCluster.linkId("tenantLink");
        ConfluentAdmin confluentAdmin = this.sourceCluster.admin;
        AclBinding aclBinding = new AclBinding(resourcePattern, new AccessControlEntry(kafkaPrincipal.toString(), "", AclOperation.WRITE, AclPermissionType.ALLOW));
        AclBinding aclBinding2 = new AclBinding(resourcePattern2, new AccessControlEntry(kafkaPrincipal.toString(), "", AclOperation.READ, AclPermissionType.ALLOW));
        AclBinding aclBinding3 = new AclBinding(resourcePattern, new AccessControlEntry(kafkaPrincipal2.toString(), "", AclOperation.READ, AclPermissionType.ALLOW));
        AclBinding aclBinding4 = new AclBinding(resourcePattern2, new AccessControlEntry(kafkaPrincipal2.toString(), "", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
        AclBinding aclBinding5 = new AclBinding(resourcePattern3, new AccessControlEntry(kafkaPrincipal3.toString(), "", AclOperation.ALL, AclPermissionType.ALLOW));
        AclBinding aclBinding6 = new AclBinding(resourcePattern3, new AccessControlEntry(kafkaPrincipal4.toString(), "", AclOperation.ALL, AclPermissionType.ALLOW));
        confluentAdmin.createAcls(aclBindings(Collections.emptySet(), aclBinding, aclBinding2, aclBinding3, aclBinding4, aclBinding5)).all().get(15L, TimeUnit.SECONDS);
        ConfluentAdmin confluentAdmin2 = this.destCluster.admin;
        confluentAdmin2.createAcls(aclBindings(Collections.emptySet(), aclBinding, aclBinding6)).all().get(15L, TimeUnit.SECONDS);
        HashSet hashSet = new HashSet();
        hashSet.addAll(aclBindings(Collections.emptySet(), aclBinding6));
        hashSet.addAll(aclBindings(Collections.singleton(linkId), aclBinding2, aclBinding3, aclBinding4));
        hashSet.addAll(aclBindings(Arrays.asList(Uuid.ZERO_UUID, linkId), aclBinding));
        waitForDestAcls(hashSet);
        deleteAcls(confluentAdmin, aclBinding);
        hashSet.removeAll(aclBindings(Arrays.asList(Uuid.ZERO_UUID, linkId), aclBinding));
        hashSet.addAll(aclBindings(Collections.emptySet(), aclBinding));
        waitForDestAcls(hashSet);
        confluentAdmin2.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink"), Collections.singleton(new AlterConfigOp(new ConfigEntry(ClusterLinkConfig.AclFiltersProp(), aclFilters(kafkaPrincipal2)), AlterConfigOp.OpType.SET)))).all().get(15L, TimeUnit.SECONDS);
        hashSet.removeAll(aclBindings(Collections.singleton(linkId), aclBinding2));
        hashSet.addAll(aclBindings(Collections.emptySet(), aclBinding2));
        waitForDestAcls(hashSet);
        String str2 = "tenantLink2";
        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink2", this.sourceCluster, 1002, "EXTERNAL", Collections.emptyMap(), true, aclFilters(kafkaPrincipal, kafkaPrincipal2)).all().get();
        waitFor(() -> {
            return Boolean.valueOf(this.destCluster.linkIdExists(str2));
        }, true, "Link was not created");
        Uuid linkId2 = this.destCluster.linkId("tenantLink2");
        hashSet.clear();
        hashSet.addAll(aclBindings(Arrays.asList(linkId, linkId2), aclBinding3, aclBinding4));
        hashSet.addAll(aclBindings(Arrays.asList(Uuid.ZERO_UUID, linkId2), aclBinding2));
        hashSet.addAll(aclBindings(Collections.emptySet(), aclBinding, aclBinding6));
        waitForDestAcls(hashSet);
        confluentAdmin2.deleteClusterLinks(Collections.singleton("tenantLink2"), new DeleteClusterLinksOptions()).all().get(15L, TimeUnit.SECONDS);
        hashSet.clear();
        hashSet.addAll(aclBindings(Arrays.asList(Uuid.ZERO_UUID, linkId), aclBinding3, aclBinding4));
        hashSet.addAll(aclBindings(Collections.emptySet(), aclBinding, aclBinding2, aclBinding6));
        waitForDestAcls(hashSet);
        confluentAdmin2.deleteClusterLinks(Collections.singleton("tenantLink"), new DeleteClusterLinksOptions()).all().get(15L, TimeUnit.SECONDS);
        hashSet.clear();
        hashSet.addAll(aclBindings(Collections.emptySet(), aclBinding, aclBinding2, aclBinding3, aclBinding4, aclBinding6));
        waitForDestAcls(hashSet);
    }

    private String allAclsFilter(Uuid uuid) {
        return "{ \"aclFilters\": [{ \"resourceFilter\": { \"resourceType\": \"any\", \"patternType\": \"any\" },\"accessFilter\": { \"operation\": \"any\", \"permissionType\": \"any\", \"clusterLinkIds\" : [\"" + Uuid.ZERO_UUID + "\", \"" + uuid + "\"]  }}]}";
    }

    private final String sourceLinkAclsFilter(Uuid uuid) {
        return "{ \"aclFilters\": [{ \"resourceFilter\": { \"resourceType\": \"any\", \"patternType\": \"any\" },\"accessFilter\": { \"operation\": \"any\", \"permissionType\": \"any\", \"clusterLinkIds\" : [\"" + uuid + "\"]  }}]}";
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMultiTenantClusterLinkAllAclFilters(String str, boolean z) throws Throwable {
        this.sourceCluster.useSourceInitiatedLink = false;
        this.destCluster.useSourceInitiatedLink = false;
        this.sourceCluster.startCluster(brokerProps(), "lkc-source", 1, false);
        Properties brokerProps = brokerProps();
        brokerProps.put("confluent.max.acls.per.tenant", String.valueOf(1000));
        this.destCluster.startCluster(brokerProps, "lkc-dest", 11, false);
        KafkaTestUtils.addProducerAcls(this.sourceCluster.admin, this.principal, APP1_TOPIC, PatternType.LITERAL);
        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, "EXTERNAL", Collections.emptyMap(), true, allAclsFilter(addAclsOnSourceWithLinkId(uuid -> {
            return consumerAcls(this.principal, uuid);
        }))).all().get();
        ArrayList arrayList = new ArrayList(this.sourceCluster.linkAcls(this.sourceCluster.linkUser));
        ArrayList arrayList2 = new ArrayList(this.allExpectedAcls);
        arrayList2.addAll(arrayList);
        verifyAclMigration(arrayList2, this.subsetExpectedAcls, "tenantLink");
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMultiTenantClusterLinkLocalAclFilters(String str, boolean z) throws Throwable {
        this.sourceCluster.useSourceInitiatedLink = false;
        this.destCluster.useSourceInitiatedLink = false;
        this.sourceCluster.startCluster(brokerProps(), "lkc-source", 1, false);
        Properties brokerProps = brokerProps();
        brokerProps.put("confluent.max.acls.per.tenant", String.valueOf(1000));
        this.destCluster.startCluster(brokerProps, "lkc-dest", 11, false);
        KafkaTestUtils.addProducerAcls(this.sourceCluster.admin, this.principal, APP1_TOPIC, PatternType.LITERAL);
        addAclsOnSourceWithLinkId(uuid -> {
            return consumerAcls(this.principal, uuid);
        });
        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, "EXTERNAL", Collections.emptyMap(), true, SYNC_ALL_ACL_FILTER).all().get();
        ArrayList arrayList = new ArrayList(this.sourceCluster.linkAcls(this.sourceCluster.linkUser));
        ArrayList arrayList2 = new ArrayList(this.localExpectedAcls);
        arrayList2.addAll(arrayList);
        verifyAclMigration(arrayList2, this.subsetExpectedAcls, "tenantLink");
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMultiTenantClusterLinkSubsetAclFilters(String str, boolean z) throws Throwable {
        this.sourceCluster.useSourceInitiatedLink = false;
        this.destCluster.useSourceInitiatedLink = false;
        this.sourceCluster.startCluster(brokerProps(), "lkc-source", 1, false);
        Properties brokerProps = brokerProps();
        brokerProps.put("confluent.max.acls.per.tenant", String.valueOf(1000));
        this.destCluster.startCluster(brokerProps, "lkc-dest", 11, false);
        KafkaTestUtils.addProducerAcls(this.sourceCluster.admin, this.principal, APP1_TOPIC, PatternType.LITERAL);
        addAclsOnSourceWithLinkId(uuid -> {
            return consumerAcls(this.principal, uuid);
        });
        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, "EXTERNAL", Collections.emptyMap(), true, this.subsetAclsFilter).all().get();
        verifyAclMigration(this.subsetExpectedAcls, this.subsetExpectedAcls, "tenantLink");
    }

    private void verifyAclMigration(Collection<AclBinding> collection, Collection<AclBinding> collection2, String str) throws Throwable {
        waitFor(() -> {
            return Boolean.valueOf(this.destCluster.linkIdExists(str));
        }, true, "Link was not created");
        Uuid linkId = this.destCluster.linkId(str);
        waitForDestAcls(aclBindings(Collections.singleton(linkId), (AclBinding[]) collection.toArray(new AclBinding[0])));
        deleteProducerAcls(this.sourceCluster.admin, collection2);
        ArrayList arrayList = new ArrayList(collection);
        arrayList.removeAll(collection2);
        waitForDestAcls(aclBindings(Collections.singleton(linkId), (AclBinding[]) arrayList.toArray(new AclBinding[0])));
        verifyAclMetrics(str);
    }

    private void verifyAclMetrics(String str) throws Exception {
        Map map = (Map) (this.destCluster.isKraft() ? ((KafkaBroker) ((List) this.destCluster.physicalCluster.kafkaCluster().kafkaBrokers().stream().filter(kafkaBroker -> {
            return kafkaBroker.clusterLinkManager().clusterLinkMetadataManager().isDefined() && ((ClusterLinkMetadataManager) kafkaBroker.clusterLinkManager().clusterLinkMetadataManager().get()).isLinkCoordinator(str);
        }).collect(Collectors.toList())).get(0)).metrics() : this.destCluster.physicalCluster.kafkaCluster().controllerBrokerServer().metrics()).metrics().entrySet().stream().filter(entry -> {
            return isAclMetric((MetricName) entry.getKey(), str);
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
        Assertions.assertTrue(map.size() > 0);
        for (Map.Entry entry2 : map.entrySet()) {
            org.apache.kafka.test.TestUtils.waitForCondition(() -> {
                return ((Double) ((KafkaMetric) entry2.getValue()).metricValue()).doubleValue() > 0.0d;
            }, "Metric not updated: " + ((MetricName) entry2.getKey()).name());
        }
    }

    private boolean isAclMetric(MetricName metricName, String str) {
        return (metricName.name().equals("acls-deleted-rate") || metricName.name().equals("acls-deleted-total") || metricName.name().equals("acls-added-rate") || metricName.name().equals("acls-added-total")) && metricName.group().equals("cluster-link-metrics") && Objects.equals(metricName.tags().get("link-name"), str);
    }

    private void deleteProducerAcls(ConfluentAdmin confluentAdmin, Collection<AclBinding> collection) {
        confluentAdmin.deleteAcls((Collection) collection.stream().map((v0) -> {
            return v0.toFilter();
        }).collect(Collectors.toCollection(HashSet::new)));
    }

    private Uuid addAclsOnSourceWithLinkId(Function<Uuid, List<AclBinding>> function) throws Throwable {
        String str = "linkInSource";
        this.sourceCluster.createDestClusterLinkResult(this.sourceCluster.admin, "linkInSource", this.destCluster, 1001, "EXTERNAL", Collections.emptyMap(), false, false);
        waitFor(() -> {
            return Boolean.valueOf(this.sourceCluster.linkIdExists(str));
        }, true, "Link was not created");
        Uuid linkId = this.sourceCluster.linkId("linkInSource");
        this.destCluster.deleteLinkAcls(this.destCluster.linkUser);
        this.sourceCluster.admin.createAcls(function.apply(linkId)).all().get(15L, TimeUnit.SECONDS);
        return linkId;
    }

    private List<AclBinding> consumerAcls(KafkaPrincipal kafkaPrincipal, Uuid uuid) {
        return Arrays.asList(new AclBinding(new ResourcePattern(ResourceType.TOPIC, APP1_TOPIC, PatternType.LITERAL), new AccessControlEntry(kafkaPrincipal.toString(), "*", AclOperation.READ, AclPermissionType.ALLOW, Collections.singleton(uuid))), new AclBinding(new ResourcePattern(ResourceType.GROUP, APP1_CONSUMER_GROUP, PatternType.LITERAL), new AccessControlEntry(kafkaPrincipal.toString(), "*", AclOperation.ALL, AclPermissionType.ALLOW)), new AclBinding(new ResourcePattern(ResourceType.GROUP, APP1_CONSUMER_GROUP, PatternType.LITERAL), new AccessControlEntry(kafkaPrincipal.toString(), "*", AclOperation.ALL, AclPermissionType.ALLOW, Collections.singleton(uuid))));
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMultiTenantClusterLinkWithClusterLinkPrefix(String str, boolean z) throws Throwable {
        verifyLinkDescription(testBasicEndToEndClusterLinking("src_", false, true, true, false));
        this.destCluster.deleteClusterLink(this.destCluster.admin, "tenantLink");
        verifyTenantConnectionMetrics(false, false);
        Map<String, String> singletonMap = Collections.singletonMap(ClusterLinkConfig.ClusterLinkPrefixProp(), "src_");
        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1002, "EXTERNAL", singletonMap, true, false).all().get();
        org.apache.kafka.test.TestUtils.assertFutureThrows(this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink2", this.sourceCluster, 1003, "EXTERNAL", singletonMap, true, false).all(), InvalidConfigurationException.class);
        this.destCluster.createDestClusterLinkResult(this.destCluster.createConfluentAdmin(this.destCluster.physicalCluster.createLogicalCluster("destLogicalCluster2", 101, new Integer[0])), "tenantLink", this.sourceCluster, 1004, "EXTERNAL", singletonMap, true, false).all().get();
    }

    private Uuid testBasicEndToEndClusterLinking(String str, Boolean bool, Boolean bool2, Boolean bool3, boolean z) throws Throwable {
        HashMap hashMap = new HashMap();
        hashMap.put("confluent.max.acls.per.tenant", String.valueOf(1000));
        hashMap.put("confluent.cluster.link.enable.metrics.reduction", String.valueOf(z));
        HashMap hashMap2 = new HashMap();
        hashMap2.put("confluent.cluster.link.enable.metrics.reduction", String.valueOf(z));
        setUpClusters(false, false, false, hashMap2, hashMap);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, str.isEmpty() ? Collections.emptyMap() : Collections.singletonMap(ClusterLinkConfig.ClusterLinkPrefixProp(), str), bool);
        waitFor(() -> {
            return Boolean.valueOf(this.destCluster.linkIdExists("tenantLink"));
        }, true, "Link was not created");
        Uuid linkId = this.destCluster.linkId("tenantLink");
        createSourceTopic();
        createMirrorTopicWaitForSuccess(this.destCluster.admin, str);
        verifyTopicListing(str, this.destCluster);
        verifyTopicMirroring(str, this.sourceCluster, this.destCluster);
        verifyReplicaStatus(str, this.sourceCluster, this.destCluster);
        if (bool.booleanValue()) {
            verifyAclAndOffsetMigration(str);
        }
        if (bool2.booleanValue()) {
            addSourcePartitionsAndVerifyMirror(4, str);
        }
        if (bool3.booleanValue()) {
            changeSourceTopicConfigAndVerifyMirror(str);
        }
        verifyTopicMirroring(str, this.sourceCluster, this.destCluster);
        verifyTenantMetrics(linkId, 1001, str, false, z);
        verifyTenantConnectionMetrics(false, true);
        verifyMetricsGroups(linkId, z);
        verifyDestinationDeadThreadCount("tenantLink", z);
        Assertions.assertEquals("linkedTopic", this.destCluster.mirrorDescription(str + "linkedTopic", true).sourceTopic());
        stopMirroring(this.destCluster, str + "linkedTopic");
        return linkId;
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    void testDestinationClusterLinkBrokerLevelQuotaClOnly(String str, boolean z) throws Throwable {
        testDestinationClusterLinkQuotas(ConfluentConfigs.ClusterLinkQuotaMode.CLUSTER_LINK_ONLY, false);
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    void testDestinationClusterLinkTenantQuotasClOnly(String str, boolean z) throws Throwable {
        testDestinationClusterLinkQuotas(ConfluentConfigs.ClusterLinkQuotaMode.CLUSTER_LINK_ONLY, true);
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    void testDestinationClusterLinkBrokerLevelQuota(String str, boolean z) throws Throwable {
        testDestinationClusterLinkQuotas(ConfluentConfigs.ClusterLinkQuotaMode.TOTAL_INBOUND, false);
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    void testDestinationClusterLinkTenantQuotas(String str, boolean z) throws Throwable {
        testDestinationClusterLinkQuotas(ConfluentConfigs.ClusterLinkQuotaMode.TOTAL_INBOUND, true);
    }

    private void testDestinationClusterLinkQuotas(ConfluentConfigs.ClusterLinkQuotaMode clusterLinkQuotaMode, boolean z) throws Throwable {
        HashMap hashMap = new HashMap();
        hashMap.put("confluent.cluster.link.tenant.quotas.enable", String.valueOf(z));
        hashMap.put("confluent.cluster.link.replication.quota.mode", clusterLinkQuotaMode.name());
        setUpClusters(false, false, true, new HashMap(), hashMap);
        this.numPartitions = 1;
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, Collections.emptyMap(), false);
        waitFor(() -> {
            return Boolean.valueOf(this.destCluster.linkIdExists("tenantLink"));
        }, true, "Link was not created");
        this.destCluster.linkId("tenantLink");
        createSourceTopic();
        createMirrorTopicWaitForSuccess(this.destCluster.admin, "");
        verifyTopicListing();
        verifyQuota(clusterLinkQuotaMode, z);
    }

    private void createTenantQuotas(long j) {
        HashMap hashMap = new HashMap();
        String logicalClusterId = this.destCluster.logicalCluster.logicalClusterId();
        hashMap.put(logicalClusterId + "_dummy", quotaConfig(1L, Double.MAX_VALUE));
        hashMap.put(logicalClusterId, quotaConfig(j, Double.MAX_VALUE));
        TenantQuotaCallback.updateQuotas(hashMap, QuotaConfig.UNLIMITED_QUOTA);
    }

    private void setTenantRequestQuotas(int i) {
        HashMap hashMap = new HashMap();
        String logicalClusterId = this.destCluster.logicalCluster.logicalClusterId();
        hashMap.put(logicalClusterId + "_dummy", quotaConfig(Long.MAX_VALUE, Double.MAX_VALUE));
        hashMap.put(logicalClusterId, quotaConfig(Long.MAX_VALUE, i));
        TenantQuotaCallback.updateQuotas(hashMap, QuotaConfig.UNLIMITED_QUOTA);
    }

    private QuotaConfig quotaConfig(long j, double d) {
        return new QuotaConfig(Long.valueOf(j), (Long) null, (Double) null, (Double) null, (Double) null, Double.valueOf(d), QuotaConfig.UNLIMITED_QUOTA);
    }

    private void setQuota(boolean z, long j) throws Exception {
        if (z) {
            createTenantQuotas(j);
            return;
        }
        Properties properties = new Properties();
        properties.put("confluent.cluster.link.io.max.bytes.per.second", Long.toString(j));
        TestUtils.incrementalAlterConfigs((Seq) null, this.destCluster.internalAdminClient(), properties, false, AlterConfigOp.OpType.SET).all().get(15L, TimeUnit.SECONDS);
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    void testDestinationClusterLinkTenantRequestQuotas(String str, boolean z) throws Throwable {
        ConfluentConfigs.ClusterLinkQuotaMode clusterLinkQuotaMode = ConfluentConfigs.ClusterLinkQuotaMode.CLUSTER_LINK_ONLY;
        HashMap hashMap = new HashMap();
        hashMap.put("confluent.cluster.link.tenant.quotas.enable", String.valueOf(true));
        hashMap.put("confluent.cluster.link.replication.quota.mode", clusterLinkQuotaMode.name());
        hashMap.put("confluent.cluster.link.request.quota.request.percentage.multiplier", "0.9");
        hashMap.put(KafkaConfig.ClusterLinkFetcherThreadPoolModeProp(), FetcherThreadPoolMode$Link$.MODULE$.name());
        setUpClusters(false, false, true, new HashMap(), hashMap);
        inflateComputeTimeForFetchRequestQuotaRecord();
        this.numPartitions = 1;
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, Collections.emptyMap(), false);
        waitFor(() -> {
            return Boolean.valueOf(this.destCluster.linkIdExists("tenantLink"));
        }, true, "Link was not created");
        this.destCluster.linkId("tenantLink");
        createSourceTopic();
        createMirrorTopicWaitForSuccess(this.destCluster.admin, "");
        verifyTopicListing();
        verifyRequestQuota(clusterLinkQuotaMode, true);
    }

    private void inflateComputeTimeForFetchRequestQuotaRecord() throws Exception {
        final long nanos = TimeUnit.MILLISECONDS.toNanos(100L);
        for (KafkaBroker kafkaBroker : this.destCluster.physicalCluster.kafkaCluster().kafkaBrokers()) {
            final ClusterLinkRequestQuotaManager clusterLinkRequest = kafkaBroker.quotaManagers().clusterLinkRequest();
            org.apache.kafka.test.TestUtils.setFieldValue(kafkaBroker.clusterLinkManager(), "quotas", new ClusterLinkQuotas(kafkaBroker.quotaManagers().clusterLinkProduce(), new ClusterLinkTenantRequestQuota() { // from class: io.confluent.kafka.link.integration.MultiTenantClusterLinkTest.1
                public int getThrottleTimeMs(ClusterLinkTenantContext clusterLinkTenantContext, long j) {
                    return clusterLinkRequest.getThrottleTimeMs(clusterLinkTenantContext, j);
                }

                public void record(ClusterLinkTenantContext clusterLinkTenantContext, long j, LinkRequestQuotaUsageType linkRequestQuotaUsageType) {
                    if (linkRequestQuotaUsageType == LinkRequestQuotaUsageType$Fetcher$.MODULE$) {
                        clusterLinkRequest.record(clusterLinkTenantContext, j + nanos, linkRequestQuotaUsageType);
                    } else {
                        clusterLinkRequest.record(clusterLinkTenantContext, j, linkRequestQuotaUsageType);
                    }
                }

                public boolean isQuotaExceeded(ClusterLinkTenantContext clusterLinkTenantContext) {
                    return clusterLinkRequest.isQuotaExceeded(clusterLinkTenantContext);
                }
            }));
        }
    }

    private boolean destClusterLinkReplicasThrottled() {
        return yammerMetricMaxValue("kafka.server:type=ReplicaManager,name=ThrottledClusterLinkReplicasPerSec") > 0.0d;
    }

    protected List<Metric> yammerMetrics(String str) {
        return (List) KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream().filter(entry -> {
            return ((com.yammer.metrics.core.MetricName) entry.getKey()).getMBeanName().startsWith(str);
        }).map((v0) -> {
            return v0.getValue();
        }).collect(Collectors.toList());
    }

    protected double yammerMetricMaxValue(String str) {
        List<Metric> yammerMetrics = yammerMetrics(str);
        Assertions.assertTrue(!yammerMetrics.isEmpty(), "Metric does not exist: " + str);
        return yammerMetrics.stream().mapToDouble(metric -> {
            return ((Meter) metric).count();
        }).max().getAsDouble();
    }

    private void verifyQuota(ConfluentConfigs.ClusterLinkQuotaMode clusterLinkQuotaMode, boolean z) throws Throwable {
        setQuota(z, 100L);
        produceUntilDestThrottled("Destination cluster link replication quota not applied");
        verifyClusterLinkReplicaQuotaMetrics(clusterLinkQuotaMode, z);
        verifyLinkFetcherThrottleMetric(QuotaType$ClusterLinkReplication$.MODULE$);
        setQuota(z, 500000L);
        produceRecords(this.sourceCluster, 10);
        waitForMirror();
        verifyClusterLinkQuotaLocalProduceMetrics(clusterLinkQuotaMode, z);
        verifyClusterLinkRequestQuotaRateMetrics(z);
    }

    private void verifyRequestQuota(ConfluentConfigs.ClusterLinkQuotaMode clusterLinkQuotaMode, boolean z) throws Throwable {
        setTenantRequestQuotas(1);
        produceUntilDestThrottled("Destination cluster link request quota not applied");
        verifyClusterLinkReplicaQuotaMetrics(clusterLinkQuotaMode, z);
        verifyLinkFetcherThrottleMetric(QuotaType$ClusterLinkRequest$.MODULE$);
        setTenantRequestQuotas(1000);
        produceRecords(this.sourceCluster, 10);
        waitForMirror();
        verifyClusterLinkQuotaLocalProduceMetrics(clusterLinkQuotaMode, z);
        verifyClusterLinkRequestQuotaRateMetrics(z);
    }

    private void waitForMirror() throws Exception {
        KafkaTestUtils.consumeRecords(this.destCluster.getOrCreateConsumer("destGroup", Optional.empty()), "linkedTopic", 0, this.nextMessageIndex);
    }

    private void verifyClusterLinkRequestQuotaRateMetrics(boolean z) {
        if (z) {
            String str = "request-time";
            TestUtils.waitUntilTrue(() -> {
                return Boolean.valueOf(metricValue(this.destCluster, QuotaType$ClusterLinkRequest$.MODULE$.toString(), str, Collections.emptyMap(), false) > 0.0d);
            }, () -> {
                return "No Cluster link request quota record seen";
            }, 15000L, 100L);
        }
    }

    private void verifyClusterLinkReplicaQuotaMetrics(ConfluentConfigs.ClusterLinkQuotaMode clusterLinkQuotaMode, boolean z) {
        if (clusterLinkQuotaMode == ConfluentConfigs.ClusterLinkQuotaMode.TOTAL_INBOUND) {
            String str = z ? "link-byte-rate" : "byte-rate";
            TestUtils.waitUntilTrue(() -> {
                return Boolean.valueOf(metricValue(this.destCluster, QuotaType$ClusterLinkReplication$.MODULE$.toString(), str, Collections.emptyMap(), false) > 0.0d);
            }, () -> {
                return "No Cluster link traffic seen";
            }, 15000L, 100L);
        }
    }

    private void verifyLinkFetcherThrottleMetric(QuotaType quotaType) {
        String str = quotaType == QuotaType$ClusterLinkRequest$.MODULE$ ? "link-fetcher-request-throttle-total" : "link-fetcher-produce-throttle-total";
        HashMap hashMap = new HashMap();
        hashMap.put("link-name", "tenantLink");
        hashMap.put(MultiTenantRequestContextTest.TENANT_NAME, this.destCluster.logicalCluster.logicalClusterId());
        String str2 = str;
        TestUtils.waitUntilTrue(() -> {
            return Boolean.valueOf(metricValue(this.destCluster, "cluster-link-metrics", str2, hashMap, false) > 0.0d);
        }, () -> {
            return "No Cluster link throttling metric recorded";
        }, 15000L, 100L);
    }

    private void verifyClusterLinkQuotaLocalProduceMetrics(ConfluentConfigs.ClusterLinkQuotaMode clusterLinkQuotaMode, boolean z) throws Throwable {
        this.destCluster.physicalCluster.kafkaCluster().createTopic(this.destCluster.user.tenantPrefix() + "linkedTopicDest", this.numPartitions, 1);
        KafkaTestUtils.sendRecords(this.destCluster.getOrCreateProducer(Optional.empty()), "linkedTopicDest", 0, 10);
        String str = "local-produce-tokens";
        if (z && clusterLinkQuotaMode == ConfluentConfigs.ClusterLinkQuotaMode.TOTAL_INBOUND) {
            TestUtils.waitUntilTrue(() -> {
                return Boolean.valueOf(metricValue(this.destCluster, QuotaType$ClusterLinkReplication$.MODULE$.toString(), str, Collections.emptyMap(), false) > 0.0d);
            }, () -> {
                return "No local produce traffic seen";
            }, 15000L, 100L);
        } else {
            Assertions.assertEquals(0.0d, metricValue(this.destCluster, QuotaType$ClusterLinkReplication$.MODULE$.toString(), "local-produce-tokens", Collections.emptyMap(), false));
        }
    }

    protected void produceUntilDestThrottled(String str) throws Throwable {
        int i = 0;
        do {
            i++;
            produceRecords(this.sourceCluster, 20);
            if (destClusterLinkReplicasThrottled()) {
                break;
            }
        } while (i < 100);
        TestUtils.waitUntilTrue(() -> {
            return Boolean.valueOf(destClusterLinkReplicasThrottled());
        }, () -> {
            return str;
        }, 15000L, 100L);
    }

    private void produceRecords(MultiTenantCluster multiTenantCluster, int i) throws Throwable {
        int i2 = this.nextMessageIndex;
        this.nextMessageIndex += i;
        KafkaTestUtils.sendRecords(multiTenantCluster.getOrCreateProducer(Optional.empty()), "linkedTopic", i2, i);
    }

    private void verifyLinkDescription(Uuid uuid) throws Exception {
        ClusterLinkDescription clusterLinkDescription = (ClusterLinkDescription) ((Collection) this.destCluster.admin.describeClusterLinks(new DescribeClusterLinksOptions().linkNames(Collections.singleton("tenantLink")).includeTasks(true)).result().get(15L, TimeUnit.SECONDS)).iterator().next();
        Assertions.assertEquals(uuid, clusterLinkDescription.clusterLinkId());
        Assertions.assertEquals("tenantLink", clusterLinkDescription.linkName());
        Assertions.assertEquals(ClusterLinkDescription.LinkMode.DESTINATION, clusterLinkDescription.linkMode());
        Assertions.assertEquals(ClusterLinkDescription.ConnectionMode.OUTBOUND, clusterLinkDescription.connectionMode());
        Assertions.assertEquals("lkc-source", clusterLinkDescription.remoteClusterId());
        Assertions.assertEquals("lkc-dest", clusterLinkDescription.localClusterId());
        Assertions.assertEquals(ClusterLinkError.NO_ERROR, clusterLinkDescription.clusterLinkError());
        String str = "EXTERNAL://" + clusterLinkDescription.linkCoordinatorHost() + ":" + clusterLinkDescription.linkCoordinatorPort();
        List<KafkaBroker> kafkaBrokers = this.destCluster.physicalCluster.kafkaCluster().kafkaBrokers();
        Assertions.assertTrue(kafkaBrokers.stream().anyMatch(kafkaBroker -> {
            return kafkaBroker.advertisedListeners().exists(endPoint -> {
                return Boolean.valueOf(endPoint.connectionString().equals(str));
            });
        }), "Invalid coordinator " + str + ", advertisedListeners=" + kafkaBrokers.stream().map((v0) -> {
            return v0.advertisedListeners();
        }).collect(Collectors.toList()));
        Assertions.assertEquals(4, clusterLinkDescription.taskDescriptions().size());
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMultiTenantClusterLinkNonUpdatableConfigPolicyViolationTest(String str, boolean z) throws Throwable {
        setUpClusters(false, false, true);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, Collections.emptyMap(), false);
        waitFor(() -> {
            return Boolean.valueOf(this.destCluster.linkIdExists("tenantLink"));
        }, true, "Link was not created");
        Properties properties = new Properties();
        properties.setProperty("retention.ms", "60000");
        this.sourceCluster.physicalCluster.kafkaCluster().kafkas().get(0).createTopic(new ListenerName("INTERNAL"), this.sourceCluster.user.tenantPrefix() + "linkedTopic", this.numPartitions, 1, properties);
        createMirrorTopicWaitForSuccess(this.destCluster.admin, "");
        verifyTopicListing();
        verifyTopicMirroring();
        final ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink");
        org.apache.kafka.test.TestUtils.assertFutureThrows(this.destCluster.admin.incrementalAlterConfigs(new HashMap<ConfigResource, Collection<AlterConfigOp>>() { // from class: io.confluent.kafka.link.integration.MultiTenantClusterLinkTest.2
            {
                put(configResource, Collections.singletonList(new AlterConfigOp(new ConfigEntry(KafkaConfig.ReplicaFetchMaxBytesProp(), "5242882"), AlterConfigOp.OpType.SET)));
            }
        }).all(), PolicyViolationException.class);
        Assertions.assertEquals(PolicyViolationException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            this.destCluster.alterClusterLink(this.destCluster.admin, "tenantLink", ClusterLinkConfig.TopicConfigSyncIncludeProp(), MirrorTopicConfigSyncRules.AlwaysConfigs().mkString(",") + ",flush.ms", false);
        })).getCause().getClass());
        this.destCluster.alterClusterLink(this.destCluster.admin, "tenantLink", ClusterLinkConfig.TopicConfigSyncIncludeProp(), MirrorTopicConfigSyncRules.AlwaysConfigs().mkString(",") + ",retention.ms", false);
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            String linkConfig = this.destCluster.linkConfig("tenantLink", ClusterLinkConfig.TopicConfigSyncIncludeProp());
            return linkConfig != null && linkConfig.contains("retention.ms");
        }, ClusterLinkConfig.TopicConfigSyncIncludeProp() + " not updated");
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            return "60000".equals(this.destCluster.topicConfig("linkedTopic", "retention.ms"));
        }, "Retention not sync'ed to mirror topic");
        Assertions.assertEquals(String.valueOf(Long.MAX_VALUE), this.destCluster.topicConfig("linkedTopic", "flush.ms"));
        verifyTopicMirroring();
        stopMirroring(this.destCluster, "linkedTopic");
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testClusterLinkSaslConfigs(String str, boolean z) throws Throwable {
        HashMap hashMap = new HashMap();
        hashMap.put("confluent.plugins.link.policy.sasl.login.module.allowed", OAuthBearerLoginModule.class.getName() + "," + PlainLoginModule.class.getName() + "," + ScramLoginModule.class.getName() + "," + DummyOAuthLoginModule.class.getName());
        setUpClusters(false, false, false, Collections.emptyMap(), hashMap);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("security.protocol", "SASL_PLAINTEXT");
        hashMap2.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required;");
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, Collections.emptyMap(), false);
        hashMap2.put("sasl.mechanism", "GSSAPI");
        int i = 2 + 1;
        verifyInvalidLinkSecurityConfigs(2, hashMap2, PolicyViolationException.class, "sasl.mechanism");
        verifyLinkUpdateFailure(hashMap2, PolicyViolationException.class, "sasl.mechanism");
        hashMap2.put("sasl.mechanism", "SCRAM-SHA-256");
        int i2 = i + 1;
        verifyValidLinkSecurityConfigs(i, hashMap2);
        hashMap2.put("sasl.mechanism", "PLAIN");
        int i3 = i2 + 1;
        verifyValidLinkSecurityConfigs(i2, hashMap2);
        hashMap2.put("sasl.client.callback.handler.class", SaslClientCallbackHandler.class.getName());
        int i4 = i3 + 1;
        verifyValidLinkSecurityConfigs(i3, hashMap2);
        verifyLinkUpdateFailure(hashMap2, PolicyViolationException.class, "sasl.client.callback.handler.class");
        hashMap2.remove("sasl.client.callback.handler.class");
        hashMap2.put("sasl.login.callback.handler.class", DynamicPlainLoginCallbackHandler.class.getName());
        int i5 = i4 + 1;
        verifyInvalidLinkSecurityConfigs(i4, hashMap2, PolicyViolationException.class, "sasl.login.callback.handler.class");
        verifyLinkUpdateFailure(hashMap2, PolicyViolationException.class, "sasl.login.callback.handler.class");
        hashMap2.put("sasl.login.callback.handler.class", "non.existent.class");
        int i6 = i5 + 1;
        verifyInvalidLinkSecurityConfigs(i5, hashMap2, InvalidConfigurationException.class, "sasl.login.callback.handler.class");
        verifyLinkUpdateFailure(hashMap2, this.destCluster.isKraft() ? InvalidConfigurationException.class : PolicyViolationException.class, "sasl.login.callback.handler.class");
        hashMap2.put("sasl.mechanism", "OAUTHBEARER");
        hashMap2.put("sasl.jaas.config", DummyOAuthLoginModule.class.getName() + " required clientId=\"id\" clientSecret=\"secret\";");
        hashMap2.put("sasl.login.callback.handler.class", OAuthBearerLoginCallbackHandler.class.getName());
        String str2 = new OAuthUtils.Builder(36000, "Confluent", "Test", null).userIds(new Integer[]{1234}).withKid(true).build().userTokens().get(1234);
        hashMap2.put("sasl.oauthbearer.token.endpoint.url", "http://localhost:12345");
        int i7 = i6 + 1;
        verifyValidLinkSecurityConfigs(i6, hashMap2);
        hashMap2.put("sasl.oauthbearer.token.endpoint.url", "https://localhost:12345");
        int i8 = i7 + 1;
        verifyValidLinkSecurityConfigs(i7, hashMap2);
        hashMap2.put("sasl.oauthbearer.scope.claim.name", "myscope");
        hashMap2.put("sasl.oauthbearer.sub.claim.name", "mysub");
        int i9 = i8 + 1;
        verifyValidLinkSecurityConfigs(i8, hashMap2);
        hashMap2.put("sasl.oauthbearer.token.endpoint.url", org.apache.kafka.test.TestUtils.tempFile(str2).toURI().toString());
        int i10 = i9 + 1;
        verifyInvalidLinkSecurityConfigs(i9, hashMap2, PolicyViolationException.class, "unsupported protocol 'file'");
        verifyLinkUpdateFailure(hashMap2, PolicyViolationException.class, "unsupported protocol 'file'");
        hashMap2.remove("sasl.oauthbearer.token.endpoint.url");
        int i11 = i10 + 1;
        verifyInvalidLinkSecurityConfigs(i10, hashMap2, InvalidConfigurationException.class, "sasl.oauthbearer.token.endpoint.url");
        hashMap2.put("sasl.login.callback.handler.class", "non.existent.class");
        int i12 = i11 + 1;
        verifyInvalidLinkSecurityConfigs(i11, hashMap2, InvalidConfigurationException.class, "sasl.login.callback.handler.class");
        verifyLinkUpdateFailure(hashMap2, this.destCluster.isKraft() ? InvalidConfigurationException.class : PolicyViolationException.class, "sasl.login.callback.handler.class");
        hashMap2.put("sasl.login.callback.handler.class", DynamicPlainLoginCallbackHandler.class.getName());
        verifyInvalidLinkSecurityConfigs(i12, hashMap2, PolicyViolationException.class, "sasl.login.callback.handler.class");
        verifyLinkUpdateFailure(hashMap2, PolicyViolationException.class, "sasl.login.callback.handler.class");
        hashMap2.put("sasl.jaas.config", OAuthBearerLoginModule.class.getName() + " required clientId=\"id\" clientSecret=\"secret\";");
        hashMap2.remove("sasl.login.callback.handler.class");
        verifyInvalidLinkSecurityConfigs(i12 + 1, hashMap2, InvalidConfigurationException.class, "login failure");
        this.destCluster.deleteClusterLink(this.destCluster.admin, "tenantLink");
    }

    private void verifyValidLinkSecurityConfigs(int i, Map<String, String> map) throws Throwable {
        CreateClusterLinksResult createLink = createLink(i, map);
        String str = (String) createLink.result().keySet().iterator().next();
        createLink.all().get(15L, TimeUnit.SECONDS);
        waitFor(() -> {
            return Boolean.valueOf(this.destCluster.linkIdExists(str));
        }, true, "Link was not created");
        this.destCluster.deleteClusterLink(this.destCluster.admin, str);
    }

    private CreateClusterLinksResult createLink(int i, Map<String, String> map) throws Throwable {
        return this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "link" + i, this.sourceCluster, 1000 + i, "INTERNAL", map, false);
    }

    private void verifyInvalidLinkSecurityConfigs(int i, Map<String, String> map, Class<? extends Throwable> cls, String str) throws Throwable {
        if (this.destCluster.isKraft()) {
            return;
        }
        Throwable assertFutureThrows = org.apache.kafka.test.TestUtils.assertFutureThrows(createLink(i, map).all(), cls);
        Assertions.assertTrue(assertFutureThrows.getMessage().contains(str), "Unexpected failure: " + assertFutureThrows.getMessage());
    }

    private void verifyLinkUpdateFailure(Map<String, String> map, Class<? extends Throwable> cls, String str) throws Throwable {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink");
        HashMap hashMap = new HashMap();
        hashMap.put(configResource, (List) map.entrySet().stream().map(entry -> {
            return new AlterConfigOp(new ConfigEntry((String) entry.getKey(), (String) entry.getValue()), AlterConfigOp.OpType.SET);
        }).collect(Collectors.toList()));
        Throwable assertFutureThrows = org.apache.kafka.test.TestUtils.assertFutureThrows(this.destCluster.admin.incrementalAlterConfigs(hashMap).all(), cls);
        Assertions.assertTrue(assertFutureThrows.getMessage().contains(str), "Unexpected failure: " + assertFutureThrows.getMessage());
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testClusterLinkSecurityUpdate(String str, boolean z) throws Throwable {
        setUpClusters(false, false, true);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, Collections.singletonMap(ClusterLinkConfig.MirrorStartOffsetSpecProp(), "latest"), true);
        waitFor(() -> {
            return Boolean.valueOf(this.destCluster.linkIdExists("tenantLink"));
        }, true, "Link was not created");
        createSourceTopic();
        KafkaTestUtils.sendRecords(this.sourceCluster.getOrCreateProducer(), "linkedTopic", 0, 10);
        createMirrorTopicWaitForSuccess(this.destCluster.admin, "", new NewMirrorTopic("tenantLink", "", OffsetSpec.latest()));
        verifyTopicListing();
        verifyTopicMirroring();
        verifyAclAndOffsetMigration("");
        LogicalClusterUser createLinkUser = this.sourceCluster.createLinkUser(2001);
        this.destCluster.alterClusterLink(this.destCluster.admin, "tenantLink", "sasl.jaas.config", createLinkUser.saslJaasConfig());
        this.sourceCluster.deleteUser(this.sourceCluster.linkUser, true);
        this.sourceCluster.linkUser = createLinkUser;
        verifyTopicMirroring();
        verifyAclAndOffsetMigration("");
        verifySocketBufferSizeUpdate();
        addSourcePartitionsAndVerifyMirror(4, "");
        changeSourceTopicConfigAndVerifyMirror("");
        verifyTopicMirroring();
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testSourceInitiatedLink(String str, boolean z) throws Throwable {
        setUpClusters(true, false, true);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, -1, Collections.emptyMap(), true);
        waitFor(() -> {
            return Boolean.valueOf(this.destCluster.linkIdExists("tenantLink"));
        }, true, "Link was not created");
        this.sourceCluster.createSourceClusterLink(this.sourceCluster.admin, "tenantLink", this.destCluster, 1003);
        waitFor(() -> {
            return Boolean.valueOf(this.sourceCluster.linkIdExists("tenantLink"));
        }, true, "Link was not created");
        createSourceTopic();
        createMirrorTopicWaitForSuccess(this.destCluster.admin, "");
        verifyTopicListing();
        verifyTopicMirroring();
        verifyMetricsGroups(this.destCluster.linkId("tenantLink"), false);
        verifyTenantMetrics(this.destCluster.linkId("tenantLink"), 1003, "", true, false);
        verifyTenantConnectionMetrics(true, true);
        verifyAclAndOffsetMigration("");
        LogicalClusterUser createLinkUser = this.sourceCluster.createLinkUser(2001);
        this.sourceCluster.alterClusterLink(this.sourceCluster.admin, "tenantLink", ClusterLinkConfig.LocalPrefix() + "sasl.jaas.config", createLinkUser.saslJaasConfig());
        this.sourceCluster.deleteUser(this.sourceCluster.linkUser, true);
        this.sourceCluster.linkUser = createLinkUser;
        verifyTopicMirroring();
        verifyAclAndOffsetMigration("");
        verifySocketBufferSizeUpdate();
        addSourcePartitionsAndVerifyMirror(4, "");
        changeSourceTopicConfigAndVerifyMirror("");
        verifyTopicMirroring();
        stopMirroring(this.destCluster, "linkedTopic");
        this.sourceCluster.deleteClusterLink(this.sourceCluster.admin, "tenantLink");
        this.destCluster.deleteClusterLink(this.destCluster.admin, "tenantLink");
        verifyTenantConnectionMetrics(true, false);
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testSILMapsLocalReverseConnectionListenerPositiveCase(String str, boolean z) throws Throwable {
        testSILMapsLocalReverseConnectionListener(true);
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testSILMapsLocalReverseConnectionListenerNegativeCase(String str, boolean z) throws Throwable {
        testSILMapsLocalReverseConnectionListener(false);
    }

    private void testSILMapsLocalReverseConnectionListener(boolean z) throws Throwable {
        HashMap hashMap = new HashMap();
        hashMap.put(KafkaConfig.ListenersProp(), "FOO://localhost:0,INTERNAL_TENANT_SCOPED://localhost:0,INTERNAL://localhost:0,EXTERNAL://localhost:0");
        hashMap.put(KafkaConfig.ClusterLinkLocalReverseConnectionListenerMapProp(), z ? "FOO:INTERNAL_TENANT_SCOPED,INTERNAL_TENANT_SCOPED:INTERNAL_TENANT_SCOPED,EXTERNAL:EXTERNAL,INTERNAL:INTERNAL" : "FOO:FOO,INTERNAL_TENANT_SCOPED:FOO,EXTERNAL:EXTERNAL,INTERNAL:INTERNAL");
        hashMap.put(KafkaConfig.ListenerSecurityProtocolMapProp(), "FOO:SASL_SSL,INTERNAL_TENANT_SCOPED:SASL_SSL,INTERNAL:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT");
        hashMap.put("listener.name.foo.scram-sha-256.sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required;");
        hashMap.put("listener.name.foo.broker.interceptor.class", MultiTenantInterceptor.class.getName());
        hashMap.put("listener.name.internal_tenant_scoped.scram-sha-256.sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required;");
        hashMap.put("listener.name.internal_tenant_scoped.broker.interceptor.class", MultiTenantInterceptor.class.getName());
        if (TestInfoUtils.isKRaft(this.testInfo)) {
            hashMap.put("listener.name.foo.scram-sha-256.sasl.server.callback.handler.class", PhysicalCluster.KRaftScramIntegrationTestCallbackHandler.class.getName());
            hashMap.put("listener.name.internal_tenant_scoped.scram-sha-256.sasl.server.callback.handler.class", PhysicalCluster.KRaftScramIntegrationTestCallbackHandler.class.getName());
        }
        setupSourceCluster(true, false, true, hashMap, Collections.emptyMap(), Optional.of("EXTERNAL"), true, "abc.cpdev.cloud", "none");
        setupDestCluster(true, Collections.singletonMap("confluent.ccloud.host.suffixes", MultiTenantRequestContextTest.LOCALHOST));
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, -1, Collections.emptyMap(), true);
        waitFor(() -> {
            return Boolean.valueOf(this.destCluster.linkIdExists("tenantLink"));
        }, true, "Link was not created");
        Properties properties = new Properties();
        properties.putAll(this.sourceCluster.sslConfigs);
        properties.put("ssl.endpoint.identification.algorithm", "");
        ConfluentAdmin createAdminClient = KafkaTestUtils.createAdminClient(this.sourceCluster.physicalCluster.bootstrapServers("FOO"), SecurityProtocol.SASL_SSL, ScramMechanism.SCRAM_SHA_256.mechanismName(), this.sourceCluster.logicalCluster.adminUser().saslJaasConfig(), properties);
        this.sourceCluster.createSourceClusterLinkResult(createAdminClient, "tenantLink", this.destCluster, 1004, Collections.emptyMap(), "EXTERNAL", SecurityProtocol.SASL_SSL).all().get();
        waitFor(() -> {
            return Boolean.valueOf(this.sourceCluster.linkIdExists("tenantLink"));
        }, true, "Link was not created");
        String linkConfig = this.sourceCluster.linkConfig("tenantLink", ClusterLinkConfig.LocalListenerNameProp());
        if (z) {
            Assertions.assertEquals("INTERNAL_TENANT_SCOPED", linkConfig);
        } else {
            Assertions.assertEquals("FOO", linkConfig);
        }
        createSourceTopic();
        if (!z) {
            createMirrorTopicWaitForFailure(this.destCluster.admin);
            createAdminClient.close();
            return;
        }
        createMirrorTopicWaitForSuccess(this.destCluster.admin, "");
        verifyTopicListing();
        verifyTopicMirroring(Optional.of("EXTERNAL"));
        stopMirroring(this.destCluster, "linkedTopic");
        this.sourceCluster.deleteClusterLink(this.sourceCluster.admin, "tenantLink");
        this.destCluster.deleteClusterLink(this.destCluster.admin, "tenantLink");
        createAdminClient.close();
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testSourceInitiatedLinkLocalConfigs(String str, boolean z) throws Throwable {
        setUpClusters(true, true, true);
        ConfluentAdmin confluentAdmin = this.sourceCluster.admin;
        this.sourceCluster.admin = this.sourceCluster.createAdminClient(this.sourceCluster.logicalCluster.adminUser());
        addAcls(this.sourceCluster.admin, this.sourceCluster.user, new String[0]);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, -1, Collections.emptyMap(), true);
        waitFor(() -> {
            return Boolean.valueOf(this.destCluster.linkIdExists("tenantLink"));
        }, true, "Link was not created");
        this.sourceCluster.createSourceClusterLinkResult(this.sourceCluster.admin, "tenantLink", this.destCluster, 1003, Collections.singletonMap("local.bootstrap.servers", "somehost:9072"));
        waitFor(() -> {
            return Boolean.valueOf(this.sourceCluster.linkIdExists("tenantLink"));
        }, true, "Link was not created");
        this.sourceCluster.admin.createTopics(Collections.singleton(new NewTopic("linkedTopic", Optional.empty(), Optional.of((short) 1))));
        createMirrorTopicWaitForSuccess(this.destCluster.admin, "");
        verifyTopicMirroring();
        this.sourceCluster.admin.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink"), Arrays.asList(new AlterConfigOp(new ConfigEntry("local.bootstrap.servers", "somehost:9071"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry("local.security.protocol", "PLAINTEXT"), AlterConfigOp.OpType.SET)))).all().get(15L, TimeUnit.SECONDS);
        verifyTopicMirroring();
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, this.sourceCluster.linkUser.tenantPrefix() + "tenantLink");
        confluentAdmin.incrementalAlterConfigs(Collections.singletonMap(configResource, Collections.singletonList(new AlterConfigOp(new ConfigEntry(ClusterLinkConfig.ClusterLinkPausedProp(), "true"), AlterConfigOp.OpType.SET)))).all().get();
        confluentAdmin.incrementalAlterConfigs(Collections.singletonMap(configResource, Collections.singletonList(new AlterConfigOp(new ConfigEntry(ClusterLinkConfig.ClusterLinkPausedProp(), "false"), AlterConfigOp.OpType.SET)))).all().get();
        verifyTopicMirroring();
        stopMirroring(this.destCluster, "linkedTopic");
        this.sourceCluster.deleteClusterLink(this.sourceCluster.admin, "tenantLink");
        this.destCluster.deleteClusterLink(this.destCluster.admin, "tenantLink");
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMaxClusterLink(String str, boolean z) throws Throwable {
        Map<String, String> singletonMap = Collections.singletonMap("confluent.cluster.link.num.background.threads", "2");
        setUpClusters(false, false, true, singletonMap, singletonMap);
        this.destCluster.addClusterAcls(new KafkaPrincipal("User", "broker"), "All");
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link1", this.sourceCluster, 1001, Collections.emptyMap(), false);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link2", this.sourceCluster, 1002, Collections.emptyMap(), false);
        this.destCluster.deleteClusterLink(this.destCluster.admin, "link1");
        this.destCluster.deleteClusterLink(this.destCluster.admin, "link2");
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link3", this.sourceCluster, 1003, Collections.emptyMap(), false);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link4", this.sourceCluster, 1004, Collections.emptyMap(), false);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link5", this.sourceCluster, 1005, Collections.emptyMap(), false);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link6", this.sourceCluster, 1006, Collections.emptyMap(), false);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link7", this.sourceCluster, 1007, Collections.emptyMap(), false);
        verifyClusterLinkCreateFailureDueToMaxLink(ClusterLinkDescription.LinkMode.DESTINATION, "link8", 1008, ClusterLinkDescription.LinkMode.DESTINATION, 5);
        verifyClusterLinkCreateFailureDueToMaxLink(ClusterLinkDescription.LinkMode.BIDIRECTIONAL, "link9", 1009, ClusterLinkDescription.LinkMode.DESTINATION, 5);
        Map<String, String> singletonMap2 = Collections.singletonMap(ClusterLinkConfig.LinkModeProp(), ClusterLinkDescription.LinkMode.BIDIRECTIONAL.name());
        this.destCluster.deleteClusterLink(this.destCluster.admin, "link7");
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link10", this.sourceCluster, 1010, singletonMap2, false);
        verifyClusterLinkCreateFailureDueToMaxLink(ClusterLinkDescription.LinkMode.DESTINATION, "link11", 1011, ClusterLinkDescription.LinkMode.DESTINATION, 5);
        verifyClusterLinkCreateFailureDueToMaxLink(ClusterLinkDescription.LinkMode.BIDIRECTIONAL, "link12", 1012, ClusterLinkDescription.LinkMode.DESTINATION, 5);
        List asList = Arrays.asList("link3", "link4", "link5", "link6", "link10");
        if (!TestInfoUtils.isKRaft(this.testInfo)) {
            changeMaxClusterLinks(6, -1);
            this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "link21", this.sourceCluster, 1021, "EXTERNAL", Collections.emptyMap(), true).all().get(15L, TimeUnit.SECONDS);
            this.destCluster.deleteClusterLink(this.destCluster.admin, "link10");
            this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "link22", this.sourceCluster, 1022, "EXTERNAL", singletonMap2, true).all().get(30L, TimeUnit.SECONDS);
            asList = Arrays.asList("link3", "link4", "link5", "link6", "link21", "link22");
        }
        int count = (int) asList.stream().filter(str2 -> {
            return ClusterLinkManager.hashValue(new StringBuilder().append(this.destCluster.user.tenantPrefix()).append(str2).toString(), 2) == 0;
        }).count();
        Assertions.assertEquals(1, ClusterLinkManager.hashValue("", 2));
        waitForBackgroundThreadUsage(this.destCluster, 0, count);
        waitForBackgroundThreadUsage(this.destCluster, 1, (asList.size() - count) + 1);
        waitForBackgroundThreadUsage(this.sourceCluster, 0, 0);
        waitForBackgroundThreadUsage(this.sourceCluster, 1, 1);
        waitForBackgroundThreadTenants(this.destCluster, 0, 1);
        waitForBackgroundThreadTenants(this.destCluster, 1, 1);
        waitForBackgroundThreadTenants(this.sourceCluster, 0, 0);
        waitForBackgroundThreadTenants(this.sourceCluster, 1, 0);
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testTenantAclSyncLimit(String str, boolean z) throws Throwable {
        setUpClusters(false, false, true, 30);
        waitForAcls(this.sourceCluster, 4, null);
        waitForAcls(this.destCluster, 4, null);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link1", this.sourceCluster, 1001, Collections.emptyMap());
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link2", this.sourceCluster, 1002, Collections.emptyMap());
        Uuid linkId = this.destCluster.linkId("link1");
        Uuid linkId2 = this.destCluster.linkId("link2");
        int i = 14;
        waitForAcls(this.sourceCluster, 14, null);
        waitForAcls(this.destCluster, 14 + 4, null);
        waitForAcls(this.destCluster, 14, linkId);
        waitForAcls(this.destCluster, 14, linkId2);
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            return aclAddFailedMetric() == 0.0d;
        }, "ACL update failed before reaching limit " + aclAddFailedMetric());
        while (i + 4 < 30 - 2) {
            int i2 = i;
            i++;
            addSourceAcl("topic" + i2);
            waitForAcls(this.sourceCluster, i, null);
            waitForAcls(this.destCluster, i + 4, null);
            waitForAcls(this.destCluster, i, linkId);
            waitForAcls(this.destCluster, i, linkId2);
        }
        for (int i3 = 0; i3 < 5; i3++) {
            int i4 = i;
            i++;
            addSourceAcl("topic" + i4);
        }
        waitForAcls(this.sourceCluster, i, null);
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            return aclAddFailedMetric() > 0.0d;
        }, "ACL failed metric not updated");
        Assertions.assertTrue(aclCount(this.destCluster, null) <= 30, "Too many dest ACLs " + aclCount(this.destCluster, null));
        int aclCount = aclCount(this.destCluster, null);
        this.destCluster.deleteClusterLink(this.destCluster.admin, "link1");
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            return aclCount(this.destCluster, linkId) == 0;
        }, "Link id not removed on cluster link delete");
        this.destCluster.deleteClusterLink(this.destCluster.admin, "link2");
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            return aclCount(this.destCluster, linkId2) == 0;
        }, "Link id not removed on cluster link delete");
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            return aclCount(this.destCluster, null) == aclCount;
        }, "ACLs not present after cluster link delete");
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMaxSourceInitiatedClusterLink(String str, boolean z) throws Throwable {
        HashMap hashMap = new HashMap();
        hashMap.put("confluent.cluster.link.num.background.threads", "2");
        hashMap.put("confluent.cluster.link.background.thread.affinity", "TENANT");
        if (TestInfoUtils.isKRaft(this.testInfo)) {
            hashMap.put("confluent.plugins.cluster.link.policy.max.destination.links.per.tenant", "10");
            hashMap.put("confluent.plugins.cluster.link.policy.max.source.links.per.tenant", "2");
            setUpClusters(true, false, true, hashMap, hashMap);
        } else {
            setUpClusters(true, false, true, hashMap, hashMap);
            changeMaxClusterLinks(10, 2);
        }
        KafkaPrincipal kafkaPrincipal = new KafkaPrincipal("User", "broker");
        this.destCluster.addClusterAcls(kafkaPrincipal, "All");
        this.sourceCluster.addClusterAcls(kafkaPrincipal, "All");
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link1", this.sourceCluster, -1, Collections.emptyMap());
        this.sourceCluster.createSourceClusterLink(this.sourceCluster.admin, "link1", this.destCluster, 2001);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link2", this.sourceCluster, -1, Collections.emptyMap());
        this.sourceCluster.createSourceClusterLink(this.sourceCluster.admin, "link2", this.destCluster, 2002);
        this.sourceCluster.deleteClusterLink(this.sourceCluster.admin, "link1");
        this.sourceCluster.deleteClusterLink(this.sourceCluster.admin, "link2");
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link3", this.sourceCluster, -1, Collections.emptyMap());
        this.sourceCluster.createSourceClusterLink(this.sourceCluster.admin, "link3", this.destCluster, 2003);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link4", this.sourceCluster, -1, Collections.emptyMap());
        this.sourceCluster.createSourceClusterLink(this.sourceCluster.admin, "link4", this.destCluster, 2004);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link5", this.sourceCluster, -1, Collections.emptyMap());
        verifyClusterLinkCreateFailureDueToMaxLink(ClusterLinkDescription.LinkMode.SOURCE, "link5", 2005, ClusterLinkDescription.LinkMode.SOURCE, 2);
        Map<String, String> singletonMap = Collections.singletonMap(ClusterLinkConfig.LinkModeProp(), ClusterLinkDescription.LinkMode.BIDIRECTIONAL.name());
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link6", this.sourceCluster, -1, singletonMap);
        verifyClusterLinkCreateFailureDueToMaxLink(ClusterLinkDescription.LinkMode.BIDIRECTIONAL, "link6", 2006, ClusterLinkDescription.LinkMode.SOURCE, 2);
        this.sourceCluster.deleteClusterLink(this.sourceCluster.admin, "link4");
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link7", this.sourceCluster, -1, singletonMap);
        this.sourceCluster.createSourceClusterLinkResult(this.sourceCluster.admin, "link7", this.destCluster, 2007, singletonMap).all().get(15L, TimeUnit.SECONDS);
        Assertions.assertEquals(0, ClusterLinkManager.hashValue(this.destCluster.user.tenantPrefix(), 2));
        Assertions.assertEquals(1, ClusterLinkManager.hashValue("", 2));
        Assertions.assertEquals(1, ClusterLinkManager.hashValue(this.sourceCluster.user.tenantPrefix(), 2));
        waitForBackgroundThreadUsage(this.destCluster, 0, 7);
        waitForBackgroundThreadUsage(this.destCluster, 1, 1);
        waitForBackgroundThreadUsage(this.sourceCluster, 0, 0);
        waitForBackgroundThreadUsage(this.sourceCluster, 1, 2 + 1);
        waitForBackgroundThreadTenants(this.destCluster, 0, 1);
        waitForBackgroundThreadTenants(this.destCluster, 1, 0);
        waitForBackgroundThreadTenants(this.sourceCluster, 0, 0);
        waitForBackgroundThreadTenants(this.sourceCluster, 1, 1);
        if (TestInfoUtils.isKRaft(this.testInfo)) {
            return;
        }
        changeMaxClusterLinks(-1, 3);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link21", this.sourceCluster, -1, Collections.emptyMap());
        this.sourceCluster.createSourceClusterLink(this.sourceCluster.admin, "link21", this.destCluster, 2021);
        this.sourceCluster.deleteClusterLink(this.sourceCluster.admin, "link3");
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link22", this.sourceCluster, -1, singletonMap);
        this.sourceCluster.createSourceClusterLinkResult(this.sourceCluster.admin, "link22", this.destCluster, 2022, singletonMap).all().get(15L, TimeUnit.SECONDS);
    }

    private void setUpClustersForMtlsUsingDestBrokerProps() throws Exception {
        setupSourceCluster(false, true, false, Collections.emptyMap(), Optional.empty());
        String str = "network-" + UUID.randomUUID().toString();
        HashMap hashMap = new HashMap();
        hashMap.put("confluent.ccloud.host.suffixes", MultiTenantRequestContextTest.LOCALHOST);
        hashMap.put("confluent.max.acls.per.tenant", String.valueOf(1000));
        hashMap.put("confluent.traffic.network.id", str);
        Map clientConfigs = this.sourceCluster.clientConfigs("INTERNAL");
        ClusterLinkCCloudToCCloudChannelBuilder.MTLS_CONFIGS_TO_OVERRIDE().forEach(str2 -> {
            if (clientConfigs.containsKey(str2)) {
                hashMap.put(str2, clientConfigs.get(str2));
            }
        });
        setupDestCluster(false, hashMap);
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMtlsClusterLinkUsingDestBrokerProps(String str, boolean z) throws Throwable {
        Set set = (Set) org.apache.kafka.test.TestUtils.fieldValue((Object) null, ClusterLinkPolicyConfig.class, "ALLOWED_CCLOUD_TO_CCLOUD_SECURITY_PROTOCOLS");
        HashSet hashSet = new HashSet(set);
        try {
            set.add(SecurityProtocol.SSL.name());
            setUpClustersForMtlsUsingDestBrokerProps();
            this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, "INTERNAL", false, Collections.emptyMap(), true, SYNC_ALL_ACL_FILTER, null).all().get(15L, TimeUnit.SECONDS);
            waitFor(() -> {
                return Boolean.valueOf(this.destCluster.linkIdExists("tenantLink"));
            }, true, "Link was not created");
            this.destCluster.linkId("tenantLink");
            this.sourceCluster.admin.createTopics(Collections.singleton(new NewTopic("linkedTopic", Optional.empty(), Optional.of((short) 1))));
            waitFor(() -> {
                try {
                    return Boolean.valueOf(((Map) this.sourceCluster.admin.listTopics().namesToListings().get()).containsKey("linkedTopic"));
                } catch (Exception e) {
                    return false;
                }
            }, true, "Failed to list topic");
            createMirrorTopicWaitForSuccess(this.destCluster.admin, "");
            Properties producerProps = KafkaTestUtils.producerProps(this.sourceCluster.physicalCluster.kafkaCluster().bootstrapServers("INTERNAL"), SecurityProtocol.SSL, "", "");
            producerProps.putAll(this.sourceCluster.sslConfigs);
            KafkaProducer kafkaProducer = new KafkaProducer(producerProps);
            Throwable th = null;
            try {
                try {
                    KafkaTestUtils.sendRecords(kafkaProducer, "linkedTopic", 0, 10);
                    KafkaTestUtils.consumeRecords(this.destCluster.getOrCreateConsumer("destGroup"), "linkedTopic", 0, 10);
                    if (kafkaProducer != null) {
                        if (0 != 0) {
                            try {
                                kafkaProducer.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            kafkaProducer.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } finally {
            set.clear();
            set.addAll(hashSet);
        }
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testSslClusterLinkExplicitLinkProps(String str, boolean z) throws Throwable {
        setUpClusters(false, true, false);
        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, "INTERNAL", Collections.emptyMap(), true).all().get(15L, TimeUnit.SECONDS);
        waitFor(() -> {
            return Boolean.valueOf(this.destCluster.linkIdExists("tenantLink"));
        }, true, "Link was not created");
        this.destCluster.linkId("tenantLink");
        this.sourceCluster.admin.createTopics(Collections.singleton(new NewTopic("linkedTopic", Optional.empty(), Optional.of((short) 1))));
        waitFor(() -> {
            try {
                return Boolean.valueOf(((Map) this.sourceCluster.admin.listTopics().namesToListings().get()).containsKey("linkedTopic"));
            } catch (Exception e) {
                return false;
            }
        }, true, "Failed to list topic");
        createMirrorTopicWaitForSuccess(this.destCluster.admin, "");
        Properties producerProps = KafkaTestUtils.producerProps(this.sourceCluster.physicalCluster.kafkaCluster().bootstrapServers("INTERNAL"), SecurityProtocol.SSL, "", "");
        producerProps.putAll(this.sourceCluster.sslConfigs);
        KafkaProducer kafkaProducer = new KafkaProducer(producerProps);
        Throwable th = null;
        try {
            KafkaTestUtils.sendRecords(kafkaProducer, "linkedTopic", 0, 10);
            KafkaTestUtils.consumeRecords(this.destCluster.getOrCreateConsumer("destGroup"), "linkedTopic", 0, 10);
            if (kafkaProducer != null) {
                if (0 == 0) {
                    kafkaProducer.close();
                    return;
                }
                try {
                    kafkaProducer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (kafkaProducer != null) {
                if (0 != 0) {
                    try {
                        kafkaProducer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    kafkaProducer.close();
                }
            }
            throw th3;
        }
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testValidateSecurityProtocol(String str, boolean z) throws Throwable {
        setUpClusters(false, false, true);
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", "test.confluent.cloud:9071");
        hashMap.put("security.protocol", SecurityProtocol.PLAINTEXT.name);
        PolicyViolationException assertFutureThrows = org.apache.kafka.test.TestUtils.assertFutureThrows(this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, "EXTERNAL", hashMap, false).all(), PolicyViolationException.class);
        Assertions.assertTrue(assertFutureThrows.getMessage().contains("Invalid security protocol PLAINTEXT for a Confluent Cloud to Confluent Cloud link, it must be SASL_SSL."), assertFutureThrows.getMessage());
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1002, Collections.emptyMap(), false);
        waitFor(() -> {
            return Boolean.valueOf(this.destCluster.linkIdExists("tenantLink"));
        }, true, "Link was not created");
        PolicyViolationException assertFutureThrows2 = org.apache.kafka.test.TestUtils.assertFutureThrows(this.destCluster.admin.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink"), Lists.newArrayList(new AlterConfigOp[]{new AlterConfigOp(new ConfigEntry("bootstrap.servers", "test.confluent.cloud:9071"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry("security.protocol", SecurityProtocol.PLAINTEXT.name), AlterConfigOp.OpType.SET)}))).all(), PolicyViolationException.class);
        Assertions.assertTrue(assertFutureThrows2.getMessage().contains("Invalid security protocol PLAINTEXT for a Confluent Cloud to Confluent Cloud link, it must be SASL_SSL."), assertFutureThrows2.getMessage());
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testInternalIpAndPort(String str, boolean z) throws Throwable {
        setUpClusters(false, false, true);
        org.apache.kafka.test.TestUtils.assertFutureThrows(this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, "EXTERNAL", Collections.singletonMap("bootstrap.servers", "10.0.0.3:9071"), false).all(), PolicyViolationException.class, String.format("Unable to validate cluster link due to error: Invalid bootstrap addresses or ports that cannot be used for cluster links on Confluent Cloud: [%s/%s:9071]", "", "10.0.0.3"));
        org.apache.kafka.test.TestUtils.assertFutureThrows(this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1002, "EXTERNAL", Collections.singletonMap("bootstrap.servers", "127.0.0.1:9071"), false).all(), PolicyViolationException.class, String.format("Unable to validate cluster link due to error: Invalid bootstrap addresses or ports that cannot be used for cluster links on Confluent Cloud: [%s/%s:9071]", "", FileBasedPlainSaslAuthHostNameValidationIntegrationTest.LOCAL_HOST_IP));
        org.apache.kafka.test.TestUtils.assertFutureThrows(this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1003, "EXTERNAL", Collections.singletonMap("bootstrap.servers", "localhost:9071"), false).all(), PolicyViolationException.class, String.format("Unable to validate cluster link due to error: Invalid bootstrap addresses or ports that cannot be used for cluster links on Confluent Cloud: [%s/%s:9071]", MultiTenantRequestContextTest.LOCALHOST, FileBasedPlainSaslAuthHostNameValidationIntegrationTest.LOCAL_HOST_IP));
        org.apache.kafka.test.TestUtils.assertFutureThrows(this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1004, "EXTERNAL", Collections.singletonMap("bootstrap.servers", " 11.0.0.3:9071 , 10.0.0.3:9071,10.0.0.4:9071"), false).all(), PolicyViolationException.class, "Unable to validate cluster link due to error: Invalid bootstrap addresses or ports that cannot be used for cluster links on Confluent Cloud: [/10.0.0.3:9071, /10.0.0.4:9071]");
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1005, Collections.emptyMap(), false);
        waitFor(() -> {
            return Boolean.valueOf(this.destCluster.linkIdExists("tenantLink"));
        }, true, "Link was not created");
        org.apache.kafka.test.TestUtils.assertFutureThrows(this.destCluster.admin.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink"), Collections.singleton(new AlterConfigOp(new ConfigEntry("bootstrap.servers", "10.0.0.3:9071"), AlterConfigOp.OpType.SET)))).all(), PolicyViolationException.class, "Invalid bootstrap addresses or ports that cannot be used for cluster links on Confluent Cloud: [/10.0.0.3:9071]");
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testCLRequestOnUnauthenticatedListener(String str, boolean z) throws Throwable {
        setUpClusters(false, false, true);
        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, "INTERNAL", Collections.emptyMap(), true).all().get(15L, TimeUnit.SECONDS);
        this.destCluster.setInternalClusterLinkConfigs("tenantLink", Collections.singletonMap(ClusterLinkConfig.AvailabilityCheckMsProp(), "1000"));
        waitFor(() -> {
            return Boolean.valueOf(this.destCluster.linkIdExists("tenantLink"));
        }, true, "Link was not created");
        createSourceTopic();
        Assertions.assertThrows(ExecutionException.class, () -> {
            createMirrorTopic(this.destCluster.admin, "");
        });
        waitFor(() -> {
            return this.destCluster.linkState("tenantLink");
        }, ClusterLinkDescription.LinkState.FAILED, "Link didn't transition to failed state");
        HashMap hashMap = new HashMap();
        hashMap.put("link-name", "tenantLink");
        hashMap.put(MultiTenantRequestContextTest.TENANT_NAME, this.destCluster.logicalCluster.logicalClusterId());
        hashMap.put("reason", "unauthorized_bootstrap");
        Assertions.assertEquals(metricValue(this.destCluster, "cluster-link-metrics", "broker-failed-link-count", hashMap, false), 1.0d);
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testAclsOnAclFilterUpdate(String str, boolean z) throws Throwable {
        setUpClusters(false, false, true, 13);
        waitForAcls(this.sourceCluster, 4, null);
        waitForAcls(this.destCluster, 4, null);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, Collections.emptyMap());
        waitForAcls(this.sourceCluster, 9, null);
        waitForAcls(this.destCluster, 4 + 9, null);
        this.destCluster.alterClusterLink(this.destCluster.admin, "tenantLink", ClusterLinkConfig.AclFiltersProp(), this.subsetAclsFilter);
        waitForAcls(this.destCluster, 4 + 9, null);
        waitFor(() -> {
            return (Set) currentAcls(this.destCluster.admin).stream().filter(aclBinding -> {
                return !aclBinding.entry().clusterLinkIds().isEmpty();
            }).collect(Collectors.toSet());
        }, Collections.emptySet(), "Destination ACLs still have link id stamped");
        waitForAcls(this.destCluster, 4 + 9, Uuid.ZERO_UUID);
        this.destCluster.alterClusterLink(this.destCluster.admin, "tenantLink", ClusterLinkConfig.AclFiltersProp(), SYNC_ALL_ACL_FILTER);
        Uuid linkId = this.destCluster.linkId("tenantLink");
        waitForAcls(this.destCluster, 9, linkId);
        waitForAcls(this.destCluster, 4 + 9, Uuid.ZERO_UUID);
        setAclLimit(this.destCluster, 50);
        addSourceAcl("test.topic.1");
        addSourceAcl("test.topic.2");
        waitForAcls(this.destCluster, 9 + 2, linkId);
        waitForAcls(this.destCluster, 4 + 9, Uuid.ZERO_UUID);
        setAclLimit(this.destCluster, 13);
        this.destCluster.deleteClusterLink(this.destCluster.admin, "tenantLink");
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            return aclCount(this.destCluster, linkId) == 0;
        }, "Link id not removed on cluster link delete");
        waitForAcls(this.destCluster, 13 + 2, Uuid.ZERO_UUID);
    }

    private void setAclLimit(MultiTenantCluster multiTenantCluster, int i) {
        Iterator<KafkaBroker> it = multiTenantCluster.physicalCluster.kafkaCluster().kafkaBrokers().iterator();
        while (it.hasNext()) {
            ((AuthorizerConfig) ((Authorizer) it.next().authorizer().get()).config().get()).setDefaultMaxAcls(Integer.valueOf(i));
        }
        if (multiTenantCluster.isKraft()) {
            ((AuthorizerConfig) ((Authorizer) multiTenantCluster.physicalCluster.kafkaCluster().kraftController().authorizer().get()).config().get()).setDefaultMaxAcls(Integer.valueOf(i));
        }
    }

    private Properties brokerProps() {
        return brokerProps(true);
    }

    private Properties brokerProps(boolean z) {
        Properties properties = new Properties();
        properties.put("confluent.cluster.link.enable", "true");
        properties.put(KafkaConfig.AuthorizerClassNameProp(), MultiTenantAuthorizer.class.getName());
        properties.put("confluent.plugins.topic.policy.replication.factor", "1");
        properties.put(KafkaConfig.AutoCreateTopicsEnableProp(), "false");
        properties.put(KafkaConfig.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        properties.put(KafkaConfig.PasswordEncoderSecretProp(), "multi-tenant-cluster-link-secret");
        properties.put(KafkaConfig.CreateTopicPolicyClassNameProp(), CreateTopicPolicy.class.getName());
        properties.put(KafkaConfig.AlterConfigPolicyClassNameProp(), AlterConfigPolicy.class.getName());
        properties.put(KafkaConfig.CreateClusterLinkPolicyClassNameProp(), CreateClusterLinkPolicy.class.getName());
        if (TestInfoUtils.isKRaft(this.testInfo)) {
            properties.put("confluent.cluster.link.metadata.topic.enable", "true");
            properties.put("confluent.cluster.link.metadata.topic.replication.factor", "1");
            properties.put("confluent.cluster.link.metadata.topic.partitions", "3");
        }
        if (z) {
            properties.put("listener.name.internal.broker.interceptor.class", ConfluentCloudBrokerInterceptor.class.getName());
        }
        if (testRunsWithLinkCoordinator()) {
            properties.put("confluent.cluster.link.metadata.topic.enable", "true");
            properties.put("confluent.cluster.link.metadata.topic.partitions", "1");
            properties.put("confluent.cluster.link.metadata.topic.replication.factor", "1");
            properties.put("confluent.cluster.link.metadata.topic.min.isr", "1");
        }
        return properties;
    }

    private void createSourceTopic() {
        createTopic(this.sourceCluster);
    }

    private void createTopic(MultiTenantCluster multiTenantCluster) {
        multiTenantCluster.physicalCluster.kafkaCluster().createTopic(multiTenantCluster.user.tenantPrefix() + "linkedTopic", this.numPartitions, 1);
    }

    private void createMirrorTopicWaitForSuccess(ConfluentAdmin confluentAdmin) throws Exception {
        createMirrorTopicWaitForSuccess(confluentAdmin, "", new NewMirrorTopic("tenantLink", "linkedTopic"));
    }

    private void createMirrorTopicWaitForSuccess(ConfluentAdmin confluentAdmin, String str) throws Exception {
        createMirrorTopicWaitForSuccess(confluentAdmin, str, new NewMirrorTopic("tenantLink", "linkedTopic"));
    }

    private void createMirrorTopicWaitForSuccess(ConfluentAdmin confluentAdmin, String str, NewMirrorTopic newMirrorTopic) throws Exception {
        NewTopic mirror = new NewTopic(str + "linkedTopic", Optional.empty(), Optional.of((short) 1)).mirror(Optional.of(new NewMirrorTopic("tenantLink", "linkedTopic")));
        CreateTopicsOptions timeoutMs = new CreateTopicsOptions().timeoutMs(5000);
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            try {
                confluentAdmin.createTopics(Collections.singleton(mirror), timeoutMs).all().get();
                return true;
            } catch (Throwable th) {
                log.error("Failed to create mirror topic {}", mirror, th);
                return false;
            }
        }, "Failed to create mirror");
    }

    private void createMirrorTopicWaitForFailure(ConfluentAdmin confluentAdmin) {
        NewTopic mirror = new NewTopic("linkedTopic", Optional.empty(), Optional.of((short) 1)).mirror(Optional.of(new NewMirrorTopic("tenantLink", "linkedTopic")));
        org.apache.kafka.test.TestUtils.assertFutureThrows(confluentAdmin.createTopics(Collections.singleton(mirror), new CreateTopicsOptions().timeoutMs(3000)).all(), TimeoutException.class);
    }

    private void createMirrorTopic(ConfluentAdmin confluentAdmin, String str) throws Exception {
        createMirrorTopic(confluentAdmin, str, new NewMirrorTopic("tenantLink", "linkedTopic"));
    }

    private void createMirrorTopic(ConfluentAdmin confluentAdmin, String str, NewMirrorTopic newMirrorTopic) throws Exception {
        confluentAdmin.createTopics(Collections.singleton(new NewTopic(str + "linkedTopic", Optional.empty(), Optional.of((short) 1)).mirror(Optional.of(new NewMirrorTopic("tenantLink", "linkedTopic")))), new CreateTopicsOptions().timeoutMs(5000)).all().get();
    }

    private void stopMirroring(MultiTenantCluster multiTenantCluster, String str) throws Throwable {
        multiTenantCluster.admin.alterMirrors(Collections.singletonMap(str, AlterMirrorOp.FAILOVER), new AlterMirrorsOptions()).all().get(15L, TimeUnit.SECONDS);
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            return multiTenantCluster.mirrorDescription(str, true).state() == MirrorTopicDescription.State.STOPPED;
        }, "Mirror not stopped");
    }

    private void reverseAndSwap(MultiTenantCluster multiTenantCluster, String str, boolean z) throws Throwable {
        multiTenantCluster.admin.alterMirrors(Collections.singletonMap(str, z ? AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR : AlterMirrorOp.REVERSE_AND_PAUSE_REMOTE_MIRROR), new AlterMirrorsOptions()).all().get(15L, TimeUnit.SECONDS);
    }

    private void verifyTopicListing() throws Exception {
        verifyTopicListing("", this.destCluster);
    }

    private void verifyTopicListing(String str, MultiTenantCluster multiTenantCluster) throws Exception {
        org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(() -> {
            Collection collection = (Collection) multiTenantCluster.admin.listClusterLinks(new ListClusterLinksOptions().includeTopics(true)).result().get();
            Assertions.assertEquals(1, collection.size());
            ClusterLinkListing clusterLinkListing = (ClusterLinkListing) collection.iterator().next();
            Assertions.assertEquals("tenantLink", clusterLinkListing.linkName());
            Assertions.assertTrue(clusterLinkListing.topics().isPresent());
            Collection collection2 = (Collection) clusterLinkListing.topics().get();
            Assertions.assertEquals(1, collection2.size());
            Assertions.assertEquals(str + "linkedTopic", collection2.iterator().next());
        });
    }

    private void verifyTopicMirroring(Optional<String> optional) throws Throwable {
        verifyTopicMirroring("", this.sourceCluster, this.destCluster, optional);
    }

    private void verifyTopicMirroring() throws Throwable {
        verifyTopicMirroring("", this.sourceCluster, this.destCluster, Optional.empty());
    }

    private void verifyTopicMirroring(String str, MultiTenantCluster multiTenantCluster, MultiTenantCluster multiTenantCluster2) throws Throwable {
        verifyTopicMirroring(str, multiTenantCluster, multiTenantCluster2, Optional.empty());
    }

    private void verifyTopicMirroring(String str, MultiTenantCluster multiTenantCluster, MultiTenantCluster multiTenantCluster2, Optional<String> optional) throws Throwable {
        int i = this.nextMessageIndex;
        this.nextMessageIndex += 10;
        KafkaTestUtils.sendRecords(multiTenantCluster.getOrCreateProducer(optional), "linkedTopic", i, 10);
        KafkaTestUtils.consumeRecords(multiTenantCluster2.getOrCreateConsumer("destGroup", optional), str + "linkedTopic", i, 10);
    }

    private void verifyReplicaStatus(String str, MultiTenantCluster multiTenantCluster, MultiTenantCluster multiTenantCluster2) throws Throwable {
        HashSet hashSet = new HashSet();
        for (int i = 0; i < this.numPartitions; i++) {
            hashSet.add(new TopicPartition("linkedTopic", i));
        }
        ReplicaStatusOptions includeLinkedReplicas = new ReplicaStatusOptions().includeLinkedReplicas(true);
        Map map = (Map) multiTenantCluster.admin.replicaStatus(hashSet, includeLinkedReplicas).allResults().get(15L, TimeUnit.SECONDS);
        Map map2 = (Map) multiTenantCluster2.admin.replicaStatus((Set) hashSet.stream().map(topicPartition -> {
            return new TopicPartition(str + topicPartition.topic(), topicPartition.partition());
        }).collect(Collectors.toSet()), includeLinkedReplicas).allResults().get(15L, TimeUnit.SECONDS);
        for (TopicPartition topicPartition2 : map.keySet()) {
            PartitionResult partitionResult = (PartitionResult) map.get(topicPartition2);
            PartitionResult partitionResult2 = (PartitionResult) map2.get(new TopicPartition(str + topicPartition2.topic(), topicPartition2.partition()));
            Assertions.assertNotNull(partitionResult);
            Assertions.assertNotNull(partitionResult2);
            Assertions.assertTrue(partitionResult.leaderEpoch().getAsInt() <= partitionResult2.leaderEpoch().getAsInt());
            ReplicaStatus replicaStatus = (ReplicaStatus) partitionResult.replicas().stream().filter((v0) -> {
                return v0.isLeader();
            }).findAny().get();
            ReplicaStatus replicaStatus2 = (ReplicaStatus) partitionResult2.replicas().stream().filter((v0) -> {
                return v0.isLeader();
            }).findAny().get();
            Assertions.assertEquals(replicaStatus.logStartOffset(), replicaStatus2.logStartOffset());
            Assertions.assertEquals(replicaStatus.logEndOffset(), replicaStatus2.logEndOffset());
            for (ReplicaStatus replicaStatus3 : partitionResult.replicas()) {
                Assertions.assertEquals(Optional.empty(), replicaStatus3.linkName());
                Assertions.assertEquals(Optional.empty(), replicaStatus3.mirrorInfo());
            }
            for (ReplicaStatus replicaStatus4 : partitionResult2.replicas()) {
                if (replicaStatus4.linkName().isPresent()) {
                    Assertions.assertEquals("tenantLink", replicaStatus4.linkName().get());
                    Assertions.assertEquals(Optional.empty(), replicaStatus4.mirrorInfo());
                } else if (replicaStatus4.isLeader()) {
                    Assertions.assertEquals(ReplicaStatus.MirrorInfo.State.ACTIVE, ((ReplicaStatus.MirrorInfo) replicaStatus4.mirrorInfo().get()).state());
                } else {
                    Assertions.assertEquals(Optional.empty(), replicaStatus4.mirrorInfo());
                }
            }
        }
    }

    private void verifyAclAndOffsetMigration(String str) throws Throwable {
        Set<AclBinding> addAcls = addAcls(this.sourceCluster.admin, this.sourceCluster.user, new String[0]);
        addBrokerAclsForOffsetMigration(str);
        String str2 = "linkedGroup";
        Map<TopicPartition, OffsetAndMetadata> commitOffsets = commitOffsets(this.sourceCluster.admin, "linkedGroup");
        if (!str.isEmpty()) {
            commitOffsets = (Map) commitOffsets.entrySet().stream().collect(Collectors.toMap(entry -> {
                return new TopicPartition(str + ((TopicPartition) entry.getKey()).topic(), ((TopicPartition) entry.getKey()).partition());
            }, entry2 -> {
                return (OffsetAndMetadata) entry2.getValue();
            }));
        }
        waitFor(() -> {
            return this.destCluster.describeAcls(this.sourceCluster.user);
        }, (Set) addAcls.stream().map(aclBinding -> {
            return SecurityUtils.aclWithClusterLinkIds(aclBinding, Collections.singleton(this.destCluster.linkId("tenantLink")));
        }).collect(Collectors.toSet()), "Acls not migrated");
        waitFor(() -> {
            return this.destCluster.committedOffsets(str2);
        }, commitOffsets, "Consumer offsets not migrated");
    }

    private Map<String, String> linkMetricTags(String str, String str2, boolean z) {
        HashMap hashMap = new HashMap();
        hashMap.put(MultiTenantRequestContextTest.TENANT_NAME, str);
        hashMap.put("link-name", z ? "_confluent" : "tenantLink");
        hashMap.put(TenantMetricsTestUtils.REQUEST_TAG, str2);
        return hashMap;
    }

    private Map<String, String> linkIdMetricTags(Uuid uuid, String str, String str2, boolean z) {
        HashMap hashMap = new HashMap();
        hashMap.put(MultiTenantRequestContextTest.TENANT_NAME, str);
        hashMap.put("link-id", z ? ClusterLinkMetricsUtils.METRIC_REDUCTION_LINK_ID_TAG_VALUE : uuid.toString());
        hashMap.put(TenantMetricsTestUtils.REQUEST_TAG, str2);
        return hashMap;
    }

    private void verifyDestinationDeadThreadCount(String str, boolean z) {
        HashMap hashMap = new HashMap();
        hashMap.put("clientId", "ClusterLink");
        hashMap.put("link-name", z ? "_confluent" : this.destCluster.user.tenantPrefix() + str);
        Assertions.assertTrue(!yammerMetrics(new KafkaMetricsGroup(ClusterLinkFetcherManager.class).metricName("DeadThreadCount", hashMap).getMBeanName()).isEmpty(), "DeadThreadCount should exist");
    }

    private void verifyTenantMetrics(Uuid uuid, int i, String str, boolean z, boolean z2) {
        HashMap hashMap = new HashMap();
        hashMap.put(MultiTenantRequestContextTest.TENANT_NAME, "lkc-source");
        hashMap.put(MultiTenantRequestContextTest.USERNAME, String.valueOf(i));
        hashMap.put(TenantMetricsTestUtils.REQUEST_TAG, "Fetch");
        Assertions.assertFalse(metricsFound(this.sourceCluster, "tenant-metrics", hashMap));
        Map<String, String> linkMetricTags = linkMetricTags("lkc-dest", "Fetch", z2);
        double metricValue = metricValue(this.destCluster, "cluster-link-dest-tenant-metrics", TenantMetricsTestUtils.RESPONSE_TIME_NS_MAX, linkMetricTags);
        Assertions.assertTrue(metricValue > 0.0d && metricValue < ((double) TimeUnit.SECONDS.toNanos(15L)), "Invalid response time metric: " + metricValue);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(MultiTenantRequestContextTest.TENANT_NAME, "lkc-dest");
        hashMap2.put("link-name", "tenantLink");
        HashMap hashMap3 = new HashMap(hashMap2);
        hashMap3.put("deployed-link-type", "hybrid");
        if (z) {
            hashMap3.put("connection-mode", "inbound");
        } else {
            hashMap3.put("connection-mode", "outbound");
        }
        TestUtils.waitUntilTrue(() -> {
            return Boolean.valueOf(metricValue(this.destCluster, "cluster-link-metrics", "link-count", hashMap3, !z2) > 0.0d);
        }, () -> {
            return "ACTIVE state link-count not populated";
        }, TimeUnit.SECONDS.toMillis(60L), 100L);
        Assertions.assertEquals(1.0d, metricValue(this.destCluster, "cluster-link-metrics", "link-count", hashMap3, !z2), 0.001d);
        hashMap2.put("state", "Mirror");
        Assertions.assertEquals(1.0d, metricValue(this.destCluster, "cluster-link-metrics", "mirror-topic-count", hashMap2, !z2), 0.001d);
        hashMap2.put("state", "FailedMirror");
        Assertions.assertEquals(0.0d, metricValue(this.destCluster, "cluster-link-metrics", "mirror-topic-count", hashMap2, !z2), 0.001d);
        hashMap2.put("state", "PausedMirror");
        Assertions.assertEquals(0.0d, metricValue(this.destCluster, "cluster-link-metrics", "mirror-topic-count", hashMap2, !z2), 0.001d);
        hashMap2.put("state", "PendingStoppedMirror");
        Assertions.assertEquals(0.0d, metricValue(this.destCluster, "cluster-link-metrics", "mirror-topic-count", hashMap2, !z2), 0.001d);
        hashMap2.put("state", "StoppedMirror");
        Assertions.assertEquals(0.0d, metricValue(this.destCluster, "cluster-link-metrics", "mirror-topic-count", hashMap2, !z2), 0.001d);
        hashMap2.remove("state");
        Assertions.assertEquals(this.numPartitions, metricValue(this.destCluster, "cluster-link-metrics", "mirror-partition-count", hashMap2, !z2), 0.001d);
        hashMap2.put("topic", str + "linkedTopic");
        Assertions.assertEquals(0.0d, metricValue(this.destCluster, "cluster-link-metrics", "mirror-topic-lag", hashMap2, !z2), 0.0d);
        Assertions.assertTrue(metricValue(this.destCluster, "cluster-link-metrics", "mirror-topic-byte-total", hashMap2, !z2) > 0.0d, "Invalid mirror topic throughput");
        hashMap2.remove("topic");
        Map<String, String> linkIdMetricTags = linkIdMetricTags(uuid, "lkc-source", "Fetch", z2);
        assertRange("requests", metricValue(this.sourceCluster, "cluster-link-source-metrics", TenantMetricsTestUtils.REQUEST_TOTAL, linkIdMetricTags), metricValue(this.destCluster, "cluster-link-dest-tenant-metrics", TenantMetricsTestUtils.REQUEST_TOTAL, linkMetricTags), 10.0d);
        assertRange("request-bytes", metricValue(this.sourceCluster, "cluster-link-source-metrics", TenantMetricsTestUtils.REQUEST_BYTE_TOTAL, linkIdMetricTags), metricValue(this.destCluster, "cluster-link-dest-tenant-metrics", TenantMetricsTestUtils.REQUEST_BYTE_TOTAL, linkMetricTags), 2000.0d);
        assertRange("response-bytes", metricValue(this.destCluster, "cluster-link-dest-tenant-metrics", TenantMetricsTestUtils.RESPONSE_BYTE_TOTAL, linkMetricTags), metricValue(this.sourceCluster, "cluster-link-source-metrics", TenantMetricsTestUtils.RESPONSE_BYTE_TOTAL, linkIdMetricTags), 2000.0d);
        double metricValue2 = metricValue(this.sourceCluster, "cluster-link-source-metrics", TenantMetricsTestUtils.RESPONSE_TIME_NS_MAX, linkIdMetricTags);
        Assertions.assertTrue(metricValue2 > 0.0d && metricValue2 < ((double) TimeUnit.SECONDS.toNanos(15L)), "Invalid source link response time metric: " + metricValue2);
    }

    private void verifyTenantConnectionMetrics(boolean z, boolean z2) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(MultiTenantRequestContextTest.TENANT_NAME, "lkc-source");
        hashMap.put(MultiTenantRequestContextTest.USERNAME, String.valueOf(this.sourceCluster.linkUser.userMetadata.userId()));
        Supplier supplier = () -> {
            return Double.valueOf(metricValue(this.sourceCluster, "tenant-metrics", TenantMetricsTestUtils.ACTIVE_AUTH_CONNECTIONS_COUNT_METRIC_NAME, hashMap));
        };
        int i = z2 ? 2 : 0;
        int i2 = z2 ? 10 : 0;
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            return ((Double) supplier.get()).doubleValue() >= ((double) i) && ((Double) supplier.get()).doubleValue() <= ((double) i2);
        }, () -> {
            return "Source connection metric not updated for link, value = " + supplier.get();
        });
        if (z) {
            HashMap hashMap2 = new HashMap();
            hashMap2.put(MultiTenantRequestContextTest.TENANT_NAME, "lkc-dest");
            hashMap2.put(MultiTenantRequestContextTest.USERNAME, String.valueOf(this.destCluster.linkUser.userMetadata.userId()));
            Supplier supplier2 = () -> {
                return Double.valueOf(metricValue(this.destCluster, "tenant-metrics", TenantMetricsTestUtils.ACTIVE_AUTH_CONNECTIONS_COUNT_METRIC_NAME, hashMap2));
            };
            org.apache.kafka.test.TestUtils.waitForCondition(() -> {
                return ((Double) supplier2.get()).doubleValue() >= ((double) i) && ((Double) supplier2.get()).doubleValue() <= ((double) i2);
            }, () -> {
                return "Destination connection metric not updated for reverse connections, value = " + supplier2.get();
            });
        }
    }

    private void verifyMetricsGroups(Uuid uuid, boolean z) {
        Map<String, String> linkMetricTags = linkMetricTags("lkc-dest", "Metadata", z);
        Map<String, String> linkMetricTags2 = linkMetricTags("lkc-source", "Metadata", z);
        Assertions.assertEquals(0.0d, metricValue(this.sourceCluster, "cluster-link-dest-tenant-metrics", TenantMetricsTestUtils.REQUEST_TOTAL, linkMetricTags2, false), 0.001d);
        Assertions.assertEquals(0.0d, metricValue(this.destCluster, "cluster-link-source-tenant-metrics", TenantMetricsTestUtils.REQUEST_TOTAL, linkMetricTags, false), 0.001d);
        double metricValue = metricValue(this.destCluster, "cluster-link-dest-tenant-metrics", TenantMetricsTestUtils.REQUEST_TOTAL, linkMetricTags);
        Assertions.assertTrue(metricValue > 0.0d, "Dest metric not updated: " + metricValue);
        if (this.sourceCluster.useSourceInitiatedLink) {
            double metricValue2 = metricValue(this.sourceCluster, "cluster-link-source-tenant-metrics", TenantMetricsTestUtils.REQUEST_TOTAL, linkMetricTags2);
            Assertions.assertTrue(metricValue2 > 0.0d, "Source metric not updated: " + metricValue2);
        } else {
            Assertions.assertEquals(0.0d, metricValue(this.sourceCluster, "cluster-link-source-tenant-metrics", TenantMetricsTestUtils.REQUEST_TOTAL, linkMetricTags2, false), 0.001d);
        }
        Map<String, String> linkIdMetricTags = linkIdMetricTags(uuid, "lkc-dest", "Metadata", z);
        double metricValue3 = metricValue(this.sourceCluster, "cluster-link-source-metrics", TenantMetricsTestUtils.REQUEST_TOTAL, linkIdMetricTags(uuid, "lkc-source", "Metadata", z));
        Assertions.assertTrue(metricValue3 > 0.0d, "Source metric not updated: " + metricValue3);
        double metricValue4 = metricValue(this.destCluster, "cluster-link-source-metrics", TenantMetricsTestUtils.REQUEST_TOTAL, linkIdMetricTags, false);
        if (this.destCluster.useSourceInitiatedLink) {
            Assertions.assertTrue(metricValue4 > 0.0d, "Destination incoming request metric not updated: " + metricValue);
        } else {
            Assertions.assertEquals(0.0d, metricValue4, 0.001d);
        }
        HashMap hashMap = new HashMap();
        hashMap.put("link-id", Utils.toJavaUuid(uuid).toString());
        hashMap.put(MultiTenantRequestContextTest.TENANT_NAME, "lkc-source");
        hashMap.put("mode", "source");
        Assertions.assertEquals(this.sourceCluster.physicalCluster.kafkaCluster().kafkaBrokers().size(), (int) totalMetricValue(this.sourceCluster, "cluster-link-metrics", "active-link-count", hashMap));
        hashMap.put(MultiTenantRequestContextTest.TENANT_NAME, "lkc-dest");
        hashMap.put("mode", "destination");
        Assertions.assertEquals(this.destCluster.physicalCluster.kafkaCluster().kafkaBrokers().size(), (int) totalMetricValue(this.destCluster, "cluster-link-metrics", "active-link-count", hashMap));
    }

    private void verifySocketBufferSizeUpdate() throws Exception {
        long j = 2097152;
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink");
        Assertions.assertEquals(PolicyViolationException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
        })).getCause().getClass());
        Assertions.assertEquals(PolicyViolationException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
        })).getCause().getClass());
        this.destCluster.admin.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink"), Collections.singleton(new AlterConfigOp(new ConfigEntry(KafkaConfig.ReplicaSocketReceiveBufferBytesProp(), String.valueOf(131072L)), AlterConfigOp.OpType.SET)))).all().get(15L, TimeUnit.SECONDS);
        waitFor(() -> {
            return this.destCluster.linkConfig("tenantLink", KafkaConfig.ReplicaSocketReceiveBufferBytesProp());
        }, String.valueOf(131072L), "Link config not updated");
    }

    private void assertRange(String str, double d, double d2, double d3) {
        Assertions.assertTrue(d2 - d <= d3 && d - d2 < 0.001d, String.format("Metric values for '%s' (%f, %f) not within expected range %f", str, Double.valueOf(d), Double.valueOf(d2), Double.valueOf(d3)));
    }

    private boolean metricsFound(MultiTenantCluster multiTenantCluster, String str, Map<String, String> map) {
        Iterator<KafkaBroker> it = multiTenantCluster.physicalCluster.kafkaCluster().kafkaBrokers().iterator();
        while (it.hasNext()) {
            Iterator it2 = it.next().metrics().metrics().entrySet().iterator();
            while (it2.hasNext()) {
                MetricName metricName = (MetricName) ((Map.Entry) it2.next()).getKey();
                if (metricName.group().equals(str) && tagsMatched(metricName, map)) {
                    return true;
                }
            }
        }
        return false;
    }

    private double metricValue(MultiTenantCluster multiTenantCluster, String str, String str2, Map<String, String> map) {
        return metricValue(multiTenantCluster, str, str2, map, true);
    }

    private double metricValue(MultiTenantCluster multiTenantCluster, String str, String str2, Map<String, String> map, boolean z) {
        double d = 0.0d;
        boolean z2 = false;
        Iterator<KafkaBroker> it = multiTenantCluster.physicalCluster.kafkaCluster().kafkaBrokers().iterator();
        while (it.hasNext()) {
            for (Map.Entry entry : it.next().metrics().metrics().entrySet()) {
                if (isMatchingMetric((MetricName) entry.getKey(), str2, str, map)) {
                    z2 = true;
                    d += ((Double) ((KafkaMetric) entry.getValue()).metricValue()).doubleValue();
                }
            }
        }
        if (z) {
            Assertions.assertTrue(z2, "Metric not found " + str2);
        }
        return d;
    }

    private double totalMetricValue(MultiTenantCluster multiTenantCluster, String str, String str2, Map<String, String> map) {
        double d = 0.0d;
        Iterator<KafkaBroker> it = multiTenantCluster.physicalCluster.kafkaCluster().kafkaBrokers().iterator();
        while (it.hasNext()) {
            for (Map.Entry entry : it.next().metrics().metrics().entrySet()) {
                if (isMatchingMetric((MetricName) entry.getKey(), str2, str, map)) {
                    d += ((Double) ((KafkaMetric) entry.getValue()).metricValue()).doubleValue();
                }
            }
        }
        return d;
    }

    private boolean isMatchingMetric(MetricName metricName, String str, String str2, Map<String, String> map) {
        if (metricName.name().equals(str) && metricName.group().equals(str2)) {
            return tagsMatched(metricName, map);
        }
        return false;
    }

    private boolean tagsMatched(MetricName metricName, Map<String, String> map) {
        Map tags = metricName.tags();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            if (!entry.getValue().equals(tags.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }

    private void addSourcePartitionsAndVerifyMirror(int i, String str) throws Exception {
        ((KafkaFuture) this.sourceCluster.admin.createPartitions(Collections.singletonMap("linkedTopic", NewPartitions.increaseTo(i))).values().get("linkedTopic")).get(15L, TimeUnit.SECONDS);
        waitFor(() -> {
            return Integer.valueOf(this.destCluster.partitionsForTopic(str + "linkedTopic"));
        }, Integer.valueOf(i), "Topic partitions not updated");
        this.numPartitions = i;
    }

    private void changeSourceTopicConfigAndVerifyMirror(String str) throws Exception {
        this.sourceCluster.admin.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, "linkedTopic"), Collections.singleton(new AlterConfigOp(new ConfigEntry("max.message.bytes", "123456"), AlterConfigOp.OpType.SET)))).all().get(15L, TimeUnit.SECONDS);
        waitFor(() -> {
            return this.destCluster.topicConfig(str + "linkedTopic", "max.message.bytes");
        }, "123456", "Topic configs not migrated");
    }

    private void verifyClusterLinkCreateFailureDueToMaxLink(ClusterLinkDescription.LinkMode linkMode, String str, int i, ClusterLinkDescription.LinkMode linkMode2, int i2) throws Throwable {
        CreateClusterLinksResult createSourceClusterLinkResult;
        Map<String, String> singletonMap = Collections.singletonMap(ClusterLinkConfig.LinkModeProp(), linkMode.name());
        switch (AnonymousClass3.$SwitchMap$org$apache$kafka$clients$admin$ClusterLinkDescription$LinkMode[linkMode2.ordinal()]) {
            case 1:
                createSourceClusterLinkResult = this.destCluster.createDestClusterLinkResult(this.destCluster.admin, str, this.sourceCluster, i, "EXTERNAL", singletonMap, true);
                break;
            case 2:
                createSourceClusterLinkResult = this.sourceCluster.createSourceClusterLinkResult(this.sourceCluster.admin, str, this.destCluster, i, singletonMap);
                break;
            default:
                throw new IllegalArgumentException("Unexpected policy violation mode " + linkMode2);
        }
        String format = String.format("Cluster link with link mode %s could not be created because this cluster already has the maximum number of%s cluster links (%d). You can request a higher limit through Confluent Support.", linkMode.name(), linkMode2.name(), Integer.valueOf(i2));
        if (!TestInfoUtils.isKRaft(this.testInfo)) {
            format = "Unable to validate cluster link due to error: " + format;
        }
        org.apache.kafka.test.TestUtils.assertFutureThrows(createSourceClusterLinkResult.all(), PolicyViolationException.class, format);
    }

    private void changeMaxClusterLinks(int i, int i2) throws Exception {
        if (i >= 0) {
            this.destCluster.internalAdminClient().incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), Collections.singletonList(new AlterConfigOp(new ConfigEntry("confluent.plugins.cluster.link.policy.max.destination.links.per.tenant", String.valueOf(i)), AlterConfigOp.OpType.SET)))).all().get();
        }
        if (i2 >= 0) {
            this.sourceCluster.internalAdminClient().incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), Collections.singletonList(new AlterConfigOp(new ConfigEntry("confluent.plugins.cluster.link.policy.max.source.links.per.tenant", String.valueOf(i2)), AlterConfigOp.OpType.SET)))).all().get();
        }
    }

    private Set<AclBinding> addAcls(Admin admin, LogicalClusterUser logicalClusterUser, String... strArr) throws Exception {
        String kafkaPrincipal = logicalClusterUser.unprefixedKafkaPrincipal().toString();
        Set<AclBinding> mkSet = Utils.mkSet(new AclBinding[]{new AclBinding(new ResourcePattern(ResourceType.TOPIC, "linked", PatternType.PREFIXED), new AccessControlEntry(kafkaPrincipal, "*", AclOperation.ALL, AclPermissionType.ALLOW)), new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL), new AccessControlEntry(kafkaPrincipal, "*", AclOperation.READ, AclPermissionType.ALLOW)), new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL), new AccessControlEntry(kafkaPrincipal, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)), new AclBinding(new ResourcePattern(ResourceType.TOPIC, "src_linked", PatternType.PREFIXED), new AccessControlEntry(kafkaPrincipal, "*", AclOperation.ALL, AclPermissionType.ALLOW))});
        for (String str : strArr) {
            mkSet.add(new AclBinding(new ResourcePattern(ResourceType.TOPIC, str, PatternType.PREFIXED), new AccessControlEntry(kafkaPrincipal, "*", AclOperation.ALL, AclPermissionType.ALLOW)));
        }
        admin.createAcls(mkSet).all().get(15L, TimeUnit.SECONDS);
        return mkSet;
    }

    private void addSourceAcl(String str) throws Exception {
        addAcl(this.sourceCluster, str);
    }

    private void addAcl(MultiTenantCluster multiTenantCluster, String str) throws Exception {
        multiTenantCluster.admin.createAcls(Collections.singleton(new AclBinding(new ResourcePattern(ResourceType.TOPIC, str, PatternType.PREFIXED), new AccessControlEntry(multiTenantCluster.user.unprefixedKafkaPrincipal().toString(), "*", AclOperation.ALL, AclPermissionType.ALLOW)))).all().get(15L, TimeUnit.SECONDS);
    }

    private void waitForAcls(MultiTenantCluster multiTenantCluster, int i, Uuid uuid) throws Exception {
        String str = multiTenantCluster == this.sourceCluster ? "Source" : "Destination";
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            return aclCount(multiTenantCluster, uuid) == i;
        }, () -> {
            return String.format("%s acls not created, expected %d got %d", str, Integer.valueOf(i), Integer.valueOf(aclCount(multiTenantCluster, uuid)));
        });
    }

    private int aclCount(MultiTenantCluster multiTenantCluster, Uuid uuid) {
        try {
            AclBindingFilter aclBindingFilter = AclBindingFilter.ANY;
            if (uuid != null) {
                aclBindingFilter = SecurityUtils.aclFilterWithClusterLinkIds(aclBindingFilter, Collections.singleton(uuid));
            }
            return ((Collection) multiTenantCluster.admin.describeAcls(aclBindingFilter).values().get(15L, TimeUnit.SECONDS)).size();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private double aclAddFailedMetric() {
        return totalMetricValue(this.destCluster, "cluster-link-metrics", "acls-add-failed-total", Collections.emptyMap());
    }

    private void waitForBackgroundThreadUsage(MultiTenantCluster multiTenantCluster, int i, int i2) throws Exception {
        Map singletonMap = Collections.singletonMap("thread", String.valueOf(i));
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            return totalMetricValue(multiTenantCluster, "cluster-link-metrics", "background-thread-usage", singletonMap) == ((double) i2);
        }, () -> {
            return "Unexpected thread usage for background thread " + i + ": expected " + i2 + ", got " + totalMetricValue(multiTenantCluster, "cluster-link-metrics", "background-thread-usage", singletonMap);
        });
    }

    private void waitForBackgroundThreadTenants(MultiTenantCluster multiTenantCluster, int i, int i2) throws Exception {
        Map singletonMap = Collections.singletonMap("thread", String.valueOf(i));
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            return totalMetricValue(multiTenantCluster, "cluster-link-metrics", "background-thread-tenants", singletonMap) == ((double) i2);
        }, () -> {
            return "Unexpected tenants for background thread " + i + ": expected " + i2 + ", got " + totalMetricValue(multiTenantCluster, "cluster-link-metrics", "background-thread-tenants", singletonMap);
        });
    }

    private void addBrokerAclsForOffsetMigration(String str) {
        String str2 = this.destCluster.user.tenantPrefix() + str + "linked";
        this.destCluster.physicalCluster.newAclCommand().consumeAclArgs(PhysicalCluster.BROKER_PRINCIPAL, str2, str2, PatternType.PREFIXED).execute();
    }

    private Map<TopicPartition, OffsetAndMetadata> commitOffsets(Admin admin, String str) throws Exception {
        HashMap hashMap = new HashMap();
        LogManager logManager = this.sourceCluster.physicalCluster.kafkaCluster().kafkaBrokers().get(0).logManager();
        for (int i = 0; i < this.numPartitions; i++) {
            hashMap.put(new TopicPartition("linkedTopic", i), new OffsetAndMetadata(((AbstractLog) logManager.getLog(new TopicPartition(this.sourceCluster.user.tenantPrefix() + "linkedTopic", i), false).get()).localLogEndOffset()));
        }
        admin.alterConsumerGroupOffsets(str, hashMap).all().get();
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> void waitFor(Supplier<T> supplier, T t, String str) throws Exception {
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            return t.equals(supplier.get());
        }, () -> {
            return str + " : expected=" + t + ", actual=" + supplier.get();
        });
    }

    private String aclFilters(KafkaPrincipal... kafkaPrincipalArr) {
        StringBuilder sb = new StringBuilder();
        sb.append("{ \"aclFilters\": [");
        for (int i = 0; i < kafkaPrincipalArr.length; i++) {
            if (i != 0) {
                sb.append(",");
            }
            sb.append("{ \"resourceFilter\": { \"resourceType\": \"any\", \"patternType\": \"any\" }, ");
            sb.append("\"accessFilter\": { \"operation\": \"any\", \"permissionType\": \"any\", ");
            sb.append(String.format("\"principal\": \"%s\"", kafkaPrincipalArr[i]));
            sb.append("}}");
        }
        sb.append("]}");
        return sb.toString();
    }

    private Set<AclBinding> aclBindings(Collection<Uuid> collection, AclBinding... aclBindingArr) {
        return (Set) Stream.of((Object[]) aclBindingArr).map(aclBinding -> {
            return SecurityUtils.aclWithClusterLinkIds(aclBinding, collection);
        }).collect(Collectors.toSet());
    }

    private void waitForDestAcls(Collection<AclBinding> collection) throws Exception {
        waitFor(() -> {
            return currentAcls(this.destCluster.admin);
        }, new HashSet(collection), "ACLs sync result failed");
    }

    private Set<AclBinding> currentAcls(ConfluentAdmin confluentAdmin) {
        try {
            return new HashSet((Collection) confluentAdmin.describeAcls(AclBindingFilter.ANY).values().get());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void deleteAcls(ConfluentAdmin confluentAdmin, AclBinding... aclBindingArr) throws Exception {
        confluentAdmin.deleteAcls((Collection) aclBindings(Collections.emptySet(), aclBindingArr).stream().map((v0) -> {
            return v0.toFilter();
        }).collect(Collectors.toSet())).all().get(15L, TimeUnit.SECONDS);
    }

    private void verifyMismatchedLinkIdFailure(Map<String, String> map) throws Exception {
        Uuid randomUuid = Uuid.randomUuid();
        ExecutionException executionException = (ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
        });
        Assertions.assertEquals(InvalidConfigurationException.class, executionException.getCause().getClass());
        Assertions.assertTrue(executionException.getMessage().contains("does not match requested link id"), executionException.getMessage());
    }
}
