/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.link.integration;

import com.google.common.collect.Lists;
import io.confluent.kafka.link.integration.TestRegionalMetadataClient;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.UserMetadata;
import io.confluent.kafka.multitenant.integration.test.IntegrationTestHarness;
import io.confluent.kafka.multitenant.integration.test.SaslAuthenticateRequestCallback;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.server.plugins.auth.DefaultDataPolicyContext;
import io.confluent.kafka.server.plugins.auth.TestLogicalClusterMetadata;
import io.confluent.kafka.server.plugins.auth.TestPhysicalClusterMetadata;
import io.confluent.kafka.server.plugins.policy.AlterConfigPolicy;
import io.confluent.kafka.server.plugins.policy.CreateClusterLinkPolicy;
import io.confluent.kafka.server.plugins.policy.CreateTopicPolicy;
import io.confluent.kafka.storage.log.AbstractLog;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.kafka.test.utils.SecurityTestUtils;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.log.LogManager;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkFactory;
import kafka.server.link.ClusterLinkManager;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ClusterLinkDescription;
import org.apache.kafka.clients.admin.ClusterLinkListing;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.CreateClusterLinksOptions;
import org.apache.kafka.clients.admin.CreateClusterLinksResult;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.DescribeClusterLinksOptions;
import org.apache.kafka.clients.admin.ListClusterLinksOptions;
import org.apache.kafka.clients.admin.NewClusterLink;
import org.apache.kafka.clients.admin.NewMirrorTopic;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.ClusterLinkError;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SslAuthenticationException;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.SaslInternalConfigs;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.datapolicy.DefaultDataPolicyStore;
import org.apache.kafka.server.interceptor.ConfluentCloudBrokerInterceptor;
import org.apache.kafka.test.TestSslUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tags(value={@Tag(value="integration"), @Tag(value="bazel:shard_count:6"), @Tag(value="bazel:size:large")})
public class MultiTenantCLDefaultDataPolicyTest {
    /** Common name (CN) given to the certificates built in {@code createSslStores}. */
    public static final String SSL_KAFKA_CN = "kafka";
    /** Organization id used for the source cluster and as the initial destination organization id. */
    private static final String TEST_ORGANIZATION_ID = "org-test";
    /** Environment id recorded in the logical cluster metadata seeded for each broker. */
    private static final String TEST_ENVIRONMENT_ID = "env-test";
    private static final Logger log = LoggerFactory.getLogger(MultiTenantCLDefaultDataPolicyTest.class);
    /** Cluster the link mirrors from; instantiated in {@code setUp}. */
    private MultiTenantCluster sourceCluster;
    /** Cluster the link is created on; instantiated in {@code setUp}. */
    private MultiTenantCluster destCluster;
    /** Logical cluster id of the source cluster (same value as the literal used in {@code setUpClusters}). */
    private final String sourceLogicalCluster = "lkc-source";
    /** Logical cluster id of the destination cluster (same value as the literal used in {@code setUpClusters}). */
    private final String destLogicalCluster = "lkc-dest";
    /** Organization id the destination cluster is started with; individual tests may override it before start-up. */
    private String destClusterOrganizationId = "org-test";
    /** Name of the cluster link created by the tests. */
    private final String linkName = "tenantLink";
    /** Name of the topic that is created on the source and mirrored to the destination. */
    private final String topic = "linkedTopic";
    /** Partition count of the source topic created in {@code testBasicMirroring}. */
    private int numPartitions = 2;
    /** Running total of records produced so far by {@code testBasicMirroring}. */
    private int nextMessageIndex = 0;
    /** Broker-side SSL settings, filled in by {@code createSslStores}. */
    private final Map<String, String> sslConfigs = new HashMap<String, String>();
    /** Client-side SSL settings, filled in by {@code createSslStores}. */
    private final Map<String, String> clientSslConfigs = new HashMap<String, String>();

    /**
     * Supplies the argument sets for the parameterized tests of this class.
     *
     * @return a stream holding the single argument pair {@code ("kraft", "true")}
     */
    public static Stream<?> kraftCombinations() {
        Arguments kraftWithCoordinator = Arguments.of("kraft", "true");
        return Stream.of(kraftWithCoordinator);
    }

    /**
     * Builds the SSL stores and instantiates (but does not start) the source and destination clusters.
     *
     * <p>For {@code testConfluentTrustManagerWithBadCert} the certificate is issued for
     * {@code randomhost} and client host name verification is switched off; every other test uses a
     * {@code localhost} certificate with verification on.
     */
    @BeforeEach
    public void setUp(TestInfo testInfo) throws Exception {
        Method currentTest = testInfo.getTestMethod().get();
        boolean badCertTest = currentTest.toString().contains("testConfluentTrustManagerWithBadCert");

        String certHostName;
        boolean verifyServerHostName;
        if (badCertTest) {
            certHostName = "randomhost";
            verifyServerHostName = false;
        } else {
            certHostName = "localhost";
            verifyServerHostName = true;
        }

        this.createSslStores(certHostName, "requested", verifyServerHostName);
        this.sourceCluster = new MultiTenantCluster(testInfo, this.sslConfigs, this.clientSslConfigs);
        this.destCluster = new MultiTenantCluster(testInfo, this.sslConfigs, this.clientSslConfigs);
    }

    /**
     * Shuts down both clusters and resets the static state shared between tests.
     *
     * <p>The destination cluster is shut down even when shutting down the source cluster throws, and
     * the regional metadata bootstrap registry is cleared even when the thread-cleanup check fails,
     * so one failing test does not leak brokers or static state into the next one. Clusters that
     * were never created or never started are tolerated.
     */
    @AfterEach
    public void tearDown() throws Exception {
        try {
            try {
                if (this.sourceCluster != null) {
                    this.sourceCluster.shutdown();
                }
            }
            finally {
                if (this.destCluster != null) {
                    // Capture the brokers before shutdown; their link managers are checked afterwards.
                    List<KafkaBroker> destBrokers;
                    if (this.destCluster.physicalCluster == null) {
                        destBrokers = Collections.emptyList();
                    } else {
                        destBrokers = this.destCluster.physicalCluster.kafkaCluster().kafkaBrokers().stream().filter(Objects::nonNull).collect(Collectors.toList());
                    }
                    this.destCluster.shutdown();
                    destBrokers.forEach(broker -> ((ClusterLinkManager)broker.clusterLinkManager()).ensureEmptyIfNoLinks());
                }
            }
        }
        finally {
            SecurityTestUtils.clearSecurityConfigs();
            try {
                KafkaTestUtils.verifyThreadCleanup();
            }
            finally {
                TestRegionalMetadataClient.clearBootstrapServers();
            }
        }
    }

    /**
     * Generates key/trust stores, converts them to in-memory PEM form and fills
     * {@code sslConfigs} (broker side) and {@code clientSslConfigs} (client side).
     *
     * @param certHostName host name added to the generated certificate
     * @param sslClientAuth value for {@code listener.name.external.ssl.client.auth}
     * @param hostNameVerification whether clients use the {@code https} endpoint identification
     *        algorithm ({@code true}) or an empty one ({@code false})
     */
    private void createSslStores(String certHostName, String sslClientAuth, boolean hostNameVerification) throws Exception {
        CertStores stores = new CertStores.Builder(true).cn(SSL_KAFKA_CN).addHostName(certHostName).build();
        Properties storeProps = new Properties();

        // Flattens every store property to a plain string; null values are skipped.
        BiConsumer<String, Object> flatten = (key, value) -> {
            if (value == null) {
                return;
            }
            if (value instanceof Password) {
                storeProps.setProperty(key, ((Password)value).value());
                return;
            }
            if (value instanceof List) {
                @SuppressWarnings("unchecked")
                List<String> parts = (List<String>)value;
                storeProps.setProperty(key, String.join(",", parts));
                return;
            }
            storeProps.setProperty(key, (String)value);
        };
        stores.keyStoreProps().forEach(flatten);
        stores.trustStoreProps().forEach(flatten);
        TestSslUtils.convertToPemWithoutFiles(storeProps);

        storeProps.forEach((key, value) -> this.sslConfigs.put((String)key, (String)value));

        // Clients get the store settings only; the listener-specific entries below stay broker-only.
        this.clientSslConfigs.putAll(this.sslConfigs);
        this.clientSslConfigs.put("ssl.endpoint.identification.algorithm", hostNameVerification ? "https" : "");

        this.sslConfigs.put("listener.name.external.ssl.client.auth", sslClientAuth);
        this.sslConfigs.put("listener.name.external.ssl.trustmanager.algorithm", "ConfluentTls");
        this.sslConfigs.put("listener.name.external.security.providers", "io.confluent.kafka.server.plugins.ssl.ConfluentTrustProviderCreator");
    }

    /**
     * Creates a link configured with a placeholder bootstrap server, verifies mirroring, restarts the
     * destination brokers and waits for the link to report FAILED. It then re-seeds the logical
     * cluster metadata, waits until the link's remote admin nodes match the source cluster's
     * EXTERNAL bootstrap address, and asserts the link is ACTIVE.
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"kraftCombinations"})
    public void testMetadataNotAvailableWithBrokerRestart(String quorum, boolean coordinator) throws Throwable {
        final String link = "tenantLink";

        Map<String, String> destOverrides = this.destBrokerProps();
        destOverrides.put("confluent.cluster.link.intranet.connectivity.denied.org.ids", "foo-org");
        this.setUpClusters(this.srcBrokerProps(), destOverrides);

        String sourceBootstrap = this.sourceCluster.physicalCluster.kafkaCluster().bootstrapServers("EXTERNAL");
        TestRegionalMetadataClient.seedBootstrapServers(TEST_ORGANIZATION_ID, this.sourceCluster.logicalCluster.logicalClusterId(), sourceBootstrap);

        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, link, this.sourceCluster, 1001, Collections.singletonMap("bootstrap.servers", "localhost:0")).all().get();
        MultiTenantCLDefaultDataPolicyTest.waitFor(() -> this.destCluster.linkIdExists(link), true, "Link was not created");
        this.testBasicMirroring();

        this.destCluster.restartBrokers();
        this.destCluster.createAdmins();
        MultiTenantCLDefaultDataPolicyTest.waitFor(() -> {
            ClusterLinkListing listing = this.destCluster.listClusterLinks(link);
            return listing == null ? ClusterLinkDescription.LinkState.UNKNOWN : listing.linkState();
        }, ClusterLinkDescription.LinkState.FAILED, "Cluster link doesn't transition to failed");

        this.destCluster.seedLogicalClusterMetadata();
        ClusterLinkManager linkManager = (ClusterLinkManager)this.destCluster.linkManager();
        MultiTenantCLDefaultDataPolicyTest.waitFor(() -> {
            List<?> remoteNodes = linkManager.destClientManagerRemoteAdminNodes(this.destCluster.user.tenantPrefix() + link);
            if (remoteNodes.isEmpty()) {
                return "";
            }
            Node firstNode = (Node)remoteNodes.get(0);
            return firstNode.host() + ":" + firstNode.port();
        }, sourceBootstrap, "Cluster link doesn't transition to active/unavailable or nodes do not match");

        ClusterLinkListing finalListing = this.destCluster.listClusterLinks(link);
        Assertions.assertTrue(finalListing != null);
        Assertions.assertEquals(ClusterLinkDescription.LinkState.ACTIVE, finalListing.linkState());
    }

    /**
     * Creates a link between two clusters, verifies mirroring and offset migration, and checks that
     * the broker default config still carries the PRIVATE network type both before and after
     * intranet connectivity is disabled and the destination brokers are restarted.
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"kraftCombinations"})
    public void testDefaultDataPolicy(String quorum, boolean coordinator) throws Throwable {
        final String link = "tenantLink";

        this.setUpClusters(this.srcBrokerProps(), this.destBrokerProps());
        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, link, this.sourceCluster, 1001).all().get();
        MultiTenantCLDefaultDataPolicyTest.waitFor(() -> this.destCluster.linkIdExists(link), true, "Link was not created");

        this.testBasicMirroring();
        this.verifyOffsetMigration();
        this.ensureNetworkTypeIsPersisted();

        // Turn intranet connectivity off and make sure the network type survives a restart.
        String disabled = Boolean.FALSE.toString();
        this.destCluster.setBrokerDefaultConfig(Collections.singletonMap("confluent.cluster.link.intranet.connectivity.enable", disabled));
        this.destCluster.restartBrokers();
        this.destCluster.createAdmins();
        this.ensureNetworkTypeIsPersisted();
    }

    /**
     * Starts the destination cluster and expects ACL creation through the external-listener admin
     * client to fail with an {@link SslAuthenticationException} as the cause.
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"kraftCombinations"})
    public void testConfluentTrustManagerWithBadCert(String quorum, boolean coordinator) throws Throwable {
        this.destCluster.startCluster(this.destBrokerProps(), "lkc-dest", 11, this.destClusterOrganizationId);

        ExecutionException failure = Assertions.assertThrows(
            ExecutionException.class,
            () -> MultiTenantCLDefaultDataPolicyTest.addAcls(this.destCluster.admin, this.destCluster.user));

        Throwable cause = failure.getCause();
        Assertions.assertEquals(SslAuthenticationException.class, cause.getClass());
    }

    /**
     * Starts the destination cluster and creates ACLs through the external-listener admin client;
     * the test passes when no exception is thrown.
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"kraftCombinations"})
    public void testConfluentTrustManagerWithGoodCert(String quorum, boolean coordinator) throws Throwable {
        this.destCluster.startCluster(
            this.destBrokerProps(),
            "lkc-dest",
            11,
            this.destClusterOrganizationId);
        MultiTenantCLDefaultDataPolicyTest.addAcls(this.destCluster.admin, this.destCluster.user);
    }

    /**
     * Runs the destination cluster under a different organization id and checks that link creation
     * fails with a SASL authentication error while enforcement is on, succeeds once enforcement is
     * switched off on the source, and fails again after enforcement is switched back on. The
     * source cluster's enforcement and cross-org-denied metrics are verified at each step.
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"kraftCombinations"})
    public void testDefaultDataPolicyWithDifferentOrg(String quorum, boolean coordinator) throws Throwable {
        final String link = "tenantLink";
        final String enforcementKey = "confluent.default.data.policy.enforcement";

        this.destClusterOrganizationId = "test-diff-org";
        this.setUpClusters(this.srcBrokerProps(), this.destBrokerProps());
        this.sourceCluster.verifyDefaultDataPolicyCrossOrgDeniedMetrics(false);

        // Enforcement on: the cross-org link is rejected.
        ExecutionException firstRejection = Assertions.assertThrows(
            ExecutionException.class,
            () -> this.destCluster.createDestClusterLinkResult(this.destCluster.admin, link, this.sourceCluster, 1001).all().get());
        Assertions.assertEquals(SaslAuthenticationException.class, firstRejection.getCause().getClass());
        this.sourceCluster.verifyDefaultDataPolicyEnforcementMetric(1);
        this.sourceCluster.verifyDefaultDataPolicyCrossOrgDeniedMetrics(true);

        // Enforcement off: the same link can be created and mirrors data.
        this.sourceCluster.setBrokerDefaultConfig(Collections.singletonMap(enforcementKey, Boolean.FALSE.toString()));
        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, link, this.sourceCluster, 1002).all().get();
        MultiTenantCLDefaultDataPolicyTest.waitFor(() -> this.destCluster.linkIdExists(link), true, "Link was not created");
        this.testBasicMirroring();
        this.sourceCluster.verifyDefaultDataPolicyEnforcementMetric(0);
        this.sourceCluster.verifyDefaultDataPolicyCrossOrgDeniedMetrics(true);

        // Enforcement back on: a second link is rejected again.
        this.sourceCluster.setBrokerDefaultConfig(Collections.singletonMap(enforcementKey, Boolean.TRUE.toString()));
        ExecutionException secondRejection = Assertions.assertThrows(
            ExecutionException.class,
            () -> this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink2", this.sourceCluster, 1003).all().get());
        Assertions.assertEquals(SaslAuthenticationException.class, secondRejection.getCause().getClass());
        this.sourceCluster.verifyDefaultDataPolicyEnforcementMetric(1);
        this.sourceCluster.verifyDefaultDataPolicyCrossOrgDeniedMetrics(true);
    }

    /**
     * Lists the test organization among the source cluster's denied org ids and expects link
     * creation to fail with a {@link SaslAuthenticationException} as the cause.
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"kraftCombinations"})
    public void testDefaultDataPolicyWithDenyOrg(String quorum, boolean coordinator) throws Throwable {
        Map<String, String> sourceOverrides = this.srcBrokerProps();
        sourceOverrides.put("confluent.cluster.link.intranet.connectivity.denied.org.ids", "org-test,foo-org");
        this.setUpClusters(sourceOverrides, this.destBrokerProps());

        ExecutionException rejection = Assertions.assertThrows(
            ExecutionException.class,
            () -> this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001).all().get());
        Throwable cause = rejection.getCause();
        Assertions.assertEquals(SaslAuthenticationException.class, cause.getClass());
    }

    /**
     * With intranet connectivity disabled on both clusters and the source login module's
     * validation mode set to {@code none}, marks the destination's physical cluster metadata as
     * not ready and checks that a link can still be created.
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"kraftCombinations"})
    public void testClusterMetadataNotReadyWithFlatNetworkingDisabled(String quorum, boolean coordinator) throws Throwable {
        final String intranetEnableKey = "confluent.cluster.link.intranet.connectivity.enable";
        final String link = "tenantLink";

        Map<String, String> sourceOverrides = this.srcBrokerProps();
        Map<String, String> destOverrides = this.destBrokerProps();
        sourceOverrides.put(intranetEnableKey, Boolean.FALSE.toString());
        destOverrides.put(intranetEnableKey, Boolean.FALSE.toString());
        sourceOverrides.put(
            "listener.name.external.plain.sasl.jaas.config",
            "io.confluent.kafka.multitenant.integration.cluster.TestPlainLoginModule required default_data_policy_validation_mode=\"none\";");
        this.setUpClusters(sourceOverrides, destOverrides);

        this.destCluster.setPhysicalClusterMetadataReadyState(false);
        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, link, this.sourceCluster, 1001).all().get();
        MultiTenantCLDefaultDataPolicyTest.waitFor(() -> this.destCluster.linkIdExists(link), true, "Link was not created");
    }

    /**
     * With the source login module's validation mode set to {@code none}, creates a link, marks the
     * destination's physical cluster metadata as not ready, pauses and un-pauses the link through
     * the internal listener, and verifies that mirroring works afterwards.
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"kraftCombinations"})
    public void testClusterMetadataNotReadyWithFlatNetworkingEnabled(String quorum, boolean coordinator) throws Throwable {
        final String link = "tenantLink";

        Map<String, String> sourceOverrides = this.srcBrokerProps();
        sourceOverrides.put(
            "listener.name.external.plain.sasl.jaas.config",
            "io.confluent.kafka.multitenant.integration.cluster.TestPlainLoginModule required default_data_policy_validation_mode=\"none\";");
        this.setUpClusters(sourceOverrides, this.destBrokerProps());

        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, link, this.sourceCluster, 1001).all().get();
        MultiTenantCLDefaultDataPolicyTest.waitFor(() -> this.destCluster.linkIdExists(link), true, "Link was not created");
        this.destCluster.setPhysicalClusterMetadataReadyState(false);

        ConfigResource linkResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "lkc-dest_tenantLink");

        // Pause the link ...
        AlterConfigOp pause = new AlterConfigOp(new ConfigEntry(ClusterLinkConfig.ClusterLinkPausedProp(), "true"), AlterConfigOp.OpType.SET);
        Map<ConfigResource, Collection<AlterConfigOp>> pauseRequest = Collections.singletonMap(linkResource, Collections.singletonList(pause));
        this.destCluster.internalListenerAdmin.incrementalAlterConfigs(pauseRequest).all().get();

        // ... and resume it again.
        AlterConfigOp resume = new AlterConfigOp(new ConfigEntry(ClusterLinkConfig.ClusterLinkPausedProp(), "false"), AlterConfigOp.OpType.SET);
        Map<ConfigResource, Collection<AlterConfigOp>> resumeRequest = Collections.singletonMap(linkResource, Collections.singletonList(resume));
        this.destCluster.internalListenerAdmin.incrementalAlterConfigs(resumeRequest).all().get();

        this.testBasicMirroring();
    }

    /**
     * Verifies how a link reacts when the destination's physical cluster metadata readiness changes.
     *
     * <p>After the link is created the metadata is marked not ready and the link is paused and
     * resumed with a short availability check (500 ms, threshold 3). The link must then become
     * UNAVAILABLE with an AUTHENTICATION_ERROR. Once the metadata is marked ready again the link
     * must return to ACTIVE.
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"kraftCombinations"})
    public void testClusterMetadataTransition(String quorum, boolean coordinator) throws Throwable {
        this.setUpClusters(this.srcBrokerProps(), this.destBrokerProps());
        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001).all().get();
        MultiTenantCLDefaultDataPolicyTest.waitFor(() -> this.destCluster.linkIdExists("tenantLink"), true, "Link was not created");
        this.destCluster.setPhysicalClusterMetadataReadyState(false);

        ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "lkc-dest_tenantLink");
        Map<ConfigResource, Collection<AlterConfigOp>> pauseOps = Collections.singletonMap(configResource, Collections.singletonList(new AlterConfigOp(new ConfigEntry(ClusterLinkConfig.ClusterLinkPausedProp(), "true"), AlterConfigOp.OpType.SET)));
        this.destCluster.internalListenerAdmin.incrementalAlterConfigs(pauseOps).all().get();

        // Resume the link with an aggressive availability check so the state change shows up quickly.
        List<AlterConfigOp> resumeOps = new ArrayList<AlterConfigOp>();
        resumeOps.add(new AlterConfigOp(new ConfigEntry(ClusterLinkConfig.ClusterLinkPausedProp(), "false"), AlterConfigOp.OpType.SET));
        resumeOps.add(new AlterConfigOp(new ConfigEntry(ClusterLinkConfig.AvailabilityCheckMsProp(), "500"), AlterConfigOp.OpType.SET));
        resumeOps.add(new AlterConfigOp(new ConfigEntry(ClusterLinkConfig.AvailabilityCheckConsecutiveFailureThresholdProp(), "3"), AlterConfigOp.OpType.SET));
        Map<ConfigResource, Collection<AlterConfigOp>> resumeRequest = Collections.singletonMap(configResource, resumeOps);
        this.destCluster.internalListenerAdmin.incrementalAlterConfigs(resumeRequest).all().get();

        MultiTenantCLDefaultDataPolicyTest.waitFor(() -> this.destCluster.linkDescription("tenantLink").linkState(), ClusterLinkDescription.LinkState.UNAVAILABLE, "Link is not unavailable after cluster metadata is not ready");
        ClusterLinkDescription linkDescription = this.destCluster.linkDescription("tenantLink");
        Assertions.assertEquals(ClusterLinkError.AUTHENTICATION_ERROR, linkDescription.clusterLinkError());

        this.destCluster.setPhysicalClusterMetadataReadyState(true);
        // Waiting for UNAVAILABLE here would pass immediately, because the link is already in that
        // state; the recovery this step is meant to prove is the transition back to ACTIVE.
        MultiTenantCLDefaultDataPolicyTest.waitFor(() -> this.destCluster.linkDescription("tenantLink").linkState(), ClusterLinkDescription.LinkState.ACTIVE, "Link is not available after cluster metadata is ready");
    }

    /**
     * Builds the broker overrides used for the destination cluster.
     *
     * @return a fresh, mutable map that callers may extend before starting the cluster
     */
    private Map<String, String> destBrokerProps() {
        Map<String, String> overrides = new HashMap<>();

        // Networking / link connectivity.
        overrides.put("confluent.traffic.network.id", "n-test-dest");
        overrides.put("confluent.cluster.link.intranet.connectivity.enable", Boolean.TRUE.toString());
        overrides.put("confluent.ccloud.host.suffixes", "localhost");

        // Metadata plumbing backed by test doubles.
        overrides.put("confluent.regional.metadata.client.class", TestRegionalMetadataClient.class.getName());
        overrides.put("multitenant.metadata.class", TestPhysicalClusterMetadata.class.getName());

        // SASL/PLAIN on the external listener.
        overrides.put(KafkaConfig.SaslEnabledMechanismsProp(), "PLAIN");
        overrides.put("listener.name.external.plain.sasl.jaas.config", "io.confluent.kafka.multitenant.integration.cluster.TestPlainLoginModule required;");
        return overrides;
    }

    /**
     * Builds the broker overrides used for the source cluster; the external listener's login
     * module is configured with {@code default_data_policy_validation_mode="strict"}.
     *
     * @return a fresh, mutable map that callers may extend before starting the cluster
     */
    private Map<String, String> srcBrokerProps() {
        Map<String, String> overrides = new HashMap<>();

        overrides.put("confluent.traffic.network.id", "n-test-src");
        overrides.put("confluent.ccloud.host.suffixes", "localhost");
        overrides.put("multitenant.metadata.class", TestPhysicalClusterMetadata.class.getName());

        // SASL/PLAIN on the external listener.
        overrides.put(KafkaConfig.SaslEnabledMechanismsProp(), "PLAIN");
        overrides.put("listener.name.external.plain.sasl.jaas.config", "io.confluent.kafka.multitenant.integration.cluster.TestPlainLoginModule required default_data_policy_validation_mode=\"strict\";");
        return overrides;
    }

    /**
     * Starts the source cluster and then the destination cluster, adding the test ACLs to each
     * right after it is started.
     *
     * @param srcBrokerOverrides broker/controller overrides for the source cluster
     * @param destBrokerOverrides broker/controller overrides for the destination cluster
     */
    private void setUpClusters(Map<String, String> srcBrokerOverrides, Map<String, String> destBrokerOverrides) throws Exception {
        // Source: ACLs are added through the internal listener admin client.
        this.sourceCluster.startCluster(srcBrokerOverrides, "lkc-source", 1, TEST_ORGANIZATION_ID);
        MultiTenantCLDefaultDataPolicyTest.addAcls(this.sourceCluster.internalListenerAdmin, this.sourceCluster.user);

        // Destination: ACLs are added through the external listener admin client.
        this.destCluster.startCluster(destBrokerOverrides, "lkc-dest", 11, this.destClusterOrganizationId);
        MultiTenantCLDefaultDataPolicyTest.addAcls(this.destCluster.admin, this.destCluster.user);
    }

    /**
     * Creates the source topic, waits until it is listed, creates its mirror on the destination,
     * produces ten records to the source and consumes every record produced so far from the
     * destination.
     */
    private void testBasicMirroring() throws Throwable {
        final String topicName = "linkedTopic";

        NewTopic sourceTopic = new NewTopic(topicName, Optional.of(this.numPartitions), Optional.of((short)1));
        this.sourceCluster.admin.createTopics(Collections.singleton(sourceTopic));
        MultiTenantCLDefaultDataPolicyTest.waitFor(() -> {
            try {
                return this.sourceCluster.admin.listTopics().namesToListings().get().containsKey(topicName);
            }
            catch (Exception e) {
                // Listing failures count as "not visible yet"; the wait simply polls again.
                return false;
            }
        }, true, "Failed to list topic");

        this.createMirrorTopicWaitForSuccess(this.destCluster.admin, "");

        int batchSize = 10;
        KafkaTestUtils.sendRecords(this.sourceCluster.getOrCreateProducerToInternalListener(), "lkc-source_linkedTopic", this.nextMessageIndex, batchSize);
        this.nextMessageIndex += batchSize;
        KafkaTestUtils.consumeRecords(this.destCluster.getOrCreateConsumerToExternalListener("destGroup"), topicName, 0, this.nextMessageIndex);
    }

    /**
     * Repeatedly tries to create the mirror of {@code linkedTopic} over link {@code tenantLink}
     * until a create call succeeds; each failed attempt is logged.
     *
     * @param admin admin client used to create the mirror topic
     * @param clusterLinkPrefix prefix prepended to the mirror topic's name
     */
    private void createMirrorTopicWaitForSuccess(ConfluentAdmin admin, String clusterLinkPrefix) throws Exception {
        NewMirrorTopic mirrorSpec = new NewMirrorTopic("tenantLink", "linkedTopic");
        NewTopic mirrorTopic = new NewTopic(clusterLinkPrefix + "linkedTopic", Optional.empty(), Optional.of((short)1)).mirror(Optional.of(mirrorSpec));
        CreateTopicsOptions createOptions = new CreateTopicsOptions().timeoutMs(5000);

        TestUtils.waitForCondition(() -> {
            boolean created;
            try {
                admin.createTopics(Collections.singleton(mirrorTopic), createOptions).all().get();
                created = true;
            }
            catch (Throwable e) {
                log.error("Failed to create mirror topic {}", mirrorTopic, e);
                created = false;
            }
            return created;
        }, "Failed to create mirror");
    }

    /**
     * Asserts that the destination's broker default config contains
     * {@code confluent.traffic.network.type} with the value {@code PRIVATE}.
     */
    private void ensureNetworkTypeIsPersisted() {
        Config brokerDefaults = this.destCluster.getBrokerDefaultConfig();
        ConfigEntry networkType = brokerDefaults.get("confluent.traffic.network.type");

        Assertions.assertTrue(networkType != null);
        String expected = SaslInternalConfigs.NetworkType.PRIVATE.name();
        Assertions.assertEquals(expected, networkType.value());
    }

    /**
     * Creates the ACLs the tests need for {@code user} and returns the bindings that were created.
     *
     * <p>The fixed set allows ALL on topics prefixed {@code linked} and {@code src_linked},
     * DESCRIBE on the cluster and READ on every group. One additional ALL/ALLOW prefixed topic
     * binding is created per entry of {@code prefixes}.
     *
     * @param adminClient admin client used to create the ACLs
     * @param user user whose unprefixed principal the ACLs are granted to
     * @param prefixes additional topic prefixes to grant ALL on; may be empty
     * @return the set of bindings passed to {@code createAcls}
     * @throws Exception if ACL creation fails or does not complete within 15 seconds
     */
    private static Set<AclBinding> addAcls(Admin adminClient, LogicalClusterUser user, String ... prefixes) throws Exception {
        String principal = user.unprefixedKafkaPrincipal().toString();
        AclBinding topicAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "linked", PatternType.PREFIXED), new AccessControlEntry(principal, "*", AclOperation.ALL, AclPermissionType.ALLOW));
        AclBinding topicAclWithPrefix = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "src_linked", PatternType.PREFIXED), new AccessControlEntry(principal, "*", AclOperation.ALL, AclPermissionType.ALLOW));
        AclBinding clusterAcl = new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL), new AccessControlEntry(principal, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
        AclBinding groupAcl = new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL), new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW));

        Stream<AclBinding> fixedAcls = Stream.of(topicAcl, groupAcl, clusterAcl, topicAclWithPrefix);
        Stream<AclBinding> prefixAcls = Stream.of(prefixes).map(prefix -> new AclBinding(new ResourcePattern(ResourceType.TOPIC, prefix, PatternType.PREFIXED), new AccessControlEntry(principal, "*", AclOperation.ALL, AclPermissionType.ALLOW)));
        // Build the whole set in one pass: Set.of(...) is immutable, so adding the per-prefix
        // bindings to it afterwards threw UnsupportedOperationException for any non-empty prefixes.
        Set<AclBinding> acls = Stream.concat(fixedAcls, prefixAcls).collect(Collectors.toSet());

        adminClient.createAcls(acls).all().get(15L, TimeUnit.SECONDS);
        return acls;
    }

    /**
     * Polls {@code actual} until it yields a value equal to {@code expected}.
     *
     * @param actual supplier of the current value; it is re-evaluated when the failure message is built
     * @param expected value to wait for; {@code equals} is invoked on it, so it must not be null
     * @param error prefix of the failure message, followed by the expected and actual values
     */
    private static <T> void waitFor(Supplier<T> actual, T expected, String error) throws Exception {
        TestUtils.waitForCondition(
            () -> expected.equals(actual.get()),
            () -> error + " : expected=" + expected + ", actual=" + actual.get());
    }

    /**
     * Commits offsets for group {@code linkedGroup} on the source cluster and waits until the
     * destination cluster reports the same committed offsets for that group.
     */
    private void verifyOffsetMigration() throws Throwable {
        final String group = "linkedGroup";
        this.addBrokerAclsForOffsetMigration();

        Map<TopicPartition, OffsetAndMetadata> committedOnSource = this.commitOffsets(this.sourceCluster.admin, group);
        MultiTenantCLDefaultDataPolicyTest.waitFor(
            () -> this.destCluster.committedOffsets(group),
            committedOnSource,
            "Consumer offsets not migrated");
    }

    /**
     * Commits, for every partition of {@code linkedTopic}, the local log end offset read from the
     * first source broker's log manager as the offset of {@code group}.
     *
     * @param adminClient admin client used to alter the group's offsets
     * @param group consumer group whose offsets are set
     * @return the offsets that were committed, keyed by the unprefixed topic partition
     */
    private Map<TopicPartition, OffsetAndMetadata> commitOffsets(Admin adminClient, String group) throws Exception {
        LogManager sourceLogs = this.sourceCluster.physicalCluster.kafkaCluster().kafkaBrokers().get(0).logManager();
        // The broker stores the topic under its tenant-prefixed name.
        String tenantTopic = this.sourceCluster.user.tenantPrefix() + "linkedTopic";

        Map<TopicPartition, OffsetAndMetadata> endOffsets = new HashMap<>();
        int partition = 0;
        while (partition < this.numPartitions) {
            TopicPartition tenantPartition = new TopicPartition(tenantTopic, partition);
            long logEnd = ((AbstractLog)sourceLogs.getLog(tenantPartition, false).get()).localLogEndOffset();
            endOffsets.put(new TopicPartition("linkedTopic", partition), new OffsetAndMetadata(logEnd));
            partition++;
        }

        adminClient.alterConsumerGroupOffsets(group, endOffsets).all().get();
        return endOffsets;
    }

    /**
     * Grants the broker principal consume ACLs on the destination cluster for the tenant-prefixed
     * {@code linked} prefix; the same prefixed name is passed for both resource arguments.
     */
    private void addBrokerAclsForOffsetMigration() {
        String tenantLinkedPrefix = this.destCluster.user.tenantPrefix() + "linked";
        this.destCluster.physicalCluster
            .newAclCommand()
            .consumeAclArgs(PhysicalCluster.BROKER_PRINCIPAL, tenantLinkedPrefix, tenantLinkedPrefix, PatternType.PREFIXED)
            .execute();
    }

    public static class MultiTenantCluster
    extends IntegrationTestHarness {
        /** SSL settings merged into the broker properties in {@code startCluster}. */
        private final Map<String, String> sslConfigs;
        /** SSL settings copied into the client properties in {@code createAdmins}. */
        private final Map<String, String> clientSslConfigs;
        /** Physical cluster started by {@code startCluster}; null until then. */
        private PhysicalCluster physicalCluster;
        /** Logical cluster created on top of the physical cluster in {@code startCluster}. */
        private LogicalCluster logicalCluster;
        /** Logical cluster user looked up by the user id passed to {@code startCluster}. */
        private LogicalClusterUser user;
        private LogicalClusterUser linkUser;
        /** Admin client connected over SASL_SSL/PLAIN; created in {@code createAdmins}. */
        private ConfluentAdmin admin;
        /** Admin client connected to the INTERNAL listener over PLAINTEXT; created in {@code createAdmins}. */
        private ConfluentAdmin internalListenerAdmin;
        /** Metadata instance of the broker visited last by {@code seedLogicalClusterMetadata}. */
        private TestPhysicalClusterMetadata physicalClusterMetadata;
        KafkaProducer<String, String> producerToInternalListener;
        KafkaConsumer<String, String> consumerToExternalListener;
        /** Organization id this cluster was started with. */
        String organizationId;

        /**
         * Creates a harness that is not started yet; call {@code startCluster} to bring it up.
         *
         * @param testInfo test information forwarded to the base harness
         * @param sslConfigs SSL settings applied to the brokers
         * @param clientSslConfigs SSL settings applied to the admin client properties
         */
        public MultiTenantCluster(TestInfo testInfo, Map<String, String> sslConfigs, Map<String, String> clientSslConfigs) {
            super(testInfo);
            this.clientSslConfigs = clientSslConfigs;
            this.sslConfigs = sslConfigs;
        }

        /**
         * Starts the physical cluster, creates a logical cluster and user on it, seeds the logical
         * cluster metadata and creates the admin clients.
         *
         * @param brokerAndControllerOverrideProps overrides applied to both broker and controller properties
         * @param logicalClusterId id of the logical cluster to create
         * @param userId id of the logical cluster user to create and look up
         * @param clusterOrganizationId organization id recorded for this cluster
         * @throws RuntimeException wrapping the interruption or execution failure if start-up fails
         */
        void startCluster(Map<String, String> brokerAndControllerOverrideProps, String logicalClusterId, int userId, String clusterOrganizationId) {
            this.organizationId = clusterOrganizationId;

            // Brokers get the SSL settings and the listener protocol map on top of the overrides.
            Properties brokerConfig = this.brokerProps();
            brokerConfig.putAll(this.sslConfigs);
            brokerConfig.putAll(brokerAndControllerOverrideProps);
            brokerConfig.setProperty("listener.security.protocol.map", "INTERNAL:PLAINTEXT,EXTERNAL:SASL_SSL");

            // Controllers only get the overrides.
            Properties controllerConfig = this.brokerProps();
            controllerConfig.putAll(brokerAndControllerOverrideProps);

            try {
                this.physicalCluster = this.startWithTopic("_confluent-logical_clusters", 1, 1, 60000L, brokerConfig, controllerConfig, Optional.of(Time.SYSTEM));
            }
            catch (InterruptedException | ExecutionException startupFailure) {
                throw new RuntimeException(startupFailure);
            }

            this.logicalCluster = this.physicalCluster.createLogicalCluster(logicalClusterId, 100, userId);
            this.user = this.logicalCluster.user(userId);
            this.seedLogicalClusterMetadata();
            this.createAdmins();
        }

        /**
         * Registers this logical cluster's test metadata with the physical cluster metadata
         * instance of every broker session UUID. The {@code physicalClusterMetadata} field is
         * overwritten on each iteration, so it ends up holding the last instance visited.
         */
        private void seedLogicalClusterMetadata() {
            String lkcId = this.logicalCluster.logicalClusterId();
            TestLogicalClusterMetadata lkcMetadata = new TestLogicalClusterMetadata(lkcId, this.organizationId, MultiTenantCLDefaultDataPolicyTest.TEST_ENVIRONMENT_ID);
            this.physicalCluster.brokerSessionUuids().forEach(sessionUuid -> {
                TestPhysicalClusterMetadata brokerMetadata = (TestPhysicalClusterMetadata)TestPhysicalClusterMetadata.getInstance((String)sessionUuid);
                this.physicalClusterMetadata = brokerMetadata;
                brokerMetadata.addLogicalCluster(lkcMetadata);
            });
        }

        /**
         * Builds the two admin clients: a SASL_SSL/PLAIN client authenticated as the logical
         * cluster's admin user (with a SASL authenticate callback carrying a data policy context
         * for this organization), and a PLAINTEXT client connected to the INTERNAL listener.
         */
        private void createAdmins() {
            DefaultDataPolicyContext dataPolicyContext = new DefaultDataPolicyContext.Builder(this.organizationId, SaslInternalConfigs.NetworkType.PRIVATE, Boolean.TRUE).build();
            SaslAuthenticateRequestCallback authCallback = new SaslAuthenticateRequestCallback(dataPolicyContext, "lkc-inconsequential");
            Properties tenantAdminProps = new Properties();
            tenantAdminProps.putAll(this.clientSslConfigs);
            this.admin = (ConfluentAdmin)super.createAdminClient(
                this.physicalCluster.bootstrapServers(),
                SecurityProtocol.SASL_SSL,
                "PLAIN",
                this.logicalCluster.adminUser().testPlainSaslJaasConfig(),
                tenantAdminProps,
                authCallback,
                null);
            this.internalListenerAdmin = (ConfluentAdmin)super.createAdminClient(
                this.physicalCluster.kafkaCluster().bootstrapServers("INTERNAL"),
                SecurityProtocol.PLAINTEXT,
                null,
                null,
                new Properties());
        }

        /**
         * Builds the default server properties used for both broker and controller.
         *
         * <p>The cluster link metadata topic is always enabled with replication factor 1. It gets
         * 3 partitions normally; when the test runs with the link coordinator it gets 1 partition
         * and a min ISR of 1.
         *
         * @return a fresh {@link Properties} instance on every call
         */
        private Properties brokerProps() {
            boolean withLinkCoordinator = this.testRunsWithLinkCoordinator();
            Properties defaults = new Properties();

            // Plugins: authorizer, quota callback, policies and the internal-listener interceptor.
            defaults.put("authorizer.class.name", MultiTenantAuthorizer.class.getName());
            defaults.put("client.quota.callback.class", TenantQuotaCallback.class.getName());
            defaults.put("create.topic.policy.class.name", CreateTopicPolicy.class.getName());
            defaults.put("alter.config.policy.class.name", AlterConfigPolicy.class.getName());
            defaults.put("create.cluster.link.policy.class.name", CreateClusterLinkPolicy.class.getName());
            defaults.put("listener.name.internal.broker.interceptor.class", ConfluentCloudBrokerInterceptor.class.getName());

            // General broker behaviour.
            defaults.put("confluent.cluster.link.enable", "true");
            defaults.put("confluent.plugins.topic.policy.replication.factor", "1");
            defaults.put("auto.create.topics.enable", "false");
            defaults.put("confluent.cdc.lkc.metadata.topic", "_confluent-logical_clusters");

            // Cluster link metadata topic.
            defaults.put("confluent.cluster.link.metadata.topic.enable", "true");
            defaults.put("confluent.cluster.link.metadata.topic.replication.factor", "1");
            defaults.put("confluent.cluster.link.metadata.topic.partitions", withLinkCoordinator ? "1" : "3");
            if (withLinkCoordinator) {
                defaults.put("confluent.cluster.link.metadata.topic.min.isr", "1");
            }
            return defaults;
        }

        /**
         * Forwards the readiness flag to the stored physical cluster metadata instance.
         *
         * @param isReady value passed to {@code setIsLkcMetadataReady}
         */
        public void setPhysicalClusterMetadataReadyState(boolean isReady) {
            TestPhysicalClusterMetadata metadata = this.physicalClusterMetadata;
            metadata.setIsLkcMetadataReady(isReady);
        }

        /**
         * Describes the BROKER config resource with an empty name through the internal-listener
         * admin client, waiting up to 15 seconds.
         *
         * @return the described config
         * @throws RuntimeException wrapping any exception raised by the describe call
         */
        public Config getBrokerDefaultConfig() {
            try {
                ConfigResource brokerDefaults = new ConfigResource(ConfigResource.Type.BROKER, "");
                Map<ConfigResource, KafkaFuture<Config>> pendingDescriptions = this.internalListenerAdmin.describeConfigs(Collections.singleton(brokerDefaults)).values();
                KafkaFuture<Config> pendingConfig = pendingDescriptions.get(brokerDefaults);
                return pendingConfig.get(15L, TimeUnit.SECONDS);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Applies each entry as a SET operation on the BROKER config resource with an empty name,
         * using the internal-listener admin client, and blocks until the change completes.
         *
         * @param configEntries config names mapped to the values to set
         * @throws RuntimeException wrapping any exception raised by the alter call
         */
        public void setBrokerDefaultConfig(Map<String, String> configEntries) {
            ConfigResource brokerDefaults = new ConfigResource(ConfigResource.Type.BROKER, "");
            Collection<AlterConfigOp> setOps = configEntries.entrySet().stream()
                .map(e -> new AlterConfigOp(new ConfigEntry(e.getKey(), e.getValue()), AlterConfigOp.OpType.SET))
                .collect(Collectors.toList());
            try {
                this.internalListenerAdmin.incrementalAlterConfigs(Collections.singletonMap(brokerDefaults, setOps)).all().get();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * @return {@code true} when the current test's display name contains
         *         {@code "coordinator=true"}
         */
        private boolean testRunsWithLinkCoordinator() {
            String displayName = this.testInfo.getDisplayName();
            return displayName.contains("coordinator=true");
        }

        /**
         * Adds a user to the logical cluster, creates the link ACLs for it and waits until every
         * broker reports those ACLs.
         *
         * @param newUserId id passed to {@code getOrCreateUser} on the physical cluster
         * @return the user added to the logical cluster (the {@code linkUser} field is not touched)
         * @throws Exception if creating the ACLs fails
         */
        LogicalClusterUser createLinkUser(int newUserId) throws Exception {
            UserMetadata userMetadata = this.physicalCluster.getOrCreateUser(newUserId, false);
            LogicalClusterUser addedUser = this.logicalCluster.addUser(userMetadata);
            Set<AclBinding> requiredAcls = this.linkAcls(addedUser);
            this.addLinkAcls(requiredAcls);
            MultiTenantCluster.waitForAclsToExistOnAllBrokers(this.physicalCluster, requiredAcls, addedUser.tenantPrefix());
            return addedUser;
        }

        /**
         * Polls every 100 ms, for up to 15 seconds, until each broker has an authorizer plugin
         * whose ACLs (with the tenant prefix stripped) contain all of {@code acls}.
         *
         * @param cluster physical cluster whose brokers are inspected
         * @param acls un-prefixed ACL bindings that must be present on every broker
         * @param tenantPrefix prefix removed from the brokers' ACLs before comparing
         */
        private static void waitForAclsToExistOnAllBrokers(PhysicalCluster cluster, Set<AclBinding> acls, String tenantPrefix) {
            kafka.utils.TestUtils.waitUntilTrue(() -> {
                for (KafkaBroker broker : cluster.kafkaCluster().kafkaBrokers()) {
                    // A broker without an authorizer cannot have the ACLs yet.
                    if (broker.authorizerPlugin().isEmpty()) {
                        return false;
                    }
                    Collection<AclBinding> brokerAcls = MultiTenantCluster.removeTenantPrefix(Lists.newArrayList((Iterable)((Authorizer)((Plugin)broker.authorizerPlugin().get()).get()).acls(AclBindingFilter.ANY)), tenantPrefix);
                    if (!brokerAcls.containsAll(acls)) {
                        return false;
                    }
                }
                return true;
            }, () -> "Failed to validate all ACLs exist on all brokers", (long)15000L, (long)100L);
        }

        /**
         * Maps tenant-prefixed ACL bindings back to their un-prefixed form.
         *
         * <p>For a binding whose principal contains {@code tenantPrefix}: the first occurrence of
         * the prefix is removed from the principal and the first {@code "TenantUser"} becomes
         * {@code "User"}; a PREFIXED resource pattern named exactly {@code tenantPrefix} becomes
         * the LITERAL pattern {@code "*"}, and any other pattern has the first occurrence of the
         * prefix removed from its name. Other bindings are copied unchanged.
         *
         * @param acls bindings to map
         * @param tenantPrefix prefix to strip; treated as literal text, not as a regex
         * @return a new list with one mapped binding per input binding, in encounter order
         */
        private static Collection<AclBinding> removeTenantPrefix(Collection<AclBinding> acls, String tenantPrefix) {
            // String.replaceFirst takes a regex; quote the prefix so characters such as '.' or '+'
            // in a tenant id are matched literally.
            String literalPrefix = java.util.regex.Pattern.quote(tenantPrefix);
            return acls.stream().map(acl -> {
                AccessControlEntry currEntry = acl.entry();
                if (!currEntry.principal().contains(tenantPrefix)) {
                    return new AclBinding(acl.pattern(), acl.entry());
                }
                String principal = currEntry.principal().replaceFirst(literalPrefix, "").replaceFirst("TenantUser", "User");
                ResourcePattern currPattern = acl.pattern();
                ResourcePattern mappedPattern;
                if (currPattern.name().equals(tenantPrefix) && currPattern.patternType().equals((Object)PatternType.PREFIXED)) {
                    // A prefixed pattern covering the whole tenant namespace is the tenant's wildcard.
                    mappedPattern = new ResourcePattern(currPattern.resourceType(), "*", PatternType.LITERAL);
                } else {
                    mappedPattern = new ResourcePattern(currPattern.resourceType(), currPattern.name().replaceFirst(literalPrefix, ""), currPattern.patternType());
                }
                return new AclBinding(mappedPattern, new AccessControlEntry(principal, currEntry.host(), currEntry.operation(), currEntry.permissionType(), currEntry.clusterLinkIds()));
            }).collect(Collectors.toList());
        }

        /**
         * Builds SASL_SSL/PLAIN client configs for this cluster using the link user's JAAS
         * config, then adds the stored SSL configs on top.
         *
         * @return a new mutable map of client configs
         */
        private Map<String, String> clientConfigsForExternalListener() {
            Properties securityProps = KafkaTestUtils.securityProps(this.physicalCluster.kafkaCluster().bootstrapServers(), SecurityProtocol.SASL_SSL, "PLAIN", this.linkUser.saslJaasConfig());
            HashMap<String, String> externalClientConfigs = new HashMap<String, String>();
            for (String key : securityProps.stringPropertyNames()) {
                externalClientConfigs.put(key, securityProps.getProperty(key));
            }
            // SSL settings win over any same-named security property.
            externalClientConfigs.putAll(this.sslConfigs);
            return externalClientConfigs;
        }

        /**
         * Convenience overload that creates the destination link with no link config overrides.
         *
         * @param admin admin client used to issue the create request
         * @param linkName name of the link to create
         * @param sourceCluster cluster the link reads from
         * @param linkUserId id of the link user created on the source cluster
         * @return the result of the create request
         * @throws Throwable if setting up the link user or issuing the request fails
         */
        CreateClusterLinksResult createDestClusterLinkResult(ConfluentAdmin admin, String linkName, MultiTenantCluster sourceCluster, int linkUserId) throws Throwable {
            Map<String, String> noOverrides = Collections.emptyMap();
            return this.createDestClusterLinkResult(admin, linkName, sourceCluster, linkUserId, noOverrides);
        }

        /**
         * Creates a link user on {@code sourceCluster} (storing it in that cluster's
         * {@code linkUser} field), assembles the link configs and issues a create-cluster-link
         * request with {@code validateOnly(false)} and {@code validateLink(true)}.
         *
         * <p>Config precedence, lowest to highest: the source cluster's external-listener client
         * configs, the fixed values set here, then {@code linkConfigOverride}.
         *
         * @param admin admin client used to issue the create request
         * @param linkName name of the link to create
         * @param sourceCluster cluster the link reads from; its {@code linkUser} is replaced
         * @param linkUserId id of the link user created on the source cluster
         * @param linkConfigOverride configs applied last, overriding anything set before
         * @return the result of the create request
         * @throws Throwable if setting up the link user or issuing the request fails
         */
        CreateClusterLinksResult createDestClusterLinkResult(ConfluentAdmin admin, String linkName, MultiTenantCluster sourceCluster, int linkUserId, Map<String, String> linkConfigOverride) throws Throwable {
            // The link user must exist before the client configs are built: they embed its JAAS config.
            sourceCluster.linkUser = sourceCluster.createLinkUser(linkUserId);
            HashMap<String, String> linkConfigs = new HashMap<String, String>(sourceCluster.clientConfigsForExternalListener());

            linkConfigs.put("request.timeout.ms", "1000");
            linkConfigs.put(ClusterLinkConfig.TopicConfigSyncMsProp(), "1000");
            linkConfigs.put(ClusterLinkConfig.ConsumerOffsetGroupFiltersProp(), "{ \"groupFilters\": [{ \"name\": \"*\", \"patternType\": \"literal\", \"filterType\": \"include\" }]}");
            linkConfigs.put(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
            linkConfigs.put(ClusterLinkConfig.ConsumerOffsetSyncMsProp(), "2000");
            linkConfigs.putAll(linkConfigOverride);

            NewClusterLink linkToCreate = new NewClusterLink(linkName, sourceCluster.logicalCluster.logicalClusterId(), linkConfigs, null);
            CreateClusterLinksOptions createOptions = new CreateClusterLinksOptions().validateOnly(false).validateLink(true);
            return admin.createClusterLinks(Collections.singleton(linkToCreate), createOptions);
        }

        /**
         * Returns the cached PLAINTEXT producer for the INTERNAL listener, creating it on first
         * use with the stored SSL configs as its extra client properties.
         *
         * @return the shared producer instance
         */
        KafkaProducer<String, String> getOrCreateProducerToInternalListener() {
            if (this.producerToInternalListener != null) {
                return this.producerToInternalListener;
            }
            Properties producerProps = new Properties();
            producerProps.putAll(this.sslConfigs);
            this.producerToInternalListener = this.createProducer(SecurityProtocol.PLAINTEXT, "", "", Optional.of("INTERNAL"), producerProps);
            return this.producerToInternalListener;
        }

        /**
         * Returns the cached SASL_SSL/PLAIN consumer for this cluster's user, creating it on
         * first use with the stored SSL configs as its extra client properties.
         *
         * <p>NOTE(review): {@code groupId} is only used when the consumer is first created; later
         * calls return the cached consumer even if a different group id is passed.
         *
         * @param groupId consumer group id used when the consumer is created
         * @return the shared consumer instance
         */
        KafkaConsumer<String, String> getOrCreateConsumerToExternalListener(String groupId) {
            if (this.consumerToExternalListener != null) {
                return this.consumerToExternalListener;
            }
            Properties consumerProps = new Properties();
            consumerProps.putAll(this.sslConfigs);
            this.consumerToExternalListener = this.createConsumer(this.user, groupId, SecurityProtocol.SASL_SSL, "PLAIN", Optional.empty(), consumerProps);
            return this.consumerToExternalListener;
        }

        /**
         * Creates the given ACLs through the tenant admin client, waiting up to 15 seconds.
         *
         * @param acls bindings to create
         * @throws Exception if the request fails or does not complete in time
         */
        private void addLinkAcls(Set<AclBinding> acls) throws Exception {
            KafkaFuture<Void> pendingCreate = this.admin.createAcls(acls).all();
            pendingCreate.get(15L, TimeUnit.SECONDS);
        }

        /**
         * Builds the five ALLOW bindings (host {@code "*"}) created for a link user:
         * READ and DESCRIBE_CONFIGS on topics prefixed {@code "linked"}, DESCRIBE and
         * DESCRIBE_CONFIGS on the {@code "kafka-cluster"} resource, and READ on the literal
         * group {@code "*"}.
         *
         * @param user user whose un-prefixed principal the bindings are granted to
         * @return an immutable set of the five bindings
         */
        private Set<AclBinding> linkAcls(LogicalClusterUser user) {
            String principal = user.unprefixedKafkaPrincipal().toString();
            ResourcePattern linkedTopics = new ResourcePattern(ResourceType.TOPIC, "linked", PatternType.PREFIXED);
            ResourcePattern wholeCluster = new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL);
            ResourcePattern everyGroup = new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL);
            return Set.of(
                new AclBinding(linkedTopics, new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW)),
                new AclBinding(linkedTopics, new AccessControlEntry(principal, "*", AclOperation.DESCRIBE_CONFIGS, AclPermissionType.ALLOW)),
                new AclBinding(everyGroup, new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW)),
                new AclBinding(wholeCluster, new AccessControlEntry(principal, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)),
                new AclBinding(wholeCluster, new AccessControlEntry(principal, "*", AclOperation.DESCRIBE_CONFIGS, AclPermissionType.ALLOW)));
        }

        /**
         * @return the cluster link manager of the first broker in the physical cluster
         */
        ClusterLinkFactory.LinkManager linkManager() {
            KafkaBroker firstBroker = this.physicalCluster.kafkaCluster().kafkaBrokers().get(0);
            return firstBroker.clusterLinkManager();
        }

        /**
         * Checks whether the first broker's link manager lists a link whose name equals this
         * cluster user's tenant prefix followed by {@code linkName}.
         *
         * @param linkName un-prefixed link name
         * @return {@code true} if such a link is listed
         */
        boolean linkIdExists(String linkName) {
            return this.linkManager()
                .listClusterLinks()
                .find(link -> link.linkName().equals(this.user.tenantPrefix() + linkName))
                .isDefined();
        }

        /**
         * Lists cluster links filtered to {@code linkName} through the tenant admin client,
         * waiting up to 15 seconds.
         *
         * @param linkName name of the link to list
         * @return the first listing returned, or {@code null} if the result is empty
         * @throws RuntimeException wrapping any exception raised by the list call
         */
        ClusterLinkListing listClusterLinks(String linkName) {
            try {
                ListClusterLinksOptions options = new ListClusterLinksOptions().linkNames(Optional.of(Collections.singletonList(linkName)));
                // Parameterized (not raw) so that findFirst().orElse(null) is typed as
                // ClusterLinkListing rather than Object.
                @SuppressWarnings("unchecked")
                Collection<ClusterLinkListing> clusterLinks = (Collection<ClusterLinkListing>)this.admin.listClusterLinks(options).result().get(15L, TimeUnit.SECONDS);
                return clusterLinks.stream().findFirst().orElse(null);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Describes the cluster link named {@code linkName} through the tenant admin client,
         * waiting up to 15 seconds.
         *
         * @param linkName name of the link to describe
         * @return the first description returned, or {@code null} if the result is empty
         * @throws RuntimeException wrapping any exception raised by the describe call
         */
        ClusterLinkDescription linkDescription(String linkName) {
            try {
                DescribeClusterLinksOptions options = new DescribeClusterLinksOptions().linkNames(Collections.singleton(linkName));
                // Parameterized (not raw) so that findFirst().orElse(null) is typed as
                // ClusterLinkDescription rather than Object.
                @SuppressWarnings("unchecked")
                Collection<ClusterLinkDescription> descriptions = (Collection<ClusterLinkDescription>)this.admin.describeClusterLinks(options).result().get(15L, TimeUnit.SECONDS);
                return descriptions.stream().findFirst().orElse(null);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Reads the cross-org-denied total metric on every broker (asserting it exists) and
         * checks the largest value seen.
         *
         * @param isPositiveValue when {@code true} the largest value must be greater than zero;
         *        when {@code false} it must equal zero
         */
        public void verifyDefaultDataPolicyCrossOrgDeniedMetrics(boolean isPositiveValue) {
            double highestDenied = 0.0;
            for (KafkaBroker broker : this.physicalCluster.kafkaCluster().kafkaBrokers()) {
                MetricName deniedMetricName = DefaultDataPolicyStore.crossOrgDeniedTotalMetricName((String)broker.config().brokerSessionUuid());
                Double deniedOnBroker = (Double)this.getMetricValue(broker, deniedMetricName);
                Assertions.assertNotNull(deniedOnBroker);
                highestDenied = Math.max(highestDenied, deniedOnBroker);
            }
            if (!isPositiveValue) {
                Assertions.assertEquals(0.0, highestDenied);
                return;
            }
            Assertions.assertTrue(highestDenied > 0.0);
        }

        /**
         * Asserts that the default-data-policy enforcement metric exists on every broker and
         * equals {@code expectedValue}.
         *
         * @param expectedValue value each broker's metric must have
         */
        public void verifyDefaultDataPolicyEnforcementMetric(Integer expectedValue) {
            for (KafkaBroker broker : this.physicalCluster.kafkaCluster().kafkaBrokers()) {
                String sessionUuid = (String)broker.config().brokerSessionUuid();
                MetricName enforcementMetricName = DefaultDataPolicyStore.defaultDataPolicyEnforcementMetricName(sessionUuid);
                Integer actualValue = (Integer)this.getMetricValue(broker, enforcementMetricName);
                Assertions.assertNotNull(actualValue);
                Assertions.assertEquals(expectedValue, actualValue);
            }
        }

        /**
         * Looks up a metric on the broker's metrics registry by name.
         *
         * @param kafkaBroker broker whose metrics are read
         * @param metricName name of the metric to find
         * @return the metric's current value, or {@code null} if no metric with that name is
         *         registered (or {@code metricName} is {@code null})
         */
        private Object getMetricValue(KafkaBroker kafkaBroker, MetricName metricName) {
            Map<MetricName, KafkaMetric> registered = kafkaBroker.metrics().metrics();
            // The registry may reject null keys, so keep the "not found" result for a null name.
            if (metricName == null) {
                return null;
            }
            // Keyed lookup instead of scanning every entry for an equal name.
            KafkaMetric metric = registered.get(metricName);
            return metric == null ? null : metric.metricValue();
        }

        /**
         * Fetches the committed offsets of a consumer group through the tenant admin client,
         * blocking until the result is available.
         *
         * @param group consumer group id
         * @return committed offset and metadata per partition
         * @throws RuntimeException wrapping any exception raised by the list call
         */
        Map<TopicPartition, OffsetAndMetadata> committedOffsets(String group) {
            try {
                Map<TopicPartition, OffsetAndMetadata> offsetsByPartition = this.admin.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get();
                return offsetsByPartition;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}

