package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.MultiTenantRequestContextTest;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.server.plugins.policy.AlterConfigPolicy;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.kafka.test.utils.SecurityTestUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import kafka.admin.AclCommand;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.ZkAdminManager;
import kafka.server.link.ClusterLinkClientManager;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkFactory;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterMirrorOp;
import org.apache.kafka.clients.admin.AlterMirrorsOptions;
import org.apache.kafka.clients.admin.ClusterLinkListing;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.CreateClusterLinksOptions;
import org.apache.kafka.clients.admin.ListClusterLinksOptions;
import org.apache.kafka.clients.admin.NewClusterLink;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.NewTopicMirror;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import scala.Option;
import scala.collection.JavaConverters;

@Category({IntegrationTest.class})
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/MultiTenantClusterLinkTest.class */
public class MultiTenantClusterLinkTest {
    private final MultiTenantCluster sourceCluster = new MultiTenantCluster();
    private final MultiTenantCluster destCluster = new MultiTenantCluster();
    private final String linkName = "tenantLink";
    private final String topic = "linkedTopic";
    private int numPartitions = 2;
    private int nextMessageIndex = 0;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/MultiTenantClusterLinkTest$MultiTenantCluster.class */
    public static class MultiTenantCluster extends IntegrationTestHarness {
        private PhysicalCluster physicalCluster;
        private LogicalCluster logicalCluster;
        private LogicalClusterUser user;
        private LogicalClusterUser linkUser;
        private ConfluentAdmin admin;
        private boolean useSourceConnectionOrigination;
        KafkaProducer<String, String> producer;
        KafkaConsumer<String, String> consumer;

        private MultiTenantCluster() {
        }

        void startCluster(Properties properties, String str, int i) throws Exception {
            this.physicalCluster = start(properties, Optional.of(Time.SYSTEM), (v0) -> {
                v0.addBrokerAcls();
            });
            this.logicalCluster = this.physicalCluster.createLogicalCluster(str, 100, Integer.valueOf(i));
            this.user = this.logicalCluster.user(i);
            this.admin = super.createAdminClient(this.logicalCluster.adminUser());
            addAclsForAdminUser();
        }

        LogicalClusterUser createLinkUser(int i) throws Exception {
            LogicalClusterUser addUser = this.logicalCluster.addUser(this.physicalCluster.getOrCreateUser(i, false));
            addLinkAcls(addUser);
            if (this.useSourceConnectionOrigination) {
                addReverseConnectionAcls(addUser);
            }
            return addUser;
        }

        void deleteUser(LogicalClusterUser logicalClusterUser, boolean z) {
            this.logicalCluster.removeUser(logicalClusterUser.userMetadata.userId());
            if (z) {
                deleteAcls(logicalClusterUser);
            }
            if (this.producer != null) {
                this.producer.close();
                this.producer = null;
            }
            if (this.consumer != null) {
                this.consumer.close();
                this.consumer = null;
            }
        }

        void createDestClusterLink(ConfluentAdmin confluentAdmin, String str, MultiTenantCluster multiTenantCluster, int i) throws Throwable {
            HashMap hashMap = new HashMap();
            if (this.useSourceConnectionOrigination) {
                hashMap.put(ClusterLinkConfig.LinkModeProp(), "DESTINATION");
                hashMap.put(ClusterLinkConfig.ConnectionModeProp(), "INBOUND");
            } else {
                multiTenantCluster.linkUser = multiTenantCluster.createLinkUser(i);
                Properties securityProps = KafkaTestUtils.securityProps(multiTenantCluster.physicalCluster.bootstrapServers(), SecurityProtocol.SASL_PLAINTEXT, ScramMechanism.SCRAM_SHA_256.mechanismName(), multiTenantCluster.linkUser.saslJaasConfig());
                securityProps.stringPropertyNames().forEach(str2 -> {
                });
            }
            hashMap.put("request.timeout.ms", "10000");
            hashMap.put(ClusterLinkConfig.TopicConfigSyncMsProp(), "1000");
            hashMap.put(ClusterLinkConfig.AclFiltersProp(), "{ \"aclFilters\": [{ \"resourceFilter\": { \"resourceType\": \"any\", \"patternType\": \"any\" },\"accessFilter\": { \"operation\": \"any\", \"permissionType\": \"any\" }}]}");
            hashMap.put(ClusterLinkConfig.AclSyncEnableProp(), "true");
            hashMap.put(ClusterLinkConfig.AclSyncMsProp(), "2000");
            hashMap.put(ClusterLinkConfig.ConsumerOffsetGroupFiltersProp(), "{ \"groupFilters\": [{ \"name\": \"*\", \"patternType\": \"literal\", \"filterType\": \"include\" }]}");
            hashMap.put(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
            hashMap.put(ClusterLinkConfig.ConsumerOffsetSyncMsProp(), "2000");
            NewClusterLink newClusterLink = new NewClusterLink(str, multiTenantCluster.logicalCluster.logicalClusterId(), hashMap);
            confluentAdmin.createClusterLinks(Collections.singleton(newClusterLink), new CreateClusterLinksOptions().validateOnly(false).validateLink(true)).all().get();
            setInternalClusterLinkConfigs(str, Collections.singletonMap("metadata.max.age.ms", "1000"));
        }

        void createSourceClusterLink(ConfluentAdmin confluentAdmin, String str, MultiTenantCluster multiTenantCluster, int i) throws Throwable {
            Assert.assertTrue(this.useSourceConnectionOrigination);
            multiTenantCluster.linkUser = multiTenantCluster.createLinkUser(1002);
            this.linkUser = createLinkUser(i);
            Properties securityProps = KafkaTestUtils.securityProps(multiTenantCluster.physicalCluster.bootstrapServers(), SecurityProtocol.SASL_PLAINTEXT, ScramMechanism.SCRAM_SHA_256.mechanismName(), multiTenantCluster.linkUser.saslJaasConfig());
            Properties securityProps2 = KafkaTestUtils.securityProps(this.physicalCluster.bootstrapServers(), SecurityProtocol.SASL_PLAINTEXT, ScramMechanism.SCRAM_SHA_256.mechanismName(), this.linkUser.saslJaasConfig());
            HashMap hashMap = new HashMap();
            hashMap.put(ClusterLinkConfig.LinkModeProp(), "SOURCE");
            hashMap.put(ClusterLinkConfig.ConnectionModeProp(), "OUTBOUND");
            hashMap.put(ClusterLinkConfig.LocalListenerNameProp(), "EXTERNAL");
            securityProps.stringPropertyNames().forEach(str2 -> {
            });
            securityProps2.stringPropertyNames().forEach(str3 -> {
            });
            hashMap.put("request.timeout.ms", "10000");
            hashMap.put("metadata.max.age.ms", "10000");
            hashMap.put(ClusterLinkConfig.TopicConfigSyncMsProp(), "1000");
            NewClusterLink newClusterLink = new NewClusterLink(str, multiTenantCluster.logicalCluster.logicalClusterId(), hashMap);
            confluentAdmin.createClusterLinks(Collections.singleton(newClusterLink), new CreateClusterLinksOptions().validateOnly(false).validateLink(true)).all().get();
        }

        void setInternalClusterLinkConfigs(String str, Map<String, String> map) throws Exception {
            String str2 = this.user.tenantPrefix() + str;
            List<ClusterLinkFactory.ClientManager> waitForClientManagers = waitForClientManagers(str2);
            List<Admin> waitForAdmins = waitForAdmins(waitForClientManagers, Collections.emptyList());
            ZkAdminManager adminManager = this.physicalCluster.kafkaCluster().brokers().get(0).adminManager();
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, str2);
            ArrayList arrayList = new ArrayList(map.size());
            for (Map.Entry<String, String> entry : map.entrySet()) {
                arrayList.add(new AlterConfigOp(new ConfigEntry(entry.getKey(), entry.getValue()), AlterConfigOp.OpType.SET));
            }
            Assert.assertEquals(ApiError.NONE, JavaConverters.mapAsJavaMap(adminManager.incrementalAlterConfigs(JavaConverters.mapAsScalaMap(Collections.singletonMap(configResource, JavaConverters.asScalaBuffer(arrayList).toSeq())), false, this.user.unprefixedKafkaPrincipal())).get(configResource));
            for (ClusterLinkFactory.ClientManager clientManager : waitForClientManagers) {
                for (Map.Entry<String, String> entry2 : map.entrySet()) {
                    MultiTenantClusterLinkTest.waitFor(() -> {
                        return linkConfig(clientManager, str2, (String) entry2.getKey());
                    }, entry2.getValue(), "Link config not propagated: " + entry2.getKey());
                }
            }
            waitForAdmins(waitForClientManagers, waitForAdmins);
        }

        void alterClusterLink(String str, String str2, String str3) throws Exception {
            this.admin.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.CLUSTER_LINK, str), Collections.singleton(new AlterConfigOp(new ConfigEntry(str2, str3), AlterConfigOp.OpType.SET)))).all().get(15L, TimeUnit.SECONDS);
            MultiTenantClusterLinkTest.waitFor(() -> {
                return linkConfig(str, str2);
            }, str3, "Link config not updated");
        }

        private List<ClusterLinkFactory.ClientManager> waitForClientManagers(String str) throws Exception {
            TestUtils.waitForCondition(() -> {
                Iterator<KafkaServer> it = this.physicalCluster.kafkaCluster().brokers().iterator();
                while (it.hasNext()) {
                    if (!clusterLinkClientManager(it.next(), str).isPresent()) {
                        return false;
                    }
                }
                return true;
            }, "Cluster link client managers not created");
            return (List) this.physicalCluster.kafkaCluster().brokers().stream().map(kafkaServer -> {
                return clusterLinkClientManager(kafkaServer, str).get();
            }).collect(Collectors.toList());
        }

        private Optional<ClusterLinkFactory.ClientManager> clusterLinkClientManager(KafkaServer kafkaServer, String str) {
            ClusterLinkFactory.LinkManager clusterLinkManager = kafkaServer.clusterLinkManager();
            Option find = clusterLinkManager.listClusterLinks().find(clusterLinkData -> {
                return Boolean.valueOf(clusterLinkData.linkName().equals(str));
            });
            return find.isEmpty() ? Optional.empty() : Optional.ofNullable(clusterLinkManager.clientManager(((ClusterLinkData) find.get()).linkId()).getOrElse(() -> {
                return null;
            }));
        }

        private String linkConfig(ClusterLinkFactory.ClientManager clientManager, String str, String str2) {
            return (String) clientManager.currentConfig().originalsStrings().get(str2);
        }

        private List<Admin> waitForAdmins(List<ClusterLinkFactory.ClientManager> list, List<Admin> list2) throws Exception {
            ArrayList arrayList = new ArrayList();
            TestUtils.waitForCondition(() -> {
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    ClusterLinkClientManager clusterLinkClientManager = (ClusterLinkFactory.ClientManager) it.next();
                    arrayList.clear();
                    ConfluentAdmin admin = clusterLinkClientManager.getAdmin();
                    if (admin == null || list2.stream().anyMatch(admin2 -> {
                        return admin2 == admin;
                    })) {
                        return false;
                    }
                    arrayList.add(admin);
                }
                return true;
            }, "Admin clients not created");
            return arrayList;
        }

        KafkaProducer<String, String> getOrCreateProducer() {
            if (this.producer == null) {
                this.producer = createProducer(this.user, SecurityProtocol.SASL_PLAINTEXT);
            }
            return this.producer;
        }

        KafkaConsumer<String, String> getOrCreateConsumer(String str) {
            if (this.consumer == null) {
                this.consumer = createConsumer(this.user, str, SecurityProtocol.SASL_PLAINTEXT);
            }
            return this.consumer;
        }

        Set<AclBinding> describeAcls(LogicalClusterUser logicalClusterUser) {
            try {
                return new HashSet((Collection) this.admin.describeAcls(aclBindingFilter(logicalClusterUser)).values().get(15L, TimeUnit.SECONDS));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        void deleteAcls(LogicalClusterUser logicalClusterUser) {
            try {
                this.admin.deleteAcls(Collections.singleton(aclBindingFilter(logicalClusterUser))).all().get(15L, TimeUnit.SECONDS);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        private AclBindingFilter aclBindingFilter(LogicalClusterUser logicalClusterUser) {
            return new AclBindingFilter(ResourcePatternFilter.ANY, new AccessControlEntryFilter(logicalClusterUser.unprefixedKafkaPrincipal().toString(), (String) null, AclOperation.ANY, AclPermissionType.ANY));
        }

        private void addAclsForAdminUser() throws Exception {
            AclCommand.main(SecurityTestUtils.clusterAclArgs(this.physicalCluster.kafkaCluster().zkConnect(), this.user.prefixedKafkaPrincipal(), "All"));
        }

        private void addLinkAcls(LogicalClusterUser logicalClusterUser) throws Exception {
            String kafkaPrincipal = logicalClusterUser.unprefixedKafkaPrincipal().toString();
            this.admin.createAcls(Utils.mkSet(new AclBinding[]{new AclBinding(new ResourcePattern(ResourceType.TOPIC, "linked", PatternType.PREFIXED), new AccessControlEntry(kafkaPrincipal, "*", AclOperation.READ, AclPermissionType.ALLOW)), new AclBinding(new ResourcePattern(ResourceType.TOPIC, "linked", PatternType.PREFIXED), new AccessControlEntry(kafkaPrincipal, "*", AclOperation.DESCRIBE_CONFIGS, AclPermissionType.ALLOW)), new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL), new AccessControlEntry(kafkaPrincipal, "*", AclOperation.READ, AclPermissionType.ALLOW)), new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL), new AccessControlEntry(kafkaPrincipal, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))})).all().get(15L, TimeUnit.SECONDS);
        }

        private void addReverseConnectionAcls(LogicalClusterUser logicalClusterUser) throws Exception {
            this.admin.createAcls(Utils.mkSet(new AclBinding[]{new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL), new AccessControlEntry(logicalClusterUser.unprefixedKafkaPrincipal().toString(), "*", AclOperation.ALTER, AclPermissionType.ALLOW))})).all().get(15L, TimeUnit.SECONDS);
        }

        int partitionsForTopic(String str) {
            try {
                return ((TopicDescription) ((KafkaFuture) this.admin.describeTopics(Collections.singleton(str)).values().get(str)).get(15L, TimeUnit.SECONDS)).partitions().size();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        String topicConfig(String str, String str2) {
            try {
                ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, str);
                return (String) ((Config) ((KafkaFuture) this.admin.describeConfigs(Collections.singleton(configResource)).values().get(configResource)).get(15L, TimeUnit.SECONDS)).entries().stream().filter(configEntry -> {
                    return configEntry.name().equals(str2);
                }).findFirst().map((v0) -> {
                    return v0.value();
                }).get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        String linkConfig(String str, String str2) {
            return (String) ((ClusterLinkFactory.ConnectionManager) this.physicalCluster.kafkaCluster().brokers().get(0).clusterLinkManager().connectionManager(linkId(str)).get()).currentConfig().originalsStrings().get(str2);
        }

        UUID linkId(String str) {
            return ((ClusterLinkData) this.physicalCluster.kafkaCluster().brokers().get(0).clusterLinkManager().listClusterLinks().find(clusterLinkData -> {
                return Boolean.valueOf(clusterLinkData.linkName().equals(this.user.tenantPrefix() + str));
            }).get()).linkId();
        }

        Map<TopicPartition, OffsetAndMetadata> committedOffsets(String str) {
            try {
                return (Map) this.admin.listConsumerGroupOffsets(str).partitionsToOffsetAndMetadata().get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Before
    public void setUp() throws Exception {
        this.sourceCluster.startCluster(brokerProps(), "sourceLogicalCluster", 1);
        this.destCluster.startCluster(brokerProps(), "destLogicalCluster", 11);
        addAcls(this.destCluster.admin, this.destCluster.user);
        addAcls(this.sourceCluster.admin, this.sourceCluster.user);
    }

    @After
    public void tearDown() throws Exception {
        this.sourceCluster.shutdown();
        this.destCluster.shutdown();
    }

    @Test
    public void testMultiTenantClusterLink() throws Throwable {
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001);
        UUID linkId = this.destCluster.linkId("tenantLink");
        createMirroredTopic();
        verifyTopicListing();
        verifyTopicMirroring();
        addSourcePartitionsAndVerifyMirror(4);
        changeSourceTopicConfigAndVerifyMirror();
        verifyAclAndOffsetMigration();
        verifyTopicMirroring();
        verifyTenantMetrics(linkId, 1001);
        verifyMetricsGroups(linkId);
        stopMirroring("linkedTopic");
    }

    @Test
    public void testClusterLinkSecurityUpdate() throws Throwable {
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001);
        createMirroredTopic();
        verifyTopicListing();
        verifyTopicMirroring();
        verifyAclAndOffsetMigration();
        LogicalClusterUser createLinkUser = this.sourceCluster.createLinkUser(2001);
        this.destCluster.alterClusterLink("tenantLink", "sasl.jaas.config", createLinkUser.saslJaasConfig());
        this.sourceCluster.deleteUser(this.sourceCluster.linkUser, true);
        this.sourceCluster.linkUser = createLinkUser;
        verifyTopicMirroring();
        verifyAclAndOffsetMigration();
        verifySocketBufferSizeUpdate();
        addSourcePartitionsAndVerifyMirror(4);
        changeSourceTopicConfigAndVerifyMirror();
        verifyTopicMirroring();
    }

    @Test
    public void testSourceConnectionOrigination() throws Throwable {
        this.destCluster.useSourceConnectionOrigination = true;
        this.sourceCluster.useSourceConnectionOrigination = true;
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, -1);
        this.sourceCluster.createSourceClusterLink(this.sourceCluster.admin, "tenantLink", this.destCluster, 1003);
        createMirroredTopic();
        verifyTopicListing();
        verifyTopicMirroring();
        verifyMetricsGroups(this.destCluster.linkId("tenantLink"));
        verifyTenantMetrics(this.destCluster.linkId("tenantLink"), 1003);
        verifyAclAndOffsetMigration();
        LogicalClusterUser createLinkUser = this.sourceCluster.createLinkUser(2001);
        this.sourceCluster.alterClusterLink("tenantLink", ClusterLinkConfig.LocalPrefix() + "sasl.jaas.config", createLinkUser.saslJaasConfig());
        this.sourceCluster.deleteUser(this.sourceCluster.linkUser, true);
        this.sourceCluster.linkUser = createLinkUser;
        verifyTopicMirroring();
        verifyAclAndOffsetMigration();
        verifySocketBufferSizeUpdate();
        addSourcePartitionsAndVerifyMirror(4);
        changeSourceTopicConfigAndVerifyMirror();
        verifyTopicMirroring();
    }

    private Properties brokerProps() {
        Properties properties = new Properties();
        properties.put("confluent.cluster.link.enable", "true");
        properties.put(KafkaConfig.AuthorizerClassNameProp(), MultiTenantAuthorizer.class.getName());
        properties.put("confluent.plugins.topic.policy.replication.factor", "1");
        properties.put(KafkaConfig.AutoCreateTopicsEnableProp(), "false");
        properties.put(KafkaConfig.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        properties.put(KafkaConfig.PasswordEncoderSecretProp(), "multi-tenant-cluster-link-secret");
        properties.put(KafkaConfig.AlterConfigPolicyClassNameProp(), AlterConfigPolicy.class.getName());
        return properties;
    }

    private void createMirroredTopic() throws Exception {
        this.sourceCluster.physicalCluster.kafkaCluster().createTopic(this.sourceCluster.user.tenantPrefix() + "linkedTopic", this.numPartitions, 1);
        this.destCluster.admin.createTopics(Collections.singleton(new NewTopic("linkedTopic", Optional.empty(), Optional.of((short) 1)).mirror(Optional.of(new NewTopicMirror("tenantLink", "linkedTopic"))))).all().get();
    }

    private void stopMirroring(String str) throws Throwable {
        this.destCluster.admin.alterMirrors(Collections.singletonMap(str, AlterMirrorOp.FAILOVER), new AlterMirrorsOptions()).all().get(15L, TimeUnit.SECONDS);
    }

    private void verifyTopicListing() throws Exception {
        Collection collection = (Collection) this.destCluster.admin.listClusterLinks(new ListClusterLinksOptions().includeTopics(true)).result().get();
        Assert.assertEquals(1L, collection.size());
        ClusterLinkListing clusterLinkListing = (ClusterLinkListing) collection.iterator().next();
        Assert.assertEquals("tenantLink", clusterLinkListing.linkName());
        Assert.assertTrue(clusterLinkListing.topics().isPresent());
        Collection collection2 = (Collection) clusterLinkListing.topics().get();
        Assert.assertEquals(1L, collection2.size());
        Assert.assertEquals("linkedTopic", collection2.iterator().next());
    }

    private void verifyTopicMirroring() throws Throwable {
        int i = this.nextMessageIndex;
        this.nextMessageIndex += 10;
        KafkaTestUtils.sendRecords(this.sourceCluster.getOrCreateProducer(), "linkedTopic", i, 10);
        KafkaTestUtils.consumeRecords(this.destCluster.getOrCreateConsumer("destGroup"), "linkedTopic", i, 10);
    }

    private void verifyAclAndOffsetMigration() throws Throwable {
        Set<AclBinding> addAcls = addAcls(this.sourceCluster.admin, this.sourceCluster.user);
        addBrokerAclsForOffsetMigration();
        String str = "linkedGroup";
        Map<TopicPartition, OffsetAndMetadata> commitOffsets = commitOffsets(this.sourceCluster.admin, "linkedGroup");
        waitFor(() -> {
            return this.destCluster.describeAcls(this.sourceCluster.user);
        }, addAcls, "Acls not migrated");
        waitFor(() -> {
            return this.destCluster.committedOffsets(str);
        }, commitOffsets, "Consumer offsets not migrated");
    }

    private Map<String, String> linkMetricTags(String str, String str2) {
        HashMap hashMap = new HashMap();
        hashMap.put(MultiTenantRequestContextTest.TENANT_NAME, str);
        hashMap.put("link-name", "tenantLink");
        hashMap.put("request", str2);
        return hashMap;
    }

    private Map<String, String> linkIdMetricTags(UUID uuid, String str, String str2) {
        HashMap hashMap = new HashMap();
        hashMap.put(MultiTenantRequestContextTest.TENANT_NAME, str);
        hashMap.put("link-id", uuid.toString());
        hashMap.put("request", str2);
        return hashMap;
    }

    private void verifyTenantMetrics(UUID uuid, int i) {
        HashMap hashMap = new HashMap();
        hashMap.put(MultiTenantRequestContextTest.TENANT_NAME, "sourceLogicalCluster");
        hashMap.put(MultiTenantRequestContextTest.USERNAME, String.valueOf(i));
        hashMap.put("request", "Fetch");
        Assert.assertNotEquals(0.0d, metricValue(this.sourceCluster, "tenant-metrics", "request-total", hashMap), 0.001d);
        Assert.assertNotEquals(0.0d, metricValue(this.sourceCluster, "tenant-metrics", "request-byte-total", hashMap), 0.001d);
        Assert.assertNotEquals(0.0d, metricValue(this.sourceCluster, "tenant-metrics", "response-byte-total", hashMap), 0.001d);
        Map<String, String> linkMetricTags = linkMetricTags("destLogicalCluster", "Fetch");
        assertRange("requests", metricValue(this.sourceCluster, "tenant-metrics", "request-total", hashMap), metricValue(this.destCluster, "cluster-link-dest-tenant-metrics", "request-total", linkMetricTags), 10.0d);
        assertRange("request-bytes", metricValue(this.sourceCluster, "tenant-metrics", "request-byte-total", hashMap), metricValue(this.destCluster, "cluster-link-dest-tenant-metrics", "request-byte-total", linkMetricTags), 2000.0d);
        assertRange("response-bytes", metricValue(this.destCluster, "cluster-link-dest-tenant-metrics", "response-byte-total", linkMetricTags), metricValue(this.sourceCluster, "tenant-metrics", "response-byte-total", hashMap), 2000.0d);
        double millis = TimeUnit.NANOSECONDS.toMillis((long) metricValue(this.destCluster, "cluster-link-dest-tenant-metrics", "response-time-ns-max", linkMetricTags));
        Assert.assertTrue("Invalid response time metric: " + millis, millis > 0.0d && millis < 15000.0d);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(MultiTenantRequestContextTest.TENANT_NAME, "destLogicalCluster");
        hashMap2.put("link-name", "tenantLink");
        Assert.assertEquals(1.0d, metricValue(this.destCluster, "cluster-link-metrics", "link-count", hashMap2), 0.001d);
        Assert.assertEquals(1.0d, metricValue(this.destCluster, "cluster-link-metrics", "global-active-mirror-topic-count", hashMap2), 0.001d);
        Assert.assertEquals(0.0d, metricValue(this.destCluster, "cluster-link-metrics", "global-failed-mirror-topic-count", hashMap2), 0.001d);
        Assert.assertEquals(0.0d, metricValue(this.destCluster, "cluster-link-metrics", "global-stopped-mirror-topic-count", hashMap2), 0.001d);
        Assert.assertEquals(0.0d, metricValue(this.destCluster, "cluster-link-metrics", "global-paused-mirror-topic-count", hashMap2), 0.001d);
        Assert.assertEquals(this.numPartitions, metricValue(this.destCluster, "cluster-link-metrics", "mirror-partition-count", hashMap2), 0.001d);
        Assert.assertEquals(0.0d, metricValue(this.destCluster, "cluster-link-metrics", "failed-mirror-partition-count", hashMap2), 0.001d);
        Map<String, String> linkIdMetricTags = linkIdMetricTags(uuid, "sourceLogicalCluster", "Fetch");
        assertRange("requests", metricValue(this.sourceCluster, "cluster-link-source-metrics", "request-total", linkIdMetricTags), metricValue(this.destCluster, "cluster-link-dest-tenant-metrics", "request-total", linkMetricTags), 10.0d);
        assertRange("request-bytes", metricValue(this.sourceCluster, "cluster-link-source-metrics", "request-byte-total", linkIdMetricTags), metricValue(this.destCluster, "cluster-link-dest-tenant-metrics", "request-byte-total", linkMetricTags), 2000.0d);
        assertRange("response-bytes", metricValue(this.destCluster, "cluster-link-dest-tenant-metrics", "response-byte-total", linkMetricTags), metricValue(this.sourceCluster, "cluster-link-source-metrics", "response-byte-total", linkIdMetricTags), 2000.0d);
        double millis2 = TimeUnit.NANOSECONDS.toMillis((long) metricValue(this.sourceCluster, "cluster-link-source-metrics", "response-time-ns-max", linkIdMetricTags));
        Assert.assertTrue("Invalid source link response time metric: " + millis2, millis2 > 0.0d && millis2 < 15000.0d);
    }

    private void verifyMetricsGroups(UUID uuid) {
        Map<String, String> linkMetricTags = linkMetricTags("destLogicalCluster", "Metadata");
        Map<String, String> linkMetricTags2 = linkMetricTags("sourceLogicalCluster", "Metadata");
        Assert.assertEquals(0.0d, metricValue(this.sourceCluster, "cluster-link-dest-tenant-metrics", "request-total", linkMetricTags2, false), 0.001d);
        Assert.assertEquals(0.0d, metricValue(this.destCluster, "cluster-link-source-tenant-metrics", "request-total", linkMetricTags, false), 0.001d);
        double metricValue = metricValue(this.destCluster, "cluster-link-dest-tenant-metrics", "request-total", linkMetricTags);
        Assert.assertTrue("Dest metric not updated: " + metricValue, metricValue > 0.0d);
        if (this.sourceCluster.useSourceConnectionOrigination) {
            double metricValue2 = metricValue(this.sourceCluster, "cluster-link-source-tenant-metrics", "request-total", linkMetricTags2);
            Assert.assertTrue("Source metric not updated: " + metricValue2, metricValue2 > 0.0d);
        } else {
            Assert.assertEquals(0.0d, metricValue(this.sourceCluster, "cluster-link-source-tenant-metrics", "request-total", linkMetricTags2, false), 0.001d);
        }
        Map<String, String> linkIdMetricTags = linkIdMetricTags(uuid, "destLogicalCluster", "Metadata");
        double metricValue3 = metricValue(this.sourceCluster, "cluster-link-source-metrics", "request-total", linkIdMetricTags(uuid, "sourceLogicalCluster", "Metadata"));
        Assert.assertTrue("Source metric not updated: " + metricValue3, metricValue3 > 0.0d);
        Assert.assertEquals(0.0d, metricValue(this.destCluster, "cluster-link-source-metrics", "request-total", linkIdMetricTags, false), 0.001d);
        HashMap hashMap = new HashMap();
        hashMap.put("link-id", uuid.toString());
        hashMap.put(MultiTenantRequestContextTest.TENANT_NAME, "sourceLogicalCluster");
        hashMap.put("mode", "source");
        Assert.assertEquals(this.sourceCluster.physicalCluster.kafkaCluster().brokers().size(), (int) totalMetricValue(this.sourceCluster, "cluster-link-metrics", "active-link-count", hashMap));
        hashMap.put(MultiTenantRequestContextTest.TENANT_NAME, "destLogicalCluster");
        hashMap.put("mode", "destination");
        Assert.assertEquals(this.destCluster.physicalCluster.kafkaCluster().brokers().size(), (int) totalMetricValue(this.destCluster, "cluster-link-metrics", "active-link-count", hashMap));
    }

    private void verifySocketBufferSizeUpdate() throws Exception {
        long j = 2097152;
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink");
        Assert.assertEquals(PolicyViolationException.class, ((ExecutionException) Assert.assertThrows(ExecutionException.class, () -> {
        })).getCause().getClass());
        Assert.assertEquals(PolicyViolationException.class, ((ExecutionException) Assert.assertThrows(ExecutionException.class, () -> {
        })).getCause().getClass());
        this.destCluster.admin.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink"), Collections.singleton(new AlterConfigOp(new ConfigEntry(KafkaConfig.ReplicaSocketReceiveBufferBytesProp(), String.valueOf(131072L)), AlterConfigOp.OpType.SET)))).all().get(15L, TimeUnit.SECONDS);
        waitFor(() -> {
            return this.destCluster.linkConfig("tenantLink", KafkaConfig.ReplicaSocketReceiveBufferBytesProp());
        }, String.valueOf(131072L), "Link config not updated");
    }

    private void assertRange(String str, double d, double d2, double d3) {
        Assert.assertTrue(String.format("Metric values for '%s' (%f, %f) not within expected range %f", str, Double.valueOf(d), Double.valueOf(d2), Double.valueOf(d3)), d2 - d <= d3 && d - d2 < 0.001d);
    }

    private double metricValue(MultiTenantCluster multiTenantCluster, String str, String str2, Map<String, String> map) {
        return metricValue(multiTenantCluster, str, str2, map, true);
    }

    private double metricValue(MultiTenantCluster multiTenantCluster, String str, String str2, Map<String, String> map, boolean z) {
        double d = 0.0d;
        boolean z2 = false;
        Iterator<KafkaServer> it = multiTenantCluster.physicalCluster.kafkaCluster().brokers().iterator();
        while (it.hasNext()) {
            for (Map.Entry entry : it.next().metrics().metrics().entrySet()) {
                if (isMatchingMetric((MetricName) entry.getKey(), str2, str, map)) {
                    z2 = true;
                    d += ((Double) ((KafkaMetric) entry.getValue()).metricValue()).doubleValue();
                }
            }
        }
        if (z) {
            Assert.assertTrue("Metric not found " + str2, z2);
        }
        return d;
    }

    private double totalMetricValue(MultiTenantCluster multiTenantCluster, String str, String str2, Map<String, String> map) {
        double d = 0.0d;
        Iterator<KafkaServer> it = multiTenantCluster.physicalCluster.kafkaCluster().brokers().iterator();
        while (it.hasNext()) {
            for (Map.Entry entry : it.next().metrics().metrics().entrySet()) {
                if (isMatchingMetric((MetricName) entry.getKey(), str2, str, map)) {
                    d += ((Double) ((KafkaMetric) entry.getValue()).metricValue()).doubleValue();
                }
            }
        }
        return d;
    }

    private boolean isMatchingMetric(MetricName metricName, String str, String str2, Map<String, String> map) {
        if (!metricName.name().equals(str) || !metricName.group().equals(str2)) {
            return false;
        }
        Map tags = metricName.tags();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            if (!entry.getValue().equals(tags.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }

    private void addSourcePartitionsAndVerifyMirror(int i) throws Exception {
        ((KafkaFuture) this.sourceCluster.admin.createPartitions(Collections.singletonMap("linkedTopic", NewPartitions.increaseTo(i))).values().get("linkedTopic")).get(15L, TimeUnit.SECONDS);
        waitFor(() -> {
            return Integer.valueOf(this.destCluster.partitionsForTopic("linkedTopic"));
        }, Integer.valueOf(i), "Topic partitions not updated");
        this.numPartitions = i;
    }

    private void changeSourceTopicConfigAndVerifyMirror() throws Exception {
        this.sourceCluster.admin.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, "linkedTopic"), Collections.singleton(new AlterConfigOp(new ConfigEntry("max.message.bytes", "123456"), AlterConfigOp.OpType.SET)))).all().get(15L, TimeUnit.SECONDS);
        waitFor(() -> {
            return this.destCluster.topicConfig("linkedTopic", "max.message.bytes");
        }, "123456", "Topic configs not migrated");
    }

    private Set<AclBinding> addAcls(Admin admin, LogicalClusterUser logicalClusterUser) throws Exception {
        String kafkaPrincipal = logicalClusterUser.unprefixedKafkaPrincipal().toString();
        Set<AclBinding> mkSet = Utils.mkSet(new AclBinding[]{new AclBinding(new ResourcePattern(ResourceType.TOPIC, "linked", PatternType.PREFIXED), new AccessControlEntry(kafkaPrincipal, "*", AclOperation.ALL, AclPermissionType.ALLOW)), new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL), new AccessControlEntry(kafkaPrincipal, "*", AclOperation.READ, AclPermissionType.ALLOW)), new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL), new AccessControlEntry(kafkaPrincipal, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))});
        admin.createAcls(mkSet).all().get(15L, TimeUnit.SECONDS);
        return mkSet;
    }

    private void addBrokerAclsForOffsetMigration() throws Exception {
        String zkConnect = this.destCluster.physicalCluster.kafkaCluster().zkConnect();
        String str = this.destCluster.user.tenantPrefix() + "linked";
        AclCommand.main(SecurityTestUtils.consumeAclArgs(zkConnect, PhysicalCluster.BROKER_PRINCIPAL, str, str, PatternType.PREFIXED));
    }

    private Map<TopicPartition, OffsetAndMetadata> commitOffsets(Admin admin, String str) throws Exception {
        HashMap hashMap = new HashMap();
        LogManager logManager = this.sourceCluster.physicalCluster.kafkaCluster().brokers().get(0).logManager();
        for (int i = 0; i < this.numPartitions; i++) {
            hashMap.put(new TopicPartition("linkedTopic", i), new OffsetAndMetadata(((AbstractLog) logManager.getLog(new TopicPartition(this.sourceCluster.user.tenantPrefix() + "linkedTopic", i), false).get()).localLogEndOffset()));
        }
        admin.alterConsumerGroupOffsets(str, hashMap).all().get();
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> void waitFor(Supplier<T> supplier, T t, String str) throws Exception {
        TestUtils.waitForCondition(() -> {
            return t.equals(supplier.get());
        }, () -> {
            return str + " : expected=" + t + ", actual=" + supplier.get();
        });
    }
}
