package io.confluent.kafka.link.integration;

import com.google.common.collect.Lists;
import io.confluent.kafka.multitenant.MultiTenantRequestContextTest;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.multitenant.integration.test.IntegrationTestHarness;
import io.confluent.kafka.multitenant.integration.test.SaslAuthenticateRequestCallback;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.server.plugins.auth.DefaultDataPolicyContext;
import io.confluent.kafka.server.plugins.auth.TestLogicalClusterMetadata;
import io.confluent.kafka.server.plugins.auth.TestPhysicalClusterMetadata;
import io.confluent.kafka.server.plugins.policy.AlterConfigPolicy;
import io.confluent.kafka.server.plugins.policy.CreateClusterLinkPolicy;
import io.confluent.kafka.server.plugins.policy.CreateTopicPolicy;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.kafka.test.utils.SecurityTestUtils;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkFactory;
import kafka.server.link.ClusterLinkManager;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ClusterLinkDescription;
import org.apache.kafka.clients.admin.ClusterLinkListing;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.CreateClusterLinksOptions;
import org.apache.kafka.clients.admin.CreateClusterLinksResult;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.DescribeClusterLinksOptions;
import org.apache.kafka.clients.admin.ListClusterLinksOptions;
import org.apache.kafka.clients.admin.NewClusterLink;
import org.apache.kafka.clients.admin.NewMirrorTopic;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.ClusterLinkError;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SslAuthenticationException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.SaslInternalConfigs;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.datapolicy.DefaultDataPolicyStore;
import org.apache.kafka.server.interceptor.ConfluentCloudBrokerInterceptor;
import org.apache.kafka.test.TestSslUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tags({@Tag("integration"), @Tag("bazel:shard_count:6"), @Tag("bazel:size:large")})
/* loaded from: input_file:io/confluent/kafka/link/integration/MultiTenantCLDefaultDataPolicyTest.class */
public class MultiTenantCLDefaultDataPolicyTest {
    /** Common name (CN) given to the certificates built in {@code createSslStores}. */
    public static final String SSL_KAFKA_CN = "kafka";
    /** Default organization id; also the initial value of {@link #destClusterOrganizationId}. */
    private static final String TEST_ORGANIZATION_ID = "org-test";
    /** Environment id recorded in the logical cluster metadata seeded by the test clusters. */
    private static final String TEST_ENVIRONMENT_ID = "env-test";
    private static final Logger log = LoggerFactory.getLogger(MultiTenantCLDefaultDataPolicyTest.class);
    /** Cluster that acts as the link source; created fresh in {@code setUp}. */
    private MultiTenantCluster sourceCluster;
    /** Cluster on which links are created; created fresh in {@code setUp}. */
    private MultiTenantCluster destCluster;
    private final String sourceLogicalCluster = "lkc-source";
    private final String destLogicalCluster = "lkc-dest";
    /** Organization id used when starting the destination cluster; individual tests may override it. */
    private String destClusterOrganizationId = TEST_ORGANIZATION_ID;
    private final String linkName = "tenantLink";
    private final String topic = "linkedTopic";
    // NOTE(review): the two counters below are not used by any method in the visible part of the
    // file; presumably they drive the mirroring helpers defined further down.
    private int numPartitions = 2;
    private int nextMessageIndex = 0;
    /** Broker-side SSL settings filled in by {@code createSslStores}. Typed instead of raw. */
    private final Map<String, String> sslConfigs = new HashMap<>();
    /** Client-side SSL settings filled in by {@code createSslStores}. Typed instead of raw. */
    private final Map<String, String> clientSslConfigs = new HashMap<>();

    /* loaded from: input_file:io/confluent/kafka/link/integration/MultiTenantCLDefaultDataPolicyTest$MultiTenantCluster.class */
    public static class MultiTenantCluster extends IntegrationTestHarness {
        // SSL settings supplied by the owning test; added to broker properties and to
        // producer/consumer/link client configs by the methods below.
        private final Map<String, String> sslConfigs;
        // SSL settings used for the tenant admin client built in createAdmins().
        private final Map<String, String> clientSslConfigs;
        // Handle returned by startWithTopic(...) in startCluster().
        private PhysicalCluster physicalCluster;
        // Logical (tenant) cluster created on top of physicalCluster in startCluster().
        private LogicalCluster logicalCluster;
        // Tenant user obtained from logicalCluster.user(...) in startCluster().
        private LogicalClusterUser user;
        // User whose credentials the cluster link connects with; assigned by
        // createDestClusterLinkResult(...) on the cluster passed as the link source.
        private LogicalClusterUser linkUser;
        // SASL_SSL admin client authenticated as the logical cluster's admin user.
        private ConfluentAdmin admin;
        // PLAINTEXT admin client connected to the INTERNAL listener.
        private ConfluentAdmin internalListenerAdmin;
        // Metadata instance of the last broker session visited by seedLogicalClusterMetadata().
        private TestPhysicalClusterMetadata physicalClusterMetadata;
        // Lazily created by getOrCreateProducerToInternalListener().
        KafkaProducer<String, String> producerToInternalListener;
        // Lazily created by getOrCreateConsumerToExternalListener(...).
        KafkaConsumer<String, String> consumerToExternalListener;
        // Organization id this cluster was started with; set in startCluster().
        String organizationId;

        /**
         * Creates an unstarted cluster harness; call {@code startCluster} to bring brokers up.
         *
         * @param testInfo JUnit test info forwarded to the harness base class
         * @param map broker-side SSL settings
         * @param map2 client-side SSL settings
         */
        public MultiTenantCluster(TestInfo testInfo, Map<String, String> map, Map<String, String> map2) {
            super(testInfo);
            this.clientSslConfigs = map2;
            this.sslConfigs = map;
        }

        /**
         * Starts the physical cluster, creates one logical cluster with a user on it, seeds the
         * logical cluster metadata and builds the admin clients.
         *
         * @param map extra broker properties applied on top of {@code brokerProps()}
         * @param str id of the logical cluster to create
         * @param i id of the user created on the logical cluster
         * @param str2 organization id recorded for this cluster
         * @throws RuntimeException wrapping any {@link InterruptedException} or
         *     {@link ExecutionException} raised during startup
         */
        void startCluster(Map<String, String> map, String str, int i, String str2) {
            this.organizationId = str2;
            // First property set: defaults + SSL settings + overrides + listener protocol map.
            Properties propsWithSsl = brokerProps();
            propsWithSsl.putAll(this.sslConfigs);
            propsWithSsl.putAll(map);
            propsWithSsl.setProperty("listener.security.protocol.map", "INTERNAL:PLAINTEXT,EXTERNAL:SASL_SSL");
            // Second property set: defaults + overrides only.
            // NOTE(review): how startWithTopic uses each set is not visible here.
            Properties propsWithoutSsl = brokerProps();
            propsWithoutSsl.putAll(map);
            try {
                this.physicalCluster = startWithTopic("_confluent-logical_clusters", 1, 1, 60000L, propsWithSsl, propsWithoutSsl, Optional.of(Time.SYSTEM));
                this.logicalCluster = this.physicalCluster.createLogicalCluster(str, 100, Integer.valueOf(i));
                this.user = this.logicalCluster.user(i);
                seedLogicalClusterMetadata();
                createAdmins();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Registers this logical cluster (id, organization, test environment) with the
         * {@link TestPhysicalClusterMetadata} instance of every broker session. The instance of
         * the last session visited is kept in {@code physicalClusterMetadata}.
         */
        private void seedLogicalClusterMetadata() {
            TestLogicalClusterMetadata lkcMetadata = new TestLogicalClusterMetadata(
                    this.logicalCluster.logicalClusterId(),
                    this.organizationId,
                    MultiTenantCLDefaultDataPolicyTest.TEST_ENVIRONMENT_ID);
            this.physicalCluster.brokerSessionUuids().forEach(sessionUuid -> {
                TestPhysicalClusterMetadata sessionMetadata = (TestPhysicalClusterMetadata) TestPhysicalClusterMetadata.getInstance(sessionUuid);
                this.physicalClusterMetadata = sessionMetadata;
                sessionMetadata.addLogicalCluster(lkcMetadata);
            });
        }

        /**
         * (Re)builds both admin clients: a SASL_SSL/PLAIN client authenticated as the logical
         * cluster's admin user, and a PLAINTEXT client on the INTERNAL listener.
         */
        private void createAdmins() {
            Properties tenantClientProps = new Properties();
            tenantClientProps.putAll(this.clientSslConfigs);
            this.admin = super.createAdminClient(
                    this.physicalCluster.bootstrapServers(),
                    SecurityProtocol.SASL_SSL,
                    "PLAIN",
                    this.logicalCluster.adminUser().testPlainSaslJaasConfig(),
                    tenantClientProps,
                    new SaslAuthenticateRequestCallback(
                            new DefaultDataPolicyContext.Builder(this.organizationId, SaslInternalConfigs.NetworkType.PRIVATE, true).build(),
                            "lkc-inconsequential"),
                    null);
            this.internalListenerAdmin = super.createAdminClient(
                    this.physicalCluster.kafkaCluster().bootstrapServers("INTERNAL"),
                    SecurityProtocol.PLAINTEXT,
                    null,
                    null,
                    new Properties());
        }

        /**
         * Builds the baseline broker configuration shared by every broker of this cluster.
         * The cluster-link metadata topic gets 1 partition (plus {@code min.isr=1}) when the test
         * runs with the link coordinator, and 3 partitions otherwise.
         *
         * @return a fresh, mutable {@link Properties} instance
         */
        private Properties brokerProps() {
            boolean withLinkCoordinator = testRunsWithLinkCoordinator();
            Properties defaults = new Properties();

            // Multi-tenant plugins and policies.
            defaults.put("authorizer.class.name", MultiTenantAuthorizer.class.getName());
            defaults.put("client.quota.callback.class", TenantQuotaCallback.class.getName());
            defaults.put("create.topic.policy.class.name", CreateTopicPolicy.class.getName());
            defaults.put("alter.config.policy.class.name", AlterConfigPolicy.class.getName());
            defaults.put("create.cluster.link.policy.class.name", CreateClusterLinkPolicy.class.getName());
            defaults.put("listener.name.internal.broker.interceptor.class", ConfluentCloudBrokerInterceptor.class.getName());
            defaults.put("confluent.plugins.topic.policy.replication.factor", "1");
            defaults.put("confluent.cdc.lkc.metadata.topic", "_confluent-logical_clusters");
            defaults.put("auto.create.topics.enable", "false");

            // Cluster linking and its metadata topic.
            defaults.put("confluent.cluster.link.enable", "true");
            defaults.put("confluent.cluster.link.metadata.topic.enable", "true");
            defaults.put("confluent.cluster.link.metadata.topic.replication.factor", "1");
            defaults.put("confluent.cluster.link.metadata.topic.partitions", withLinkCoordinator ? "1" : "3");
            if (withLinkCoordinator) {
                defaults.put("confluent.cluster.link.metadata.topic.min.isr", "1");
            }
            return defaults;
        }

        /**
         * Sets the "LKC metadata ready" flag on the physical cluster metadata instance captured by
         * {@code seedLogicalClusterMetadata()}.
         *
         * @param z the ready state to apply
         */
        public void setPhysicalClusterMetadataReadyState(boolean z) {
            TestPhysicalClusterMetadata metadata = this.physicalClusterMetadata;
            metadata.setIsLkcMetadataReady(z);
        }

        /**
         * Reads the cluster-wide default broker config (broker resource with an empty name)
         * through the internal listener, waiting up to 15 seconds.
         *
         * @return the described config
         * @throws RuntimeException wrapping any failure of the admin call
         */
        public Config getBrokerDefaultConfig() {
            try {
                ConfigResource brokerDefaults = new ConfigResource(ConfigResource.Type.BROKER, "");
                KafkaFuture<Config> pending = this.internalListenerAdmin
                        .describeConfigs(Collections.singleton(brokerDefaults))
                        .values()
                        .get(brokerDefaults);
                return pending.get(15L, TimeUnit.SECONDS);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Applies the given entries as SET operations on the cluster-wide default broker config
         * (broker resource with an empty name) through the internal listener and waits for
         * completion.
         *
         * @param map config names mapped to the values to set
         * @throws RuntimeException wrapping any failure of the admin call
         */
        public void setBrokerDefaultConfig(Map<String, String> map) {
            Collection<AlterConfigOp> setOps = new ArrayList<>();
            map.forEach((name, value) -> setOps.add(new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET)));
            ConfigResource brokerDefaults = new ConfigResource(ConfigResource.Type.BROKER, "");
            try {
                this.internalListenerAdmin
                        .incrementalAlterConfigs(Collections.singletonMap(brokerDefaults, setOps))
                        .all()
                        .get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * @return true when the current test's display name contains {@code coordinator=true}
         */
        private boolean testRunsWithLinkCoordinator() {
            String displayName = this.testInfo.getDisplayName();
            return displayName.contains("coordinator=true");
        }

        /**
         * Adds a user to the logical cluster, grants it the ACLs from {@code linkAcls} and waits
         * until every broker reports those ACLs.
         *
         * @param i id of the user to get or create on the physical cluster
         * @return the newly added logical cluster user
         * @throws Exception if creating the ACLs fails
         */
        LogicalClusterUser createLinkUser(int i) throws Exception {
            LogicalClusterUser newLinkUser = this.logicalCluster.addUser(this.physicalCluster.getOrCreateUser(i, false));
            Set<AclBinding> requiredAcls = linkAcls(newLinkUser);
            addLinkAcls(requiredAcls);
            waitForAclsToExistOnAllBrokers(this.physicalCluster, requiredAcls, newLinkUser.tenantPrefix());
            return newLinkUser;
        }

        /**
         * Polls (every 100 ms, for up to 15 s) until every broker has an authorizer whose ACLs,
         * after stripping the tenant prefix, contain all expected bindings.
         *
         * @param physicalCluster cluster whose brokers are inspected
         * @param set expected, unprefixed ACL bindings
         * @param str tenant prefix to strip from the brokers' ACLs before comparing
         */
        private static void waitForAclsToExistOnAllBrokers(PhysicalCluster physicalCluster, Set<AclBinding> set, String str) {
            TestUtils.waitUntilTrue(
                    () -> Boolean.valueOf(physicalCluster.kafkaCluster().kafkaBrokers().stream().allMatch(broker ->
                            // A broker without an authorizer can never satisfy the condition.
                            !broker.authorizer().isEmpty()
                                    && removeTenantPrefix(Lists.newArrayList(((Authorizer) broker.authorizer().get()).acls(AclBindingFilter.ANY)), str).containsAll(set))),
                    () -> "Failed to validate all ACLs exist on all brokers",
                    15000L,
                    100L);
        }

        /**
         * Returns copies of the given bindings with the tenant prefix removed.
         *
         * <p>Bindings whose principal does not contain the prefix are copied unchanged. For the
         * others, the first occurrence of the prefix is removed from the principal (and
         * {@code TenantUser} becomes {@code User}); a PREFIXED pattern whose name equals the prefix
         * becomes the LITERAL wildcard {@code *}, and any other pattern has the first occurrence of
         * the prefix removed from its name.
         *
         * @param collection tenant-prefixed bindings
         * @param str tenant prefix (passed to {@code String.replaceFirst}, i.e. treated as a regex)
         * @return a new list of unprefixed bindings, in iteration order
         */
        private static Collection<AclBinding> removeTenantPrefix(Collection<AclBinding> collection, String str) {
            List<AclBinding> unprefixed = new ArrayList<>();
            for (AclBinding binding : collection) {
                AccessControlEntry ace = binding.entry();
                if (!ace.principal().contains(str)) {
                    unprefixed.add(new AclBinding(binding.pattern(), binding.entry()));
                    continue;
                }
                String principal = ace.principal().replaceFirst(str, "").replaceFirst("TenantUser", "User");
                ResourcePattern prefixedPattern = binding.pattern();
                ResourcePattern plainPattern;
                if (prefixedPattern.name().equals(str) && prefixedPattern.patternType().equals(PatternType.PREFIXED)) {
                    plainPattern = new ResourcePattern(prefixedPattern.resourceType(), "*", PatternType.LITERAL);
                } else {
                    plainPattern = new ResourcePattern(prefixedPattern.resourceType(), prefixedPattern.name().replaceFirst(str, ""), prefixedPattern.patternType());
                }
                AccessControlEntry plainAce = new AccessControlEntry(principal, ace.host(), ace.operation(), ace.permissionType(), ace.clusterLinkIds());
                unprefixed.add(new AclBinding(plainPattern, plainAce));
            }
            return unprefixed;
        }

        /**
         * Builds SASL_SSL/PLAIN client configs that authenticate as {@code linkUser} against this
         * cluster's bootstrap servers, with the cluster's SSL settings layered on top.
         *
         * @return a new mutable map of client configs
         */
        private Map<String, String> clientConfigsForExternalListener() {
            Properties saslProps = KafkaTestUtils.securityProps(
                    this.physicalCluster.kafkaCluster().bootstrapServers(),
                    SecurityProtocol.SASL_SSL,
                    "PLAIN",
                    this.linkUser.saslJaasConfig());
            Map<String, String> clientConfigs = new HashMap<>();
            for (String key : saslProps.stringPropertyNames()) {
                clientConfigs.put(key, saslProps.getProperty(key));
            }
            // SSL settings are applied last so they win over anything from securityProps.
            clientConfigs.putAll(this.sslConfigs);
            return clientConfigs;
        }

        /**
         * Convenience overload that creates the link without extra link config overrides.
         *
         * @param confluentAdmin admin client used to issue the create request
         * @param str link name
         * @param multiTenantCluster source cluster of the link
         * @param i id of the link user to create on the source cluster
         * @return the pending result of the create request
         */
        CreateClusterLinksResult createDestClusterLinkResult(ConfluentAdmin confluentAdmin, String str, MultiTenantCluster multiTenantCluster, int i) throws Throwable {
            Map<String, String> noOverrides = Collections.emptyMap();
            return createDestClusterLinkResult(confluentAdmin, str, multiTenantCluster, i, noOverrides);
        }

        /**
         * Creates a link user on the source cluster and submits a (non validate-only, link
         * validating) create-cluster-link request pointing at that cluster.
         *
         * @param confluentAdmin admin client used to issue the create request
         * @param str link name
         * @param multiTenantCluster source cluster; its {@code linkUser} field is overwritten
         * @param i id of the link user to create on the source cluster
         * @param map link config overrides, applied after all defaults
         * @return the pending result of the create request
         */
        CreateClusterLinksResult createDestClusterLinkResult(ConfluentAdmin confluentAdmin, String str, MultiTenantCluster multiTenantCluster, int i, Map<String, String> map) throws Throwable {
            multiTenantCluster.linkUser = multiTenantCluster.createLinkUser(i);

            // Connection settings first, then link defaults, then caller overrides.
            Map<String, String> linkConfigs = new HashMap<>(multiTenantCluster.clientConfigsForExternalListener());
            linkConfigs.put("request.timeout.ms", "1000");
            linkConfigs.put(ClusterLinkConfig.TopicConfigSyncMsProp(), "1000");
            linkConfigs.put(ClusterLinkConfig.ConsumerOffsetGroupFiltersProp(), "{ \"groupFilters\": [{ \"name\": \"*\", \"patternType\": \"literal\", \"filterType\": \"include\" }]}");
            linkConfigs.put(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
            linkConfigs.put(ClusterLinkConfig.ConsumerOffsetSyncMsProp(), "2000");
            linkConfigs.putAll(map);

            NewClusterLink linkSpec = new NewClusterLink(str, multiTenantCluster.logicalCluster.logicalClusterId(), linkConfigs, (Uuid) null);
            CreateClusterLinksOptions createOptions = new CreateClusterLinksOptions().validateOnly(false).validateLink(true);
            return confluentAdmin.createClusterLinks(Collections.singleton(linkSpec), createOptions);
        }

        /**
         * Returns the cached PLAINTEXT producer for the INTERNAL listener, creating it on first use.
         */
        KafkaProducer<String, String> getOrCreateProducerToInternalListener() {
            if (this.producerToInternalListener != null) {
                return this.producerToInternalListener;
            }
            Properties producerProps = new Properties();
            producerProps.putAll(this.sslConfigs);
            this.producerToInternalListener = createProducer(SecurityProtocol.PLAINTEXT, "", "", Optional.of("INTERNAL"), producerProps);
            return this.producerToInternalListener;
        }

        /**
         * Returns the cached SASL_SSL/PLAIN consumer for the tenant user, creating it on first use.
         * The argument is only used on the call that creates the consumer.
         *
         * @param str value forwarded to {@code createConsumer} (presumably the consumer group id —
         *     TODO confirm against the harness)
         */
        KafkaConsumer<String, String> getOrCreateConsumerToExternalListener(String str) {
            if (this.consumerToExternalListener != null) {
                return this.consumerToExternalListener;
            }
            Properties consumerProps = new Properties();
            consumerProps.putAll(this.sslConfigs);
            this.consumerToExternalListener = createConsumer(this.user, str, SecurityProtocol.SASL_SSL, "PLAIN", Optional.empty(), consumerProps);
            return this.consumerToExternalListener;
        }

        /**
         * Creates the given ACLs through the tenant admin client, waiting up to 15 seconds.
         *
         * @param set ACL bindings to create
         * @throws Exception if the request fails or times out
         */
        private void addLinkAcls(Set<AclBinding> set) throws Exception {
            KafkaFuture<Void> pending = this.admin.createAcls(set).all();
            pending.get(15L, TimeUnit.SECONDS);
        }

        /**
         * Builds the ALLOW bindings granted to a link user: READ and DESCRIBE_CONFIGS on topics
         * prefixed with {@code linked}, READ on all groups, and DESCRIBE and DESCRIBE_CONFIGS on
         * the cluster resource.
         *
         * @param logicalClusterUser user whose unprefixed principal the ACLs are issued for
         * @return an immutable set of five bindings
         */
        private Set<AclBinding> linkAcls(LogicalClusterUser logicalClusterUser) {
            String principal = logicalClusterUser.unprefixedKafkaPrincipal().toString();

            ResourcePattern linkedTopics = new ResourcePattern(ResourceType.TOPIC, "linked", PatternType.PREFIXED);
            ResourcePattern allGroups = new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL);
            ResourcePattern cluster = new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL);

            AccessControlEntry allowRead = new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW);
            AccessControlEntry allowDescribe = new AccessControlEntry(principal, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW);
            AccessControlEntry allowDescribeConfigs = new AccessControlEntry(principal, "*", AclOperation.DESCRIBE_CONFIGS, AclPermissionType.ALLOW);

            return Set.of(
                    new AclBinding(linkedTopics, allowRead),
                    new AclBinding(linkedTopics, allowDescribeConfigs),
                    new AclBinding(allGroups, allowRead),
                    new AclBinding(cluster, allowDescribe),
                    new AclBinding(cluster, allowDescribeConfigs));
        }

        /**
         * @return the cluster link manager of the first broker in the physical cluster
         */
        ClusterLinkFactory.LinkManager linkManager() {
            KafkaBroker firstBroker = this.physicalCluster.kafkaCluster().kafkaBrokers().get(0);
            return firstBroker.clusterLinkManager();
        }

        /**
         * Checks whether the first broker's link manager lists a link whose name is the tenant
         * prefix of this cluster's user followed by the given name.
         *
         * @param str unprefixed link name
         * @return true if such a link is listed
         */
        boolean linkIdExists(String str) {
            return linkManager()
                    .listClusterLinks()
                    .find(link -> Boolean.valueOf(link.linkName().equals(this.user.tenantPrefix() + str)))
                    .isDefined();
        }

        /**
         * Lists cluster links filtered to the given name through the tenant admin client, waiting
         * up to 15 seconds.
         *
         * @param str link name to filter on
         * @return the first listing returned, or {@code null} when none is returned
         * @throws RuntimeException wrapping any failure of the admin call
         */
        ClusterLinkListing listClusterLinks(String str) {
            try {
                ListClusterLinksOptions byName = new ListClusterLinksOptions().linkNames(Optional.of(Collections.singletonList(str)));
                Collection<?> listings = (Collection<?>) this.admin.listClusterLinks(byName).result().get(15L, TimeUnit.SECONDS);
                Optional<?> firstListing = listings.stream().findFirst();
                return (ClusterLinkListing) firstListing.orElse(null);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Describes the named cluster link through the tenant admin client, waiting up to
         * 15 seconds.
         *
         * @param str link name to describe
         * @return the first description returned, or {@code null} when none is returned
         * @throws RuntimeException wrapping any failure of the admin call
         */
        ClusterLinkDescription linkDescription(String str) {
            try {
                DescribeClusterLinksOptions byName = new DescribeClusterLinksOptions().linkNames(Collections.singleton(str));
                Collection<?> descriptions = (Collection<?>) this.admin.describeClusterLinks(byName).result().get(15L, TimeUnit.SECONDS);
                Optional<?> firstDescription = descriptions.stream().findFirst();
                return (ClusterLinkDescription) firstDescription.orElse(null);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Asserts on the cross-org-denied total metric across all brokers: the metric must exist
         * on every broker, and the highest value seen must be positive when {@code z} is true and
         * exactly zero otherwise.
         *
         * @param z whether at least one cross-org denial is expected
         */
        public void verifyDefaultDataPolicyCrossOrgDeniedMetrics(boolean z) {
            double highest = 0.0d;
            for (KafkaBroker broker : this.physicalCluster.kafkaCluster().kafkaBrokers()) {
                MetricName deniedTotal = DefaultDataPolicyStore.crossOrgDeniedTotalMetricName(broker.config().brokerSessionUuid());
                Double observed = (Double) getMetricValue(broker, deniedTotal);
                Assertions.assertNotNull(observed);
                highest = Math.max(highest, observed.doubleValue());
            }
            if (!z) {
                Assertions.assertEquals(0.0d, Double.valueOf(highest));
                return;
            }
            Assertions.assertTrue(highest > 0.0d);
        }

        /**
         * Asserts that every broker reports the default-data-policy enforcement metric with the
         * expected value.
         *
         * @param num expected metric value
         */
        public void verifyDefaultDataPolicyEnforcementMetric(Integer num) {
            for (KafkaBroker broker : this.physicalCluster.kafkaCluster().kafkaBrokers()) {
                MetricName enforcement = DefaultDataPolicyStore.defaultDataPolicyEnforcementMetricName(broker.config().brokerSessionUuid());
                Integer observed = (Integer) getMetricValue(broker, enforcement);
                Assertions.assertNotNull(observed);
                Assertions.assertEquals(num, observed);
            }
        }

        /**
         * Looks up a metric on the broker's metrics registry.
         *
         * @param kafkaBroker broker whose registry is queried
         * @param metricName name of the metric to read
         * @return the metric's current value, or {@code null} when the broker has no such metric
         */
        private Object getMetricValue(KafkaBroker kafkaBroker, MetricName metricName) {
            // The registry is keyed by MetricName, so a direct lookup replaces the previous
            // linear scan that compared every key with equals().
            KafkaMetric metric = kafkaBroker.metrics().metrics().get(metricName);
            return metric == null ? null : metric.metricValue();
        }

        /**
         * Fetches the committed offsets of a consumer group through the tenant admin client,
         * blocking until the result is available.
         *
         * @param str consumer group id
         * @return committed offset and metadata per partition
         * @throws RuntimeException wrapping any failure of the admin call
         */
        Map<TopicPartition, OffsetAndMetadata> committedOffsets(String str) {
            try {
                KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> pending = this.admin.listConsumerGroupOffsets(str).partitionsToOffsetAndMetadata();
                return pending.get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Argument source for the parameterized tests: a single combination with quorum
     * {@code "kraft"} and coordinator {@code "true"}.
     *
     * @return a stream with one {@link Arguments} instance
     */
    public static Stream<Arguments> kraftCombinations() {
        return Stream.of(Arguments.of("kraft", "true"));
    }

    /**
     * Generates the SSL stores and creates (but does not start) the source and destination
     * cluster harnesses. For {@code testConfluentTrustManagerWithBadCert} the certificate is
     * issued for {@code randomhost} and the flag passed to {@code createSslStores} is false;
     * every other test uses the localhost constant and true.
     */
    @BeforeEach
    public void setUp(TestInfo testInfo) throws Exception {
        String testMethod = ((Method) testInfo.getTestMethod().get()).toString();
        boolean badCertTest = testMethod.contains("testConfluentTrustManagerWithBadCert");
        String certHostName = badCertTest ? "randomhost" : MultiTenantRequestContextTest.LOCALHOST;
        createSslStores(certHostName, "requested", !badCertTest);
        this.sourceCluster = new MultiTenantCluster(testInfo, this.sslConfigs, this.clientSslConfigs);
        this.destCluster = new MultiTenantCluster(testInfo, this.sslConfigs, this.clientSslConfigs);
    }

    /**
     * Shuts both clusters down, checks that the destination brokers' link managers are empty,
     * and always clears global test state afterwards.
     *
     * <p>The global cleanup lives in a single {@code finally} block so it runs exactly once,
     * whether or not shutdown succeeded. A destination cluster that was never started (its
     * {@code physicalCluster} is still null) is tolerated instead of raising a
     * {@link NullPointerException} that would hide the original test failure.
     */
    @AfterEach
    public void tearDown() throws Exception {
        try {
            this.sourceCluster.shutdown();
            // Capture the brokers before shutdown; they are inspected once shutdown has finished.
            List<KafkaBroker> destBrokers = new ArrayList<>();
            if (this.destCluster.physicalCluster != null) {
                this.destCluster.physicalCluster.kafkaCluster().kafkaBrokers().stream()
                        .filter(Objects::nonNull)
                        .forEach(destBrokers::add);
            }
            this.destCluster.shutdown();
            destBrokers.forEach(broker -> broker.clusterLinkManager().ensureEmptyIfNoLinks());
        } finally {
            SecurityTestUtils.clearSecurityConfigs();
            KafkaTestUtils.verifyThreadCleanup();
            TestRegionalMetadataClient.clearBootstrapServers();
        }
    }

    /**
     * Generates key/trust stores for the test, converts them to PEM and fills
     * {@code sslConfigs} and {@code clientSslConfigs}.
     *
     * <p>The client configs are a copy of the store settings plus the endpoint identification
     * algorithm; the listener-specific broker settings are added to {@code sslConfigs}
     * afterwards and therefore never reach the client configs.
     *
     * @param str host name added to the generated certificate
     * @param str2 value for {@code listener.name.external.ssl.client.auth}
     * @param z whether clients use {@code https} endpoint identification (empty string otherwise)
     */
    private void createSslStores(String str, String str2, boolean z) throws Exception {
        CertStores stores = new CertStores.Builder(true).cn(SSL_KAFKA_CN).addHostName(str).build();
        Properties storeProps = new Properties();
        // Flattens every non-null store property to its string form.
        BiConsumer<String, Object> copyAsString = (key, value) -> {
            if (value == null) {
                return;
            }
            String text;
            if (value instanceof Password) {
                text = ((Password) value).value();
            } else if (value instanceof List) {
                @SuppressWarnings("unchecked")
                List<String> parts = (List<String>) value;
                text = String.join(",", parts);
            } else {
                text = (String) value;
            }
            storeProps.setProperty(key, text);
        };
        stores.keyStoreProps().forEach(copyAsString);
        stores.trustStoreProps().forEach(copyAsString);
        TestSslUtils.convertToPemWithoutFiles(storeProps);

        for (Map.Entry<Object, Object> converted : storeProps.entrySet()) {
            this.sslConfigs.put((String) converted.getKey(), (String) converted.getValue());
        }

        this.clientSslConfigs.putAll(this.sslConfigs);
        this.clientSslConfigs.put("ssl.endpoint.identification.algorithm", z ? "https" : "");

        this.sslConfigs.put("listener.name.external.ssl.client.auth", str2);
        this.sslConfigs.put("listener.name.external.ssl.trustmanager.algorithm", "ConfluentTls");
        this.sslConfigs.put("listener.name.external.security.providers", "io.confluent.kafka.server.plugins.ssl.ConfluentTrustProviderCreator");
    }

    /**
     * Creates a link whose configured {@code bootstrap.servers} is {@code localhost:0} while
     * the source cluster's EXTERNAL bootstrap servers are seeded in
     * {@code TestRegionalMetadataClient}. After the destination brokers restart, the test
     * expects the link to be listed as FAILED until the logical cluster metadata is seeded
     * again; it must then connect to the seeded bootstrap servers and become ACTIVE.
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMetadataNotAvailableWithBrokerRestart(String str, boolean z) throws Throwable {
        Map<String, String> destBrokerProps = destBrokerProps();
        destBrokerProps.put("confluent.cluster.link.intranet.connectivity.denied.org.ids", "foo-org");
        setUpClusters(srcBrokerProps(), destBrokerProps);
        String bootstrapServers = this.sourceCluster.physicalCluster.kafkaCluster().bootstrapServers("EXTERNAL");
        TestRegionalMetadataClient.seedBootstrapServers(TEST_ORGANIZATION_ID, this.sourceCluster.logicalCluster.logicalClusterId(), bootstrapServers);
        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, Collections.singletonMap("bootstrap.servers", "localhost:0")).all().get();
        waitFor(() -> {
            return Boolean.valueOf(this.destCluster.linkIdExists("tenantLink"));
        }, true, "Link was not created");
        testBasicMirroring();

        // The admin clients are rebuilt after the restart.
        this.destCluster.restartBrokers();
        this.destCluster.createAdmins();
        waitFor(() -> {
            ClusterLinkListing listing = this.destCluster.listClusterLinks("tenantLink");
            return listing != null ? listing.linkState() : ClusterLinkDescription.LinkState.UNKNOWN;
        }, ClusterLinkDescription.LinkState.FAILED, "Cluster link doesn't transition to failed");

        this.destCluster.seedLogicalClusterMetadata();
        // linkManager() is declared to return ClusterLinkFactory.LinkManager, while
        // destClientManagerRemoteAdminNodes is called on ClusterLinkManager, so an explicit
        // downcast is required.
        ClusterLinkManager linkManager = (ClusterLinkManager) this.destCluster.linkManager();
        waitFor(() -> {
            List destClientManagerRemoteAdminNodes = linkManager.destClientManagerRemoteAdminNodes(this.destCluster.user.tenantPrefix() + "tenantLink");
            if (destClientManagerRemoteAdminNodes.isEmpty()) {
                return "";
            }
            Node firstNode = (Node) destClientManagerRemoteAdminNodes.get(0);
            return firstNode.host() + ":" + firstNode.port();
        }, bootstrapServers, "Cluster link doesn't transition to active/unavailable or nodes do not match");
        ClusterLinkListing listClusterLinks = this.destCluster.listClusterLinks("tenantLink");
        Assertions.assertNotNull(listClusterLinks);
        Assertions.assertEquals(ClusterLinkDescription.LinkState.ACTIVE, listClusterLinks.linkState());
    }

    /**
     * Creates a link between two clusters started with the default organization, runs the
     * mirroring, offset-migration and network-type checks, then sets
     * {@code confluent.cluster.link.intranet.connectivity.enable} to false on the destination,
     * restarts its brokers and repeats the network-type check.
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testDefaultDataPolicy(String str, boolean z) throws Throwable {
        setUpClusters(srcBrokerProps(), destBrokerProps());

        ConfluentAdmin destAdmin = destCluster.admin;
        destCluster.createDestClusterLinkResult(destAdmin, "tenantLink", sourceCluster, 1001).all().get();
        waitFor(() -> Boolean.valueOf(destCluster.linkIdExists("tenantLink")), true, "Link was not created");

        testBasicMirroring();
        verifyOffsetMigration();
        ensureNetworkTypeIsPersisted();

        Map<String, String> disableIntranet = Collections.singletonMap("confluent.cluster.link.intranet.connectivity.enable", Boolean.toString(false));
        destCluster.setBrokerDefaultConfig(disableIntranet);
        destCluster.restartBrokers();
        destCluster.createAdmins();
        ensureNetworkTypeIsPersisted();
    }

    /**
     * Starts only the destination cluster and expects adding ACLs through its admin client to
     * fail with an {@link SslAuthenticationException} as the cause. {@code setUp} issues the
     * certificate for {@code randomhost} for this test.
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testConfluentTrustManagerWithBadCert(String str, boolean z) throws Throwable {
        destCluster.startCluster(destBrokerProps(), "lkc-dest", 11, destClusterOrganizationId);
        ExecutionException failure = Assertions.assertThrows(
                ExecutionException.class,
                () -> addAcls(destCluster.admin, destCluster.user, new String[0]));
        Assertions.assertEquals(SslAuthenticationException.class, failure.getCause().getClass());
    }

    /**
     * Starts only the destination cluster and expects adding ACLs through its admin client to
     * complete without throwing.
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testConfluentTrustManagerWithGoodCert(String str, boolean z) throws Throwable {
        Map<String, String> brokerOverrides = destBrokerProps();
        destCluster.startCluster(brokerOverrides, "lkc-dest", 11, destClusterOrganizationId);
        addAcls(destCluster.admin, destCluster.user, new String[0]);
    }

    /**
     * Runs the destination cluster under a different organization and checks three phases:
     * link creation is rejected with a SASL authentication error while enforcement is on,
     * succeeds after {@code confluent.default.data.policy.enforcement} is set to false on the
     * source, and is rejected again once enforcement is switched back on. The source's
     * enforcement and cross-org-denied metrics are verified after each phase.
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testDefaultDataPolicyWithDifferentOrg(String str, boolean z) throws Throwable {
        destClusterOrganizationId = "test-diff-org";
        setUpClusters(srcBrokerProps(), destBrokerProps());
        sourceCluster.verifyDefaultDataPolicyCrossOrgDeniedMetrics(false);

        // Phase 1: enforcement on, cross-org link creation is denied.
        ExecutionException firstDenial = Assertions.assertThrows(
                ExecutionException.class,
                () -> destCluster.createDestClusterLinkResult(destCluster.admin, "tenantLink", sourceCluster, 1001).all().get());
        Assertions.assertEquals(SaslAuthenticationException.class, firstDenial.getCause().getClass());
        sourceCluster.verifyDefaultDataPolicyEnforcementMetric(1);
        sourceCluster.verifyDefaultDataPolicyCrossOrgDeniedMetrics(true);

        // Phase 2: enforcement off, the same link can be created and mirrors data.
        sourceCluster.setBrokerDefaultConfig(Collections.singletonMap("confluent.default.data.policy.enforcement", Boolean.toString(false)));
        destCluster.createDestClusterLinkResult(destCluster.admin, "tenantLink", sourceCluster, 1002).all().get();
        waitFor(() -> Boolean.valueOf(destCluster.linkIdExists("tenantLink")), true, "Link was not created");
        testBasicMirroring();
        sourceCluster.verifyDefaultDataPolicyEnforcementMetric(0);
        sourceCluster.verifyDefaultDataPolicyCrossOrgDeniedMetrics(true);

        // Phase 3: enforcement back on, a second link is denied again.
        sourceCluster.setBrokerDefaultConfig(Collections.singletonMap("confluent.default.data.policy.enforcement", Boolean.toString(true)));
        ExecutionException secondDenial = Assertions.assertThrows(
                ExecutionException.class,
                () -> destCluster.createDestClusterLinkResult(destCluster.admin, "tenantLink2", sourceCluster, 1003).all().get());
        Assertions.assertEquals(SaslAuthenticationException.class, secondDenial.getCause().getClass());
        sourceCluster.verifyDefaultDataPolicyEnforcementMetric(1);
        sourceCluster.verifyDefaultDataPolicyCrossOrgDeniedMetrics(true);
    }

    /**
     * Lists {@code org-test} among the source cluster's denied organization ids and expects
     * link creation from the destination to fail with a SASL authentication error.
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testDefaultDataPolicyWithDenyOrg(String str, boolean z) throws Throwable {
        Map<String, String> sourceOverrides = srcBrokerProps();
        sourceOverrides.put("confluent.cluster.link.intranet.connectivity.denied.org.ids", "org-test,foo-org");
        setUpClusters(sourceOverrides, destBrokerProps());

        ExecutionException denial = Assertions.assertThrows(
                ExecutionException.class,
                () -> destCluster.createDestClusterLinkResult(destCluster.admin, "tenantLink", sourceCluster, 1001).all().get());
        Assertions.assertEquals(SaslAuthenticationException.class, denial.getCause().getClass());
    }

    /**
     * With intranet connectivity disabled on both clusters, marks the destination's physical
     * cluster metadata as not ready and checks that a destination link can still be created.
     *
     * @param str first argument supplied by {@code kraftCombinations} (unused in the body)
     * @param z   second argument supplied by {@code kraftCombinations} (unused in the body)
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testClusterMetadataNotReadyWithFlatNetworkingDisabled(String str, boolean z) throws Throwable {
        final String intranetConnectivityKey = "confluent.cluster.link.intranet.connectivity.enable";
        final String disabled = Boolean.FALSE.toString();

        Map<String, String> sourceProps = srcBrokerProps();
        Map<String, String> destProps = destBrokerProps();
        sourceProps.put(intranetConnectivityKey, disabled);
        destProps.put(intranetConnectivityKey, disabled);
        // Replace the source listener's JAAS config with one whose validation mode is "none".
        sourceProps.put("listener.name.external.plain.sasl.jaas.config", "io.confluent.kafka.multitenant.integration.cluster.TestPlainLoginModule required default_data_policy_validation_mode=\"none\";");
        setUpClusters(sourceProps, destProps);

        // Metadata is flipped to "not ready" before the link is created.
        this.destCluster.setPhysicalClusterMetadataReadyState(false);
        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001).all().get();
        waitFor(() -> this.destCluster.linkIdExists("tenantLink"), true, "Link was not created");
    }

    /**
     * Creates a destination link, marks the destination's physical cluster metadata as not
     * ready, pauses and then resumes the link through the internal listener admin, and finally
     * runs the basic mirroring check.
     *
     * @param str first argument supplied by {@code kraftCombinations} (unused in the body)
     * @param z   second argument supplied by {@code kraftCombinations} (unused in the body)
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testClusterMetadataNotReadyWithFlatNetworkingEnabled(String str, boolean z) throws Throwable {
        Map<String, String> sourceProps = srcBrokerProps();
        // Replace the source listener's JAAS config with one whose validation mode is "none".
        sourceProps.put("listener.name.external.plain.sasl.jaas.config", "io.confluent.kafka.multitenant.integration.cluster.TestPlainLoginModule required default_data_policy_validation_mode=\"none\";");
        setUpClusters(sourceProps, destBrokerProps());

        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001).all().get();
        waitFor(() -> this.destCluster.linkIdExists("tenantLink"), true, "Link was not created");

        this.destCluster.setPhysicalClusterMetadataReadyState(false);

        ConfigResource linkResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "lkc-dest_tenantLink");
        AlterConfigOp pauseLink = new AlterConfigOp(new ConfigEntry(ClusterLinkConfig.ClusterLinkPausedProp(), "true"), AlterConfigOp.OpType.SET);
        AlterConfigOp resumeLink = new AlterConfigOp(new ConfigEntry(ClusterLinkConfig.ClusterLinkPausedProp(), "false"), AlterConfigOp.OpType.SET);

        // Pause, then resume: two separate alter-config requests, each awaited before moving on.
        this.destCluster.internalListenerAdmin
            .incrementalAlterConfigs(Collections.singletonMap(linkResource, Collections.singletonList(pauseLink)))
            .all()
            .get();
        this.destCluster.internalListenerAdmin
            .incrementalAlterConfigs(Collections.singletonMap(linkResource, Collections.singletonList(resumeLink)))
            .all()
            .get();

        testBasicMirroring();
    }

    /**
     * Exercises link state reporting while the destination's physical cluster metadata ready
     * state is toggled.
     *
     * <p>Steps: create the link, mark metadata not ready, pause the link, then resume it with a
     * 500 ms availability check and a consecutive-failure threshold of 3. The test then waits for
     * the link to report {@code UNAVAILABLE} with {@code AUTHENTICATION_ERROR}, marks metadata
     * ready again and performs a final state wait.
     *
     * @param str first argument supplied by {@code kraftCombinations} (unused in the body)
     * @param z   second argument supplied by {@code kraftCombinations} (unused in the body)
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testClusterMetadataTransition(String str, boolean z) throws Throwable {
        setUpClusters(srcBrokerProps(), destBrokerProps());
        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001).all().get();
        waitFor(() -> this.destCluster.linkIdExists("tenantLink"), true, "Link was not created");

        this.destCluster.setPhysicalClusterMetadataReadyState(false);

        ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "lkc-dest_tenantLink");
        AlterConfigOp pauseLink = new AlterConfigOp(new ConfigEntry(ClusterLinkConfig.ClusterLinkPausedProp(), "true"), AlterConfigOp.OpType.SET);
        this.destCluster.internalListenerAdmin
            .incrementalAlterConfigs(Collections.singletonMap(configResource, Collections.singletonList(pauseLink)))
            .all()
            .get();

        // Resume the link and tighten the availability check in a single request.
        List<AlterConfigOp> resumeOps = List.of(
            new AlterConfigOp(new ConfigEntry(ClusterLinkConfig.ClusterLinkPausedProp(), "false"), AlterConfigOp.OpType.SET),
            new AlterConfigOp(new ConfigEntry(ClusterLinkConfig.AvailabilityCheckMsProp(), "500"), AlterConfigOp.OpType.SET),
            new AlterConfigOp(new ConfigEntry(ClusterLinkConfig.AvailabilityCheckConsecutiveFailureThresholdProp(), "3"), AlterConfigOp.OpType.SET));
        this.destCluster.internalListenerAdmin
            .incrementalAlterConfigs(Collections.singletonMap(configResource, resumeOps))
            .all()
            .get();

        waitFor(
            () -> this.destCluster.linkDescription("tenantLink").linkState(),
            ClusterLinkDescription.LinkState.UNAVAILABLE,
            "Link is not unavailable after cluster metadata is not ready");
        Assertions.assertEquals(ClusterLinkError.AUTHENTICATION_ERROR, this.destCluster.linkDescription("tenantLink").clusterLinkError());

        this.destCluster.setPhysicalClusterMetadataReadyState(true);
        // NOTE(review): the failure message below talks about the link being available again,
        // yet the expected state is still UNAVAILABLE, which the previous wait has already
        // established. As written this wait cannot detect a link that fails to recover.
        // Confirm the intended post-recovery state before changing the expectation.
        waitFor(
            () -> this.destCluster.linkDescription("tenantLink").linkState(),
            ClusterLinkDescription.LinkState.UNAVAILABLE,
            "Link is not available after cluster metadata is ready");
    }

    /**
     * Builds the broker property overrides used for the destination cluster.
     *
     * <p>A fresh, mutable map is returned on every call so tests can add or replace entries
     * before passing it to {@code setUpClusters}.
     *
     * @return mutable map of broker property names to values
     */
    private Map<String, String> destBrokerProps() {
        // Typed map instead of a raw HashMap, so no unchecked conversion happens on return.
        Map<String, String> props = new HashMap<>();
        props.put("confluent.traffic.network.id", "n-test-dest");
        props.put("confluent.regional.metadata.client.class", TestRegionalMetadataClient.class.getName());
        props.put("multitenant.metadata.class", TestPhysicalClusterMetadata.class.getName());
        props.put("confluent.cluster.link.intranet.connectivity.enable", Boolean.TRUE.toString());
        props.put(KafkaConfig.SaslEnabledMechanismsProp(), "PLAIN");
        props.put("confluent.ccloud.host.suffixes", MultiTenantRequestContextTest.LOCALHOST);
        props.put("listener.name.external.plain.sasl.jaas.config", "io.confluent.kafka.multitenant.integration.cluster.TestPlainLoginModule required;");
        return props;
    }

    /**
     * Builds the broker property overrides used for the source cluster. The external PLAIN
     * listener's JAAS config sets {@code default_data_policy_validation_mode="strict"}.
     *
     * <p>A fresh, mutable map is returned on every call so tests can add or replace entries
     * before passing it to {@code setUpClusters}.
     *
     * @return mutable map of broker property names to values
     */
    private Map<String, String> srcBrokerProps() {
        // Typed map instead of a raw HashMap, so no unchecked conversion happens on return.
        Map<String, String> props = new HashMap<>();
        props.put("confluent.traffic.network.id", "n-test-src");
        props.put("multitenant.metadata.class", TestPhysicalClusterMetadata.class.getName());
        props.put(KafkaConfig.SaslEnabledMechanismsProp(), "PLAIN");
        props.put("listener.name.external.plain.sasl.jaas.config", "io.confluent.kafka.multitenant.integration.cluster.TestPlainLoginModule required default_data_policy_validation_mode=\"strict\";");
        props.put("confluent.ccloud.host.suffixes", MultiTenantRequestContextTest.LOCALHOST);
        return props;
    }

    /**
     * Starts the source cluster and then the destination cluster, installing the default test
     * ACLs for each cluster's user right after that cluster is started.
     *
     * @param map  broker property overrides for the source cluster
     * @param map2 broker property overrides for the destination cluster
     * @throws Exception if starting a cluster or creating ACLs fails
     */
    private void setUpClusters(Map<String, String> map, Map<String, String> map2) throws Exception {
        // Source side: ACLs go through the internal listener admin.
        this.sourceCluster.startCluster(map, "lkc-source", 1, TEST_ORGANIZATION_ID);
        addAcls(this.sourceCluster.internalListenerAdmin, this.sourceCluster.user);

        // Destination side: ACLs go through the regular admin client.
        this.destCluster.startCluster(map2, "lkc-dest", 11, this.destClusterOrganizationId);
        addAcls(this.destCluster.admin, this.destCluster.user);
    }

    /**
     * Creates {@code linkedTopic} on the source, waits until the source lists it, creates the
     * mirror topic on the destination, produces 10 records to the source and consumes up to the
     * running message index from the destination.
     *
     * @throws Throwable if any step fails or times out
     */
    private void testBasicMirroring() throws Throwable {
        NewTopic sourceTopic = new NewTopic("linkedTopic", Optional.of(this.numPartitions), Optional.of((short) 1));
        // The create result is not awaited; the listing poll below acts as the barrier.
        this.sourceCluster.admin.createTopics(Collections.singleton(sourceTopic));
        waitFor(() -> {
            boolean listed;
            try {
                listed = this.sourceCluster.admin.listTopics().namesToListings().get().containsKey("linkedTopic");
            } catch (Exception ignored) {
                // A failed listing is treated as "not there yet" so the poll retries.
                listed = false;
            }
            return listed;
        }, true, "Failed to list topic");

        createMirrorTopicWaitForSuccess(this.destCluster.admin, "");

        final int batchSize = 10;
        KafkaTestUtils.sendRecords(this.sourceCluster.getOrCreateProducerToInternalListener(), "lkc-source_linkedTopic", this.nextMessageIndex, batchSize);
        this.nextMessageIndex += batchSize;
        KafkaTestUtils.consumeRecords(this.destCluster.getOrCreateConsumerToExternalListener("destGroup"), "linkedTopic", 0, this.nextMessageIndex);
    }

    /**
     * Repeatedly attempts to create a mirror of {@code linkedTopic} over the {@code tenantLink}
     * link until one attempt succeeds or the wait condition gives up.
     *
     * @param confluentAdmin admin client used to issue the create-topics request
     * @param str            prefix prepended to {@code "linkedTopic"} to form the mirror topic name
     * @throws Exception if the mirror topic could not be created before the wait expired
     */
    private void createMirrorTopicWaitForSuccess(ConfluentAdmin confluentAdmin, String str) throws Exception {
        NewMirrorTopic mirrorSpec = new NewMirrorTopic("tenantLink", "linkedTopic");
        NewTopic mirrorTopic = new NewTopic(str + "linkedTopic", Optional.empty(), Optional.of((short) 1))
            .mirror(Optional.of(mirrorSpec));
        CreateTopicsOptions createOptions = new CreateTopicsOptions().timeoutMs(5000);

        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            boolean created = false;
            try {
                confluentAdmin.createTopics(Collections.singleton(mirrorTopic), createOptions).all().get();
                created = true;
            } catch (Throwable th) {
                // Any failure is logged and reported as "not yet" so the attempt is retried.
                log.error("Failed to create mirror topic {}", mirrorTopic, th);
            }
            return created;
        }, "Failed to create mirror");
    }

    /**
     * Asserts that the destination cluster's broker default config contains
     * {@code confluent.traffic.network.type} and that its value is
     * {@code SaslInternalConfigs.NetworkType.PRIVATE}.
     */
    private void ensureNetworkTypeIsPersisted() {
        final String networkTypeKey = "confluent.traffic.network.type";
        ConfigEntry configEntry = this.destCluster.getBrokerDefaultConfig().get(networkTypeKey);
        // assertNotNull with a message gives a useful diagnostic, unlike assertTrue(x != null).
        Assertions.assertNotNull(configEntry, "Broker default config has no entry for " + networkTypeKey);
        Assertions.assertEquals(SaslInternalConfigs.NetworkType.PRIVATE.name(), configEntry.value());
    }

    /**
     * Creates the ACLs the link tests rely on for the given user and returns them.
     *
     * <p>The fixed set allows: all operations on topics prefixed {@code linked} and
     * {@code src_linked}, {@code READ} on the literal group {@code *}, and {@code DESCRIBE} on
     * the {@code kafka-cluster} resource. Each entry of {@code strArr} adds an allow-all ACL on
     * topics with that prefix.
     *
     * @param admin              admin client used to create the ACLs
     * @param logicalClusterUser user whose unprefixed principal the ACLs are granted to
     * @param strArr             optional extra topic prefixes to grant all operations on
     * @return the set of ACL bindings that were created
     * @throws Exception if ACL creation fails or does not complete within 15 seconds
     */
    private static Set<AclBinding> addAcls(Admin admin, LogicalClusterUser logicalClusterUser, String... strArr) throws Exception {
        String principal = logicalClusterUser.unprefixedKafkaPrincipal().toString();
        AccessControlEntry allowAll = new AccessControlEntry(principal, "*", AclOperation.ALL, AclPermissionType.ALLOW);

        // Must be mutable: Set.of(...) is immutable, so adding the extra prefixes to it
        // would throw UnsupportedOperationException as soon as strArr is non-empty.
        Set<AclBinding> bindings = new java.util.HashSet<>();
        bindings.add(new AclBinding(new ResourcePattern(ResourceType.TOPIC, "linked", PatternType.PREFIXED), allowAll));
        bindings.add(new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL), new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW)));
        bindings.add(new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL), new AccessControlEntry(principal, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)));
        bindings.add(new AclBinding(new ResourcePattern(ResourceType.TOPIC, "src_linked", PatternType.PREFIXED), allowAll));

        for (String topicPrefix : strArr) {
            bindings.add(new AclBinding(new ResourcePattern(ResourceType.TOPIC, topicPrefix, PatternType.PREFIXED), allowAll));
        }

        admin.createAcls(bindings).all().get(15L, TimeUnit.SECONDS);
        return bindings;
    }

    /**
     * Polls {@code supplier} until it yields a value equal to {@code t}, failing with a message
     * that includes both the expected and the most recently observed value.
     *
     * @param supplier produces the current value on each poll
     * @param t        expected value; may be {@code null}
     * @param str      prefix of the failure message
     * @param <T>      type of the compared values
     * @throws Exception if the condition is not met before the wait expires
     */
    private static <T> void waitFor(Supplier<T> supplier, T t, String str) throws Exception {
        org.apache.kafka.test.TestUtils.waitForCondition(
            // Objects.equals keeps the comparison null-safe when the expected value is null.
            () -> Objects.equals(t, supplier.get()),
            () -> str + " : expected=" + String.valueOf(t) + ", actual=" + String.valueOf(supplier.get()));
    }

    /**
     * Commits offsets for {@code linkedGroup} on the source cluster and waits until the
     * destination cluster reports the same committed offsets for that group.
     *
     * @throws Throwable if committing offsets fails or the offsets never match
     */
    private void verifyOffsetMigration() throws Throwable {
        final String groupId = "linkedGroup";

        addBrokerAclsForOffsetMigration();
        Map<TopicPartition, OffsetAndMetadata> expectedOffsets = commitOffsets(this.sourceCluster.admin, groupId);
        waitFor(() -> this.destCluster.committedOffsets(groupId), expectedOffsets, "Consumer offsets not migrated");
    }

    /**
     * Sets the committed offsets of a consumer group to the local log end offset of every
     * {@code linkedTopic} partition, as read from the first source broker's log manager.
     *
     * @param admin admin client used to alter the group's offsets
     * @param str   consumer group id
     * @return the offsets that were committed, keyed by the unprefixed topic partition
     * @throws Exception if altering the consumer group offsets fails
     */
    private Map<TopicPartition, OffsetAndMetadata> commitOffsets(Admin admin, String str) throws Exception {
        // Typed map instead of a raw HashMap, so no unchecked conversion happens on use or return.
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        LogManager logManager = this.sourceCluster.physicalCluster.kafkaCluster().kafkaBrokers().get(0).logManager();
        for (int partition = 0; partition < this.numPartitions; partition++) {
            // The broker-side log is looked up under the tenant-prefixed topic name,
            // while the committed offsets are keyed by the plain topic name.
            TopicPartition prefixedPartition = new TopicPartition(this.sourceCluster.user.tenantPrefix() + "linkedTopic", partition);
            long logEndOffset = ((AbstractLog) logManager.getLog(prefixedPartition, false).get()).localLogEndOffset();
            offsets.put(new TopicPartition("linkedTopic", partition), new OffsetAndMetadata(logEndOffset));
        }
        admin.alterConsumerGroupOffsets(str, offsets).all().get();
        return offsets;
    }

    private void addBrokerAclsForOffsetMigration() {
        String str = this.destCluster.user.tenantPrefix() + "linked";
        this.destCluster.physicalCluster.newAclCommand().consumeAclArgs(PhysicalCluster.BROKER_PRINCIPAL, str, str, PatternType.PREFIXED).execute();
    }
}
