/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.UserMetadata;
import io.confluent.kafka.multitenant.integration.test.IntegrationTestHarness;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.server.plugins.policy.AlterConfigPolicy;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.kafka.test.utils.SecurityTestUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import kafka.admin.AclCommand;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.ZkAdminManager;
import kafka.server.link.ClusterLinkClientManager;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkFactory;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterMirrorOp;
import org.apache.kafka.clients.admin.AlterMirrorsOptions;
import org.apache.kafka.clients.admin.ClusterLinkListing;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.CreateClusterLinksOptions;
import org.apache.kafka.clients.admin.ListClusterLinksOptions;
import org.apache.kafka.clients.admin.NewClusterLink;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.NewTopicMirror;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Map;

@Category(value={IntegrationTest.class})
public class MultiTenantClusterLinkTest {
    private final MultiTenantCluster sourceCluster = new MultiTenantCluster();
    private final MultiTenantCluster destCluster = new MultiTenantCluster();
    private final String linkName = "tenantLink";
    private final String topic = "linkedTopic";
    private int numPartitions = 2;
    private int nextMessageIndex = 0;

    @Before
    public void setUp() throws Exception {
        this.sourceCluster.startCluster(this.brokerProps(), "sourceLogicalCluster", 1);
        this.destCluster.startCluster(this.brokerProps(), "destLogicalCluster", 11);
        this.addAcls((Admin)this.destCluster.admin, this.destCluster.user);
        this.addAcls((Admin)this.sourceCluster.admin, this.sourceCluster.user);
    }

    @After
    public void tearDown() throws Exception {
        this.sourceCluster.shutdown();
        this.destCluster.shutdown();
    }

    @Test
    public void testMultiTenantClusterLink() throws Throwable {
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001);
        UUID linkId = this.destCluster.linkId("tenantLink");
        this.createMirroredTopic();
        this.verifyTopicListing();
        this.verifyTopicMirroring();
        this.addSourcePartitionsAndVerifyMirror(4);
        this.changeSourceTopicConfigAndVerifyMirror();
        this.verifyAclAndOffsetMigration();
        this.verifyTopicMirroring();
        this.verifyTenantMetrics(linkId, 1001);
        this.verifyMetricsGroups(linkId);
        this.stopMirroring("linkedTopic");
    }

    @Test
    public void testClusterLinkSecurityUpdate() throws Throwable {
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001);
        this.createMirroredTopic();
        this.verifyTopicListing();
        this.verifyTopicMirroring();
        this.verifyAclAndOffsetMigration();
        LogicalClusterUser newUser = this.sourceCluster.createLinkUser(2001);
        String newJaasConfig = newUser.saslJaasConfig();
        this.destCluster.alterClusterLink("tenantLink", "sasl.jaas.config", newJaasConfig);
        this.sourceCluster.deleteUser(this.sourceCluster.linkUser, true);
        this.sourceCluster.linkUser = newUser;
        this.verifyTopicMirroring();
        this.verifyAclAndOffsetMigration();
        this.verifySocketBufferSizeUpdate();
        this.addSourcePartitionsAndVerifyMirror(4);
        this.changeSourceTopicConfigAndVerifyMirror();
        this.verifyTopicMirroring();
    }

    @Test
    public void testSourceConnectionOrigination() throws Throwable {
        this.destCluster.useSourceConnectionOrigination = true;
        this.sourceCluster.useSourceConnectionOrigination = true;
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, -1);
        this.sourceCluster.createSourceClusterLink(this.sourceCluster.admin, "tenantLink", this.destCluster, 1003);
        this.createMirroredTopic();
        this.verifyTopicListing();
        this.verifyTopicMirroring();
        this.verifyMetricsGroups(this.destCluster.linkId("tenantLink"));
        this.verifyTenantMetrics(this.destCluster.linkId("tenantLink"), 1003);
        this.verifyAclAndOffsetMigration();
        LogicalClusterUser newUser = this.sourceCluster.createLinkUser(2001);
        String newJaasConfig = newUser.saslJaasConfig();
        this.sourceCluster.alterClusterLink("tenantLink", ClusterLinkConfig.LocalPrefix() + "sasl.jaas.config", newJaasConfig);
        this.sourceCluster.deleteUser(this.sourceCluster.linkUser, true);
        this.sourceCluster.linkUser = newUser;
        this.verifyTopicMirroring();
        this.verifyAclAndOffsetMigration();
        this.verifySocketBufferSizeUpdate();
        this.addSourcePartitionsAndVerifyMirror(4);
        this.changeSourceTopicConfigAndVerifyMirror();
        this.verifyTopicMirroring();
    }

    private Properties brokerProps() {
        Properties props = new Properties();
        props.put("confluent.cluster.link.enable", "true");
        props.put(KafkaConfig.AuthorizerClassNameProp(), MultiTenantAuthorizer.class.getName());
        props.put("confluent.plugins.topic.policy.replication.factor", "1");
        props.put(KafkaConfig.AutoCreateTopicsEnableProp(), "false");
        props.put(KafkaConfig.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        props.put(KafkaConfig.PasswordEncoderSecretProp(), "multi-tenant-cluster-link-secret");
        props.put(KafkaConfig.AlterConfigPolicyClassNameProp(), AlterConfigPolicy.class.getName());
        return props;
    }

    private void createMirroredTopic() throws Exception {
        this.sourceCluster.physicalCluster.kafkaCluster().createTopic(this.sourceCluster.user.tenantPrefix() + "linkedTopic", this.numPartitions, 1);
        NewTopic newTopic = new NewTopic("linkedTopic", Optional.empty(), Optional.of((short)1)).mirror(Optional.of(new NewTopicMirror("tenantLink", "linkedTopic")));
        this.destCluster.admin.createTopics(Collections.singleton(newTopic)).all().get();
    }

    private void stopMirroring(String topic) throws Throwable {
        this.destCluster.admin.alterMirrors(Collections.singletonMap(topic, AlterMirrorOp.FAILOVER), new AlterMirrorsOptions()).all().get(15L, TimeUnit.SECONDS);
    }

    private void verifyTopicListing() throws Exception {
        ListClusterLinksOptions options = new ListClusterLinksOptions().includeTopics(true);
        Collection listings = (Collection)this.destCluster.admin.listClusterLinks(options).result().get();
        Assert.assertEquals((long)1L, (long)listings.size());
        ClusterLinkListing listing = (ClusterLinkListing)listings.iterator().next();
        Assert.assertEquals((Object)"tenantLink", (Object)listing.linkName());
        Assert.assertTrue((boolean)listing.topics().isPresent());
        Collection topics = (Collection)listing.topics().get();
        Assert.assertEquals((long)1L, (long)topics.size());
        Assert.assertEquals((Object)"linkedTopic", topics.iterator().next());
    }

    private void verifyTopicMirroring() throws Throwable {
        int firstMessageIndex = this.nextMessageIndex;
        int messageCount = 10;
        this.nextMessageIndex += messageCount;
        KafkaTestUtils.sendRecords(this.sourceCluster.getOrCreateProducer(), "linkedTopic", firstMessageIndex, messageCount);
        KafkaTestUtils.consumeRecords(this.destCluster.getOrCreateConsumer("destGroup"), "linkedTopic", firstMessageIndex, messageCount);
    }

    private void verifyAclAndOffsetMigration() throws Throwable {
        Set<AclBinding> acls = this.addAcls((Admin)this.sourceCluster.admin, this.sourceCluster.user);
        this.addBrokerAclsForOffsetMigration();
        String group = "linkedGroup";
        java.util.Map<TopicPartition, OffsetAndMetadata> offsets = this.commitOffsets((Admin)this.sourceCluster.admin, group);
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.describeAcls(this.sourceCluster.user), acls, "Acls not migrated");
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.committedOffsets(group), offsets, "Consumer offsets not migrated");
    }

    private java.util.Map<String, String> linkMetricTags(String tenant, String request) {
        HashMap<String, String> tags = new HashMap<String, String>();
        tags.put("tenant", tenant);
        tags.put("link-name", "tenantLink");
        tags.put("request", request);
        return tags;
    }

    private java.util.Map<String, String> linkIdMetricTags(UUID linkId, String tenant, String request) {
        HashMap<String, String> tags = new HashMap<String, String>();
        tags.put("tenant", tenant);
        tags.put("link-id", linkId.toString());
        tags.put("request", request);
        return tags;
    }

    private void verifyTenantMetrics(UUID linkId, int linkUserId) {
        String destMetricsGroup = "cluster-link-dest-tenant-metrics";
        HashMap<String, String> sourceTags = new HashMap<String, String>();
        sourceTags.put("tenant", "sourceLogicalCluster");
        sourceTags.put("user", String.valueOf(linkUserId));
        sourceTags.put("request", "Fetch");
        double sourceRequests = this.metricValue(this.sourceCluster, "tenant-metrics", "request-total", sourceTags);
        Assert.assertNotEquals((double)0.0, (double)sourceRequests, (double)0.001);
        double sourceRequestBytes = this.metricValue(this.sourceCluster, "tenant-metrics", "request-byte-total", sourceTags);
        Assert.assertNotEquals((double)0.0, (double)sourceRequestBytes, (double)0.001);
        double sourceResponseBytes = this.metricValue(this.sourceCluster, "tenant-metrics", "response-byte-total", sourceTags);
        Assert.assertNotEquals((double)0.0, (double)sourceResponseBytes, (double)0.001);
        java.util.Map<String, String> destTags = this.linkMetricTags("destLogicalCluster", "Fetch");
        this.assertRange("requests", this.metricValue(this.sourceCluster, "tenant-metrics", "request-total", sourceTags), this.metricValue(this.destCluster, destMetricsGroup, "request-total", destTags), 10.0);
        this.assertRange("request-bytes", this.metricValue(this.sourceCluster, "tenant-metrics", "request-byte-total", sourceTags), this.metricValue(this.destCluster, destMetricsGroup, "request-byte-total", destTags), 2000.0);
        this.assertRange("response-bytes", this.metricValue(this.destCluster, destMetricsGroup, "response-byte-total", destTags), this.metricValue(this.sourceCluster, "tenant-metrics", "response-byte-total", sourceTags), 2000.0);
        double responseTimeMs = TimeUnit.NANOSECONDS.toMillis((long)this.metricValue(this.destCluster, destMetricsGroup, "response-time-ns-max", destTags));
        Assert.assertTrue((String)("Invalid response time metric: " + responseTimeMs), (responseTimeMs > 0.0 && responseTimeMs < 15000.0 ? 1 : 0) != 0);
        HashMap<String, String> linkTags = new HashMap<String, String>();
        linkTags.put("tenant", "destLogicalCluster");
        linkTags.put("link-name", "tenantLink");
        Assert.assertEquals((double)1.0, (double)this.metricValue(this.destCluster, "cluster-link-metrics", "link-count", linkTags), (double)0.001);
        Assert.assertEquals((double)1.0, (double)this.metricValue(this.destCluster, "cluster-link-metrics", "global-active-mirror-topic-count", linkTags), (double)0.001);
        Assert.assertEquals((double)0.0, (double)this.metricValue(this.destCluster, "cluster-link-metrics", "global-failed-mirror-topic-count", linkTags), (double)0.001);
        Assert.assertEquals((double)0.0, (double)this.metricValue(this.destCluster, "cluster-link-metrics", "global-stopped-mirror-topic-count", linkTags), (double)0.001);
        Assert.assertEquals((double)0.0, (double)this.metricValue(this.destCluster, "cluster-link-metrics", "global-paused-mirror-topic-count", linkTags), (double)0.001);
        Assert.assertEquals((double)this.numPartitions, (double)this.metricValue(this.destCluster, "cluster-link-metrics", "mirror-partition-count", linkTags), (double)0.001);
        Assert.assertEquals((double)0.0, (double)this.metricValue(this.destCluster, "cluster-link-metrics", "failed-mirror-partition-count", linkTags), (double)0.001);
        String sourceMetricsGroup = "cluster-link-source-metrics";
        java.util.Map<String, String> sourceLinkTags = this.linkIdMetricTags(linkId, "sourceLogicalCluster", "Fetch");
        this.assertRange("requests", this.metricValue(this.sourceCluster, sourceMetricsGroup, "request-total", sourceLinkTags), this.metricValue(this.destCluster, destMetricsGroup, "request-total", destTags), 10.0);
        this.assertRange("request-bytes", this.metricValue(this.sourceCluster, sourceMetricsGroup, "request-byte-total", sourceLinkTags), this.metricValue(this.destCluster, destMetricsGroup, "request-byte-total", destTags), 2000.0);
        this.assertRange("response-bytes", this.metricValue(this.destCluster, destMetricsGroup, "response-byte-total", destTags), this.metricValue(this.sourceCluster, sourceMetricsGroup, "response-byte-total", sourceLinkTags), 2000.0);
        responseTimeMs = TimeUnit.NANOSECONDS.toMillis((long)this.metricValue(this.sourceCluster, sourceMetricsGroup, "response-time-ns-max", sourceLinkTags));
        Assert.assertTrue((String)("Invalid source link response time metric: " + responseTimeMs), (responseTimeMs > 0.0 && responseTimeMs < 15000.0 ? 1 : 0) != 0);
    }

    private void verifyMetricsGroups(UUID linkId) {
        double sourceValue;
        java.util.Map<String, String> destTags = this.linkMetricTags("destLogicalCluster", "Metadata");
        java.util.Map<String, String> sourceTags = this.linkMetricTags("sourceLogicalCluster", "Metadata");
        Assert.assertEquals((double)0.0, (double)this.metricValue(this.sourceCluster, "cluster-link-dest-tenant-metrics", "request-total", sourceTags, false), (double)0.001);
        Assert.assertEquals((double)0.0, (double)this.metricValue(this.destCluster, "cluster-link-source-tenant-metrics", "request-total", destTags, false), (double)0.001);
        double destValue = this.metricValue(this.destCluster, "cluster-link-dest-tenant-metrics", "request-total", destTags);
        Assert.assertTrue((String)("Dest metric not updated: " + destValue), (destValue > 0.0 ? 1 : 0) != 0);
        if (this.sourceCluster.useSourceConnectionOrigination) {
            sourceValue = this.metricValue(this.sourceCluster, "cluster-link-source-tenant-metrics", "request-total", sourceTags);
            Assert.assertTrue((String)("Source metric not updated: " + sourceValue), (sourceValue > 0.0 ? 1 : 0) != 0);
        } else {
            Assert.assertEquals((double)0.0, (double)this.metricValue(this.sourceCluster, "cluster-link-source-tenant-metrics", "request-total", sourceTags, false), (double)0.001);
        }
        destTags = this.linkIdMetricTags(linkId, "destLogicalCluster", "Metadata");
        sourceTags = this.linkIdMetricTags(linkId, "sourceLogicalCluster", "Metadata");
        sourceValue = this.metricValue(this.sourceCluster, "cluster-link-source-metrics", "request-total", sourceTags);
        Assert.assertTrue((String)("Source metric not updated: " + sourceValue), (sourceValue > 0.0 ? 1 : 0) != 0);
        Assert.assertEquals((double)0.0, (double)this.metricValue(this.destCluster, "cluster-link-source-metrics", "request-total", destTags, false), (double)0.001);
        HashMap<String, String> tags = new HashMap<String, String>();
        tags.put("link-id", linkId.toString());
        tags.put("tenant", "sourceLogicalCluster");
        tags.put("mode", "source");
        Assert.assertEquals((long)this.sourceCluster.physicalCluster.kafkaCluster().brokers().size(), (long)((int)this.totalMetricValue(this.sourceCluster, "cluster-link-metrics", "active-link-count", tags)));
        tags.put("tenant", "destLogicalCluster");
        tags.put("mode", "destination");
        Assert.assertEquals((long)this.destCluster.physicalCluster.kafkaCluster().brokers().size(), (long)((int)this.totalMetricValue(this.destCluster, "cluster-link-metrics", "active-link-count", tags)));
    }

    private void verifySocketBufferSizeUpdate() throws Exception {
        long invalidSize = 0x200000L;
        ConfigResource linkResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink");
        ExecutionException e1 = (ExecutionException)Assert.assertThrows(ExecutionException.class, () -> {
            Void cfr_ignored_0 = (Void)this.destCluster.admin.incrementalAlterConfigs(Collections.singletonMap(linkResource, Collections.singleton(new AlterConfigOp(new ConfigEntry(KafkaConfig.ReplicaSocketReceiveBufferBytesProp(), String.valueOf(invalidSize)), AlterConfigOp.OpType.SET)))).all().get(15L, TimeUnit.SECONDS);
        });
        Assert.assertEquals(PolicyViolationException.class, e1.getCause().getClass());
        ExecutionException e2 = (ExecutionException)Assert.assertThrows(ExecutionException.class, () -> {
            Void cfr_ignored_0 = (Void)this.destCluster.admin.incrementalAlterConfigs(Collections.singletonMap(linkResource, Collections.singleton(new AlterConfigOp(new ConfigEntry(KafkaConfig.ReplicaSocketReceiveBufferBytesProp(), String.valueOf(1024)), AlterConfigOp.OpType.SET)))).all().get(15L, TimeUnit.SECONDS);
        });
        Assert.assertEquals(PolicyViolationException.class, e2.getCause().getClass());
        long validSize = 131072L;
        this.destCluster.admin.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink"), Collections.singleton(new AlterConfigOp(new ConfigEntry(KafkaConfig.ReplicaSocketReceiveBufferBytesProp(), String.valueOf(validSize)), AlterConfigOp.OpType.SET)))).all().get(15L, TimeUnit.SECONDS);
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.linkConfig("tenantLink", KafkaConfig.ReplicaSocketReceiveBufferBytesProp()), String.valueOf(validSize), "Link config not updated");
    }

    private void assertRange(String name, double receiverValue, double senderValue, double maxDiff) {
        String message = String.format("Metric values for '%s' (%f, %f) not within expected range %f", name, receiverValue, senderValue, maxDiff);
        Assert.assertTrue((String)message, (senderValue - receiverValue <= maxDiff && receiverValue - senderValue < 0.001 ? 1 : 0) != 0);
    }

    private double metricValue(MultiTenantCluster cluster, String group, String name, java.util.Map<String, String> tags) {
        return this.metricValue(cluster, group, name, tags, true);
    }

    private double metricValue(MultiTenantCluster cluster, String group, String name, java.util.Map<String, String> tags, boolean failIfNotFound) {
        double value = 0.0;
        boolean found = false;
        for (KafkaServer server : cluster.physicalCluster.kafkaCluster().brokers()) {
            for (Map.Entry entry : server.metrics().metrics().entrySet()) {
                if (!this.isMatchingMetric((MetricName)entry.getKey(), name, group, tags)) continue;
                found = true;
                value += ((Double)((KafkaMetric)entry.getValue()).metricValue()).doubleValue();
            }
        }
        if (failIfNotFound) {
            Assert.assertTrue((String)("Metric not found " + name), (boolean)found);
        }
        return value;
    }

    private double totalMetricValue(MultiTenantCluster cluster, String group, String name, java.util.Map<String, String> tags) {
        double value = 0.0;
        for (KafkaServer server : cluster.physicalCluster.kafkaCluster().brokers()) {
            for (Map.Entry entry : server.metrics().metrics().entrySet()) {
                if (!this.isMatchingMetric((MetricName)entry.getKey(), name, group, tags)) continue;
                value += ((Double)((KafkaMetric)entry.getValue()).metricValue()).doubleValue();
            }
        }
        return value;
    }

    private boolean isMatchingMetric(MetricName metricName, String name, String group, java.util.Map<String, String> tags) {
        if (!metricName.name().equals(name)) {
            return false;
        }
        if (!metricName.group().equals(group)) {
            return false;
        }
        java.util.Map metricTags = metricName.tags();
        for (Map.Entry<String, String> entry : tags.entrySet()) {
            if (entry.getValue().equals(metricTags.get(entry.getKey()))) continue;
            return false;
        }
        return true;
    }

    private void addSourcePartitionsAndVerifyMirror(int newPartitionCount) throws Exception {
        ((KafkaFuture)this.sourceCluster.admin.createPartitions(Collections.singletonMap("linkedTopic", NewPartitions.increaseTo((int)newPartitionCount))).values().get("linkedTopic")).get(15L, TimeUnit.SECONDS);
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.partitionsForTopic("linkedTopic"), newPartitionCount, "Topic partitions not updated");
        this.numPartitions = newPartitionCount;
    }

    private void changeSourceTopicConfigAndVerifyMirror() throws Exception {
        this.sourceCluster.admin.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, "linkedTopic"), Collections.singleton(new AlterConfigOp(new ConfigEntry("max.message.bytes", "123456"), AlterConfigOp.OpType.SET)))).all().get(15L, TimeUnit.SECONDS);
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.topicConfig("linkedTopic", "max.message.bytes"), "123456", "Topic configs not migrated");
    }

    private Set<AclBinding> addAcls(Admin adminClient, LogicalClusterUser user) throws Exception {
        String principal = user.unprefixedKafkaPrincipal().toString();
        AclBinding topicAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "linked", PatternType.PREFIXED), new AccessControlEntry(principal, "*", AclOperation.ALL, AclPermissionType.ALLOW));
        AclBinding clusterAcl = new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL), new AccessControlEntry(principal, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
        AclBinding groupAcl = new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL), new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW));
        Set acls = Utils.mkSet((Object[])new AclBinding[]{topicAcl, groupAcl, clusterAcl});
        adminClient.createAcls((Collection)acls).all().get(15L, TimeUnit.SECONDS);
        return acls;
    }

    private void addBrokerAclsForOffsetMigration() throws Exception {
        String zkConnect = this.destCluster.physicalCluster.kafkaCluster().zkConnect();
        String prefix = this.destCluster.user.tenantPrefix() + "linked";
        String[] aclArgs = SecurityTestUtils.consumeAclArgs(zkConnect, PhysicalCluster.BROKER_PRINCIPAL, prefix, prefix, PatternType.PREFIXED);
        AclCommand.main((String[])aclArgs);
    }

    private java.util.Map<TopicPartition, OffsetAndMetadata> commitOffsets(Admin adminClient, String group) throws Exception {
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        LogManager logManager = this.sourceCluster.physicalCluster.kafkaCluster().brokers().get(0).logManager();
        for (int i = 0; i < this.numPartitions; ++i) {
            TopicPartition tp = new TopicPartition("linkedTopic", i);
            TopicPartition prefixedPartition = new TopicPartition(this.sourceCluster.user.tenantPrefix() + "linkedTopic", i);
            offsets.put(tp, new OffsetAndMetadata(((AbstractLog)logManager.getLog(prefixedPartition, false).get()).localLogEndOffset()));
        }
        adminClient.alterConsumerGroupOffsets(group, offsets).all().get();
        return offsets;
    }

    private static <T> void waitFor(Supplier<T> actual, T expected, String error) throws Exception {
        TestUtils.waitForCondition(() -> expected.equals(actual.get()), () -> error + " : expected=" + expected + ", actual=" + actual.get());
    }

    private static class MultiTenantCluster
    extends IntegrationTestHarness {
        private PhysicalCluster physicalCluster;
        private LogicalCluster logicalCluster;
        private LogicalClusterUser user;
        private LogicalClusterUser linkUser;
        private ConfluentAdmin admin;
        private boolean useSourceConnectionOrigination;
        KafkaProducer<String, String> producer;
        KafkaConsumer<String, String> consumer;

        private MultiTenantCluster() {
        }

        void startCluster(Properties brokerOverrideProps, String logicalClusterId, int userId) throws Exception {
            this.physicalCluster = this.start(brokerOverrideProps, Optional.of(Time.SYSTEM), PhysicalCluster::addBrokerAcls);
            this.logicalCluster = this.physicalCluster.createLogicalCluster(logicalClusterId, 100, userId);
            this.user = this.logicalCluster.user(userId);
            this.admin = (ConfluentAdmin)super.createAdminClient(this.logicalCluster.adminUser());
            this.addAclsForAdminUser();
        }

        LogicalClusterUser createLinkUser(int newUserId) throws Exception {
            UserMetadata newUser = this.physicalCluster.getOrCreateUser(newUserId, false);
            LogicalClusterUser linkUser = this.logicalCluster.addUser(newUser);
            this.addLinkAcls(linkUser);
            if (this.useSourceConnectionOrigination) {
                this.addReverseConnectionAcls(linkUser);
            }
            return linkUser;
        }

        void deleteUser(LogicalClusterUser user, boolean deleteAcls) {
            this.logicalCluster.removeUser(user.userMetadata.userId());
            if (deleteAcls) {
                this.deleteAcls(user);
            }
            if (this.producer != null) {
                this.producer.close();
                this.producer = null;
            }
            if (this.consumer != null) {
                this.consumer.close();
                this.consumer = null;
            }
        }

        void createDestClusterLink(ConfluentAdmin admin, String linkName, MultiTenantCluster sourceCluster, int linkUserId) throws Throwable {
            HashMap<String, String> linkConfigs = new HashMap<String, String>();
            if (!this.useSourceConnectionOrigination) {
                sourceCluster.linkUser = sourceCluster.createLinkUser(linkUserId);
                Properties sourceClientProps = KafkaTestUtils.securityProps(sourceCluster.physicalCluster.bootstrapServers(), SecurityProtocol.SASL_PLAINTEXT, ScramMechanism.SCRAM_SHA_256.mechanismName(), sourceCluster.linkUser.saslJaasConfig());
                sourceClientProps.stringPropertyNames().forEach(name -> linkConfigs.put((String)name, sourceClientProps.getProperty((String)name)));
            } else {
                linkConfigs.put(ClusterLinkConfig.LinkModeProp(), "DESTINATION");
                linkConfigs.put(ClusterLinkConfig.ConnectionModeProp(), "INBOUND");
            }
            linkConfigs.put("request.timeout.ms", "10000");
            linkConfigs.put(ClusterLinkConfig.TopicConfigSyncMsProp(), "1000");
            String allAclsFilter = "{ \"aclFilters\": [{ \"resourceFilter\": { \"resourceType\": \"any\", \"patternType\": \"any\" },\"accessFilter\": { \"operation\": \"any\", \"permissionType\": \"any\" }}]}";
            linkConfigs.put(ClusterLinkConfig.AclFiltersProp(), allAclsFilter);
            linkConfigs.put(ClusterLinkConfig.AclSyncEnableProp(), "true");
            linkConfigs.put(ClusterLinkConfig.AclSyncMsProp(), "2000");
            String allGroupsFilter = "{ \"groupFilters\": [{ \"name\": \"*\", \"patternType\": \"literal\", \"filterType\": \"include\" }]}";
            linkConfigs.put(ClusterLinkConfig.ConsumerOffsetGroupFiltersProp(), allGroupsFilter);
            linkConfigs.put(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
            linkConfigs.put(ClusterLinkConfig.ConsumerOffsetSyncMsProp(), "2000");
            NewClusterLink newClusterLink = new NewClusterLink(linkName, sourceCluster.logicalCluster.logicalClusterId(), linkConfigs);
            CreateClusterLinksOptions options = new CreateClusterLinksOptions().validateOnly(false).validateLink(true);
            admin.createClusterLinks(Collections.singleton(newClusterLink), options).all().get();
            this.setInternalClusterLinkConfigs(linkName, Collections.singletonMap("metadata.max.age.ms", "1000"));
        }

        void createSourceClusterLink(ConfluentAdmin admin, String linkName, MultiTenantCluster destCluster, int linkUserId) throws Throwable {
            Assert.assertTrue((boolean)this.useSourceConnectionOrigination);
            destCluster.linkUser = destCluster.createLinkUser(1002);
            this.linkUser = this.createLinkUser(linkUserId);
            Properties destClientProps = KafkaTestUtils.securityProps(destCluster.physicalCluster.bootstrapServers(), SecurityProtocol.SASL_PLAINTEXT, ScramMechanism.SCRAM_SHA_256.mechanismName(), destCluster.linkUser.saslJaasConfig());
            Properties sourceClientProps = KafkaTestUtils.securityProps(this.physicalCluster.bootstrapServers(), SecurityProtocol.SASL_PLAINTEXT, ScramMechanism.SCRAM_SHA_256.mechanismName(), this.linkUser.saslJaasConfig());
            HashMap<String, String> linkConfigs = new HashMap<String, String>();
            linkConfigs.put(ClusterLinkConfig.LinkModeProp(), "SOURCE");
            linkConfigs.put(ClusterLinkConfig.ConnectionModeProp(), "OUTBOUND");
            linkConfigs.put(ClusterLinkConfig.LocalListenerNameProp(), "EXTERNAL");
            destClientProps.stringPropertyNames().forEach(name -> linkConfigs.put((String)name, destClientProps.getProperty((String)name)));
            sourceClientProps.stringPropertyNames().forEach(name -> linkConfigs.put(ClusterLinkConfig.LocalPrefix() + name, sourceClientProps.getProperty((String)name)));
            linkConfigs.put("request.timeout.ms", "10000");
            linkConfigs.put("metadata.max.age.ms", "10000");
            linkConfigs.put(ClusterLinkConfig.TopicConfigSyncMsProp(), "1000");
            NewClusterLink newClusterLink = new NewClusterLink(linkName, destCluster.logicalCluster.logicalClusterId(), linkConfigs);
            CreateClusterLinksOptions options = new CreateClusterLinksOptions().validateOnly(false).validateLink(true);
            admin.createClusterLinks(Collections.singleton(newClusterLink), options).all().get();
        }

        void setInternalClusterLinkConfigs(String linkName, java.util.Map<String, String> configs) throws Exception {
            String prefixedLinkName = this.user.tenantPrefix() + linkName;
            List<ClusterLinkFactory.ClientManager> clientManagers = this.waitForClientManagers(prefixedLinkName);
            List<Admin> admins = this.waitForAdmins(clientManagers, Collections.emptyList());
            ZkAdminManager adminManager = this.physicalCluster.kafkaCluster().brokers().get(0).adminManager();
            ConfigResource resource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, prefixedLinkName);
            ArrayList<AlterConfigOp> ops = new ArrayList<AlterConfigOp>(configs.size());
            for (Map.Entry<String, String> entry : configs.entrySet()) {
                ops.add(new AlterConfigOp(new ConfigEntry(entry.getKey(), entry.getValue()), AlterConfigOp.OpType.SET));
            }
            scala.collection.mutable.Map scalaConfigs = JavaConverters.mapAsScalaMap(Collections.singletonMap(resource, JavaConverters.asScalaBuffer(ops).toSeq()));
            java.util.Map result = JavaConverters.mapAsJavaMap((Map)adminManager.incrementalAlterConfigs((Map)scalaConfigs, false, this.user.unprefixedKafkaPrincipal()));
            Assert.assertEquals((Object)ApiError.NONE, result.get(resource));
            for (ClusterLinkFactory.ClientManager clientManager : clientManagers) {
                for (Map.Entry<String, String> e : configs.entrySet()) {
                    MultiTenantClusterLinkTest.waitFor(() -> this.linkConfig(clientManager, prefixedLinkName, (String)e.getKey()), e.getValue(), "Link config not propagated: " + e.getKey());
                }
            }
            this.waitForAdmins(clientManagers, admins);
        }

        void alterClusterLink(String linkName, String configName, String configValue) throws Exception {
            this.admin.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.CLUSTER_LINK, linkName), Collections.singleton(new AlterConfigOp(new ConfigEntry(configName, configValue), AlterConfigOp.OpType.SET)))).all().get(15L, TimeUnit.SECONDS);
            MultiTenantClusterLinkTest.waitFor(() -> this.linkConfig(linkName, configName), configValue, "Link config not updated");
        }

        private List<ClusterLinkFactory.ClientManager> waitForClientManagers(String linkName) throws Exception {
            TestUtils.waitForCondition(() -> {
                for (KafkaServer server : this.physicalCluster.kafkaCluster().brokers()) {
                    if (this.clusterLinkClientManager(server, linkName).isPresent()) continue;
                    return false;
                }
                return true;
            }, (String)"Cluster link client managers not created");
            return this.physicalCluster.kafkaCluster().brokers().stream().map(server -> this.clusterLinkClientManager((KafkaServer)server, linkName).get()).collect(Collectors.toList());
        }

        private Optional<ClusterLinkFactory.ClientManager> clusterLinkClientManager(KafkaServer server, String linkName) {
            ClusterLinkFactory.LinkManager linkManager = server.clusterLinkManager();
            Option link = linkManager.listClusterLinks().find(l -> l.linkName().equals(linkName));
            if (link.isEmpty()) {
                return Optional.empty();
            }
            return Optional.ofNullable(linkManager.clientManager(((ClusterLinkData)link.get()).linkId()).getOrElse(() -> null));
        }

        private String linkConfig(ClusterLinkFactory.ClientManager clientManager, String linkName, String configName) {
            return (String)clientManager.currentConfig().originalsStrings().get(configName);
        }

        private List<Admin> waitForAdmins(List<ClusterLinkFactory.ClientManager> clientManagers, List<Admin> oldAdmins) throws Exception {
            ArrayList<Admin> admins = new ArrayList<Admin>();
            TestUtils.waitForCondition(() -> {
                for (ClusterLinkFactory.ClientManager clientManager : clientManagers) {
                    admins.clear();
                    ConfluentAdmin admin = ((ClusterLinkClientManager)clientManager).getAdmin();
                    if (admin == null) {
                        return false;
                    }
                    if (oldAdmins.stream().anyMatch(arg_0 -> MultiTenantCluster.lambda$null$9((Admin)admin, arg_0))) {
                        return false;
                    }
                    admins.add((Admin)admin);
                }
                return true;
            }, (String)"Admin clients not created");
            return admins;
        }

        KafkaProducer<String, String> getOrCreateProducer() {
            if (this.producer == null) {
                this.producer = this.createProducer(this.user, SecurityProtocol.SASL_PLAINTEXT);
            }
            return this.producer;
        }

        KafkaConsumer<String, String> getOrCreateConsumer(String groupId) {
            if (this.consumer == null) {
                this.consumer = this.createConsumer(this.user, groupId, SecurityProtocol.SASL_PLAINTEXT);
            }
            return this.consumer;
        }

        Set<AclBinding> describeAcls(LogicalClusterUser user) {
            try {
                return new HashSet<AclBinding>((Collection)this.admin.describeAcls(this.aclBindingFilter(user)).values().get(15L, TimeUnit.SECONDS));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        void deleteAcls(LogicalClusterUser user) {
            try {
                this.admin.deleteAcls(Collections.singleton(this.aclBindingFilter(user))).all().get(15L, TimeUnit.SECONDS);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        private AclBindingFilter aclBindingFilter(LogicalClusterUser user) {
            AccessControlEntryFilter aceFilter = new AccessControlEntryFilter(user.unprefixedKafkaPrincipal().toString(), null, AclOperation.ANY, AclPermissionType.ANY);
            return new AclBindingFilter(ResourcePatternFilter.ANY, aceFilter);
        }

        private void addAclsForAdminUser() throws Exception {
            String[] aclArgs = SecurityTestUtils.clusterAclArgs(this.physicalCluster.kafkaCluster().zkConnect(), this.user.prefixedKafkaPrincipal(), "All");
            AclCommand.main((String[])aclArgs);
        }

        private void addLinkAcls(LogicalClusterUser user) throws Exception {
            String principal = user.unprefixedKafkaPrincipal().toString();
            AclBinding topicAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "linked", PatternType.PREFIXED), new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW));
            AclBinding topicConfigAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "linked", PatternType.PREFIXED), new AccessControlEntry(principal, "*", AclOperation.DESCRIBE_CONFIGS, AclPermissionType.ALLOW));
            AclBinding clusterAcl = new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL), new AccessControlEntry(principal, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
            AclBinding groupAcl = new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL), new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW));
            Set acls = Utils.mkSet((Object[])new AclBinding[]{topicAcl, topicConfigAcl, groupAcl, clusterAcl});
            this.admin.createAcls((Collection)acls).all().get(15L, TimeUnit.SECONDS);
        }

        private void addReverseConnectionAcls(LogicalClusterUser user) throws Exception {
            String principal = user.unprefixedKafkaPrincipal().toString();
            AclBinding clusterAcl = new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL), new AccessControlEntry(principal, "*", AclOperation.ALTER, AclPermissionType.ALLOW));
            Set acls = Utils.mkSet((Object[])new AclBinding[]{clusterAcl});
            this.admin.createAcls((Collection)acls).all().get(15L, TimeUnit.SECONDS);
        }

        int partitionsForTopic(String topic) {
            try {
                TopicDescription topicDesc = (TopicDescription)((KafkaFuture)this.admin.describeTopics(Collections.singleton(topic)).values().get(topic)).get(15L, TimeUnit.SECONDS);
                return topicDesc.partitions().size();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        String topicConfig(String topic, String name) {
            try {
                ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
                Config config = (Config)((KafkaFuture)this.admin.describeConfigs(Collections.singleton(resource)).values().get(resource)).get(15L, TimeUnit.SECONDS);
                return config.entries().stream().filter(e -> e.name().equals(name)).findFirst().map(ConfigEntry::value).get();
            }
            catch (Exception e2) {
                throw new RuntimeException(e2);
            }
        }

        String linkConfig(String linkName, String configName) {
            ClusterLinkFactory.LinkManager linkManager = this.physicalCluster.kafkaCluster().brokers().get(0).clusterLinkManager();
            UUID linkId = this.linkId(linkName);
            ClusterLinkFactory.ConnectionManager connManager = (ClusterLinkFactory.ConnectionManager)linkManager.connectionManager(linkId).get();
            return (String)connManager.currentConfig().originalsStrings().get(configName);
        }

        UUID linkId(String linkName) {
            ClusterLinkFactory.LinkManager linkManager = this.physicalCluster.kafkaCluster().brokers().get(0).clusterLinkManager();
            return ((ClusterLinkData)linkManager.listClusterLinks().find(d -> d.linkName().equals(this.user.tenantPrefix() + linkName)).get()).linkId();
        }

        java.util.Map<TopicPartition, OffsetAndMetadata> committedOffsets(String group) {
            try {
                return (java.util.Map)this.admin.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        private static /* synthetic */ boolean lambda$null$9(Admin admin, Admin oldAdmin) {
            return oldAdmin == admin;
        }
    }
}

