package io.confluent.kafka.link.integration;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import kafka.link.AbstractClusterLinkIntegrationTest;
import kafka.log.LogConfig;
import kafka.server.KafkaServer;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkFactory;
import kafka.server.link.ClusterLinkFilterJson;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.ClusterLinkListing;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.MirrorTopicDescription;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.replica.ReplicaStatus;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import scala.collection.Iterable;
import scala.collection.JavaConverters;
import scala.collection.immutable.List;
import scala.collection.mutable.Map;

@Tag("integration")
@Disabled
/* loaded from: input_file:io/confluent/kafka/link/integration/ClusterLinkPrefixIntegrationTest.class */
class ClusterLinkPrefixIntegrationTest extends AbstractClusterLinkIntegrationTest {
    private Long offsetToCommit = 10L;
    private Long syncPeriod = 100L;
    private String consumerGroup = "testGroup";
    private String clusterLinkPrefix = "src_";
    private String topicFilter = String.join("\n", "{", "\"topicFilters\": [", "  {", "     \"name\": \"" + topic() + "\",", "     \"patternType\": \"literal\",", "     \"filterType\": \"include\"", "  }", "]}");
    private String includeAllTopicsFilter = String.join("\n", "{", "\"topicFilters\": [", "  {", "     \"name\": \"*\",", "     \"patternType\": \"literal\",", "     \"filterType\": \"include\"", "  }", "]}");

    ClusterLinkPrefixIntegrationTest() {
    }

    @Test
    public void testCreateMirrorTopicWithLinkPrefix() throws Exception {
        Properties properties = new Properties();
        properties.put(LogConfig.RetentionMsProp(), "10000");
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), properties, new Properties());
        String str = this.clusterLinkPrefix + topic();
        createClusterLink(linkName(), destLinkProps(convertMapToScalaMap(Collections.singletonMap(ClusterLinkConfig.ClusterLinkPrefixProp(), this.clusterLinkPrefix))), sourceLinkProps(null), true);
        CreateTopicsResult linkTopic = destCluster().linkTopic(topic(), replicationFactor(), linkName(), convertMapToScalaMap(Collections.emptyMap()), this.clusterLinkPrefix);
        destCluster().describeMirrorTopic(str);
        Assertions.assertEquals(numPartitions(), (Integer) linkTopic.numPartitions(str).get());
        Assertions.assertEquals(replicationFactor(), (Integer) linkTopic.replicationFactor(str).get());
        Assertions.assertEquals("10000", ((Config) linkTopic.config(str).get()).get(LogConfig.RetentionMsProp()).value());
        Assertions.assertEquals(Collections.singleton(str), (Set) ((Collection) ((ClusterLinkListing) ((Iterable) destCluster().listClusterLinks(true).filter(clusterLinkListing -> {
            return Boolean.valueOf(clusterLinkListing.linkName().equals(linkName()));
        })).head()).topics().get()).stream().collect(Collectors.toSet()));
    }

    @Test
    public void testStopMirrorTopicWithLinkPrefixAndInvalidRequest() {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), new Properties(), new Properties());
        produceToSourceCluster(100);
        createClusterLink(linkName(), destLinkProps(convertMapToScalaMap(Collections.singletonMap(ClusterLinkConfig.ClusterLinkPrefixProp(), this.clusterLinkPrefix))), sourceLinkProps(null), true);
        Assertions.assertThrows(UnknownTopicOrPartitionException.class, () -> {
            destCluster().unlinkTopic(this.clusterLinkPrefix + topic(), linkName(), false, false);
        });
        destCluster().createTopic(this.clusterLinkPrefix + topic(), numPartitions(), replicationFactor(), new Properties(), new Properties());
        Assertions.assertThrows(InvalidRequestException.class, () -> {
            destCluster().unlinkTopic(this.clusterLinkPrefix + topic(), linkName(), false, false);
        });
        destCluster().deleteTopic(this.clusterLinkPrefix + topic());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), convertMapToScalaMap(Collections.emptyMap()), this.clusterLinkPrefix);
        destCluster().unlinkTopic(this.clusterLinkPrefix + topic(), linkName(), true, true);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, this.clusterLinkPrefix + topic());
        Assertions.assertThrows(InvalidRequestException.class, () -> {
            destCluster().unlinkTopic(this.clusterLinkPrefix + topic(), linkName(), false, false);
        });
        destCluster().deleteTopic(this.clusterLinkPrefix + topic());
        destCluster().deleteClusterLink(linkName(), false, destCluster().servers());
    }

    @Test
    public void testStopMirrorWithLinkPrefix() {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), new Properties(), new Properties());
        produceToSourceCluster(100);
        createClusterLink(linkName(), destLinkProps(convertMapToScalaMap(Collections.singletonMap(ClusterLinkConfig.ClusterLinkPrefixProp(), this.clusterLinkPrefix))), sourceLinkProps(null), false);
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), convertMapToScalaMap(Collections.emptyMap()), this.clusterLinkPrefix);
        destCluster().unlinkTopic(this.clusterLinkPrefix + topic(), linkName(), true, false);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, this.clusterLinkPrefix + topic());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, this.clusterLinkPrefix);
        destCluster().deleteTopic(this.clusterLinkPrefix + topic());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), convertMapToScalaMap(Collections.emptyMap()), this.clusterLinkPrefix);
        destCluster().unlinkTopic(this.clusterLinkPrefix + topic(), linkName(), true, true);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, this.clusterLinkPrefix + topic());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, this.clusterLinkPrefix);
        destCluster().deleteTopic(this.clusterLinkPrefix + topic());
        destCluster().deleteClusterLink(linkName(), false, destCluster().servers());
    }

    @Test
    public void testTopicConfigSyncWithLinkPrefix() {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), new Properties(), new Properties());
        produceToSourceCluster(20);
        UUID createClusterLink = createClusterLink(linkName(), destLinkProps(convertMapToScalaMap(Collections.singletonMap(ClusterLinkConfig.ClusterLinkPrefixProp(), this.clusterLinkPrefix))), sourceLinkProps(null), false);
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), convertMapToScalaMap(Collections.emptyMap()), this.clusterLinkPrefix);
        sourceCluster().alterTopic(topic(), convertMapToScalaMap(Collections.singletonMap("delete.retention.ms", "80000000")));
        TestUtils.waitUntilTrue(() -> {
            return Boolean.valueOf(destCluster().describeTopicConfig(this.clusterLinkPrefix + topic()).get("delete.retention.ms").value().equals("80000000"));
        }, () -> {
            return "Topic configs did not get propagated";
        }, 15000L, 100L);
        waitForMirror(destCluster().servers(), 15000L, this.clusterLinkPrefix);
        verifyBasicLinkMetrics(createClusterLink, new Properties(), false, this.clusterLinkPrefix);
        verifyTopicConfigChangeMetrics();
        verifyMirror(topic(), destCluster().servers(), true, this.clusterLinkPrefix);
    }

    @Test
    public void testAddPartitionsWithLinkPrefix() {
        numPartitions_$eq(1);
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), new Properties(), new Properties());
        HashMap hashMap = new HashMap();
        hashMap.put("metadata.max.age.ms", "1000");
        hashMap.put(ClusterLinkConfig.ClusterLinkPrefixProp(), this.clusterLinkPrefix);
        UUID createClusterLink = createClusterLink(linkName(), destLinkProps(convertMapToScalaMap(hashMap)), sourceLinkProps(null), false);
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), convertMapToScalaMap(Collections.emptyMap()), this.clusterLinkPrefix);
        produceToSourceCluster(4);
        waitForMirror(destCluster().servers(), 15000L, this.clusterLinkPrefix);
        numPartitions_$eq(4);
        sourceCluster().createPartitions(topic(), numPartitions());
        produceToSourceCluster(8);
        TestUtils.waitUntilTrue(() -> {
            return Boolean.valueOf(destCluster().describeTopic(new StringBuilder().append(this.clusterLinkPrefix).append(topic()).toString()).partitions().size() == numPartitions());
        }, () -> {
            return "Partitions were not mirrored";
        }, 15000L, 100L);
        Assertions.assertEquals(numPartitions(), destCluster().describeTopic(this.clusterLinkPrefix + topic()).partitions().size());
        produceToSourceCluster(8);
        waitForMirror(destCluster().servers(), 15000L, this.clusterLinkPrefix);
        verifyBasicLinkMetrics(createClusterLink, new Properties(), false, this.clusterLinkPrefix);
        verifyAddPartitionMetrics();
        verifyMirror(topic(), destCluster().servers(), true, this.clusterLinkPrefix);
    }

    @Test
    public void testAutoMirroringWithLinkPrefix() throws InterruptedException {
        autoMirrorTopicWithLinkPrefix(this.syncPeriod);
        String str = topic() + "-2";
        destCluster().alterClusterLink(linkName(), convertMapToScalaMap(Collections.singletonMap(ClusterLinkConfig.TopicFiltersProp(), this.includeAllTopicsFilter)), destCluster().servers().toSeq());
        sourceCluster().createTopic(str, 1, 1, new Properties(), new Properties());
        waitForAutoMirrorCreation(this.clusterLinkPrefix + str);
        destCluster().alterClusterLink(linkName(), convertMapToScalaMap(Collections.singletonMap(ClusterLinkConfig.TopicFiltersProp(), String.join("\n", "{", "\"topicFilters\": [", "  {", "     \"name\": \"*\",", "     \"patternType\": \"literal\",", "     \"filterType\": \"include\"", "  },", "  {", "     \"name\": \"" + str + "\",", "     \"patternType\": \"literal\",", "     \"filterType\": \"exclude\"", "  }", "]}"))), destCluster().servers().toSeq());
        destCluster().deleteTopic(this.clusterLinkPrefix + str);
        Thread.sleep(1000L);
        Assertions.assertFalse(destCluster().listMirrorTopics(false).contains(this.clusterLinkPrefix + str));
        String str2 = topic() + "-conflict";
        destCluster().createTopic(this.clusterLinkPrefix + str2, 1, 1, new Properties(), new Properties());
        sourceCluster().createTopic(str2, 1, 1, new Properties(), new Properties());
        destCluster().deleteTopic(this.clusterLinkPrefix + str2, false);
        waitForAutoMirrorCreation(this.clusterLinkPrefix + str2);
        destCluster().unlinkTopic(this.clusterLinkPrefix + str2, linkName(), false, true);
        destCluster().unlinkTopic(this.clusterLinkPrefix + topic(), linkName(), true, true);
        destCluster().deleteClusterLink(linkName(), false, destCluster().servers());
    }

    private void autoMirrorTopicWithLinkPrefix(Long l) {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), new Properties(), new Properties());
        produceToSourceCluster(20);
        Properties destLinkPropsForAutoMirroringWithLinkPrefix = destLinkPropsForAutoMirroringWithLinkPrefix(this.topicFilter, true);
        destLinkPropsForAutoMirroringWithLinkPrefix.setProperty(ClusterLinkConfig.RetryTimeoutMsProp(), Long.valueOf(l.longValue() * 10).toString());
        UUID createClusterLink = createClusterLink(linkName(), destLinkPropsForAutoMirroringWithLinkPrefix, sourceLinkProps(null), false);
        waitForAutoMirrorCreation(this.clusterLinkPrefix + topic());
        waitForMirror(destCluster().servers(), 15000L, this.clusterLinkPrefix);
        verifyBasicLinkMetrics(createClusterLink, new Properties(), false, this.clusterLinkPrefix);
        verifyAutoMirroringSuccessMetric();
    }

    private Properties destLinkPropsForAutoMirroringWithLinkPrefix(String str, boolean z) {
        HashMap hashMap = new HashMap();
        hashMap.put(ClusterLinkConfig.AutoMirroringEnableProp(), "true");
        hashMap.put(ClusterLinkConfig.TopicFiltersProp(), str);
        hashMap.put("metadata.max.age.ms", this.syncPeriod.toString());
        if (z) {
            hashMap.put(ClusterLinkConfig.ClusterLinkPrefixProp(), this.clusterLinkPrefix);
        }
        return destLinkProps(convertMapToScalaMap(hashMap));
    }

    private <T> Map<T, T> convertMapToScalaMap(java.util.Map<T, T> map) {
        return (Map) JavaConverters.mapAsScalaMapConverter(map).asScala();
    }

    @Test
    public void testAutoMirroringNonOverlappingTopicFiltersWithLinkPrefix() {
        Properties destLinkPropsForAutoMirroringWithLinkPrefix = destLinkPropsForAutoMirroringWithLinkPrefix(this.topicFilter, true);
        createClusterLink(linkName(), destLinkPropsForAutoMirroringWithLinkPrefix, sourceLinkProps(null), false);
        String str = linkName() + "-2";
        Assertions.assertThrows(InvalidConfigurationException.class, () -> {
            createClusterLink(str, destLinkPropsForAutoMirroringWithLinkPrefix, sourceLinkProps(null), false);
        });
        Properties destLinkPropsForAutoMirroringWithLinkPrefix2 = destLinkPropsForAutoMirroringWithLinkPrefix(this.includeAllTopicsFilter, true);
        Assertions.assertThrows(InvalidConfigurationException.class, () -> {
            createClusterLink(str, destLinkPropsForAutoMirroringWithLinkPrefix2, sourceLinkProps(null), false);
        });
        destLinkPropsForAutoMirroringWithLinkPrefix2.setProperty(ClusterLinkConfig.ClusterLinkPrefixProp(), "src_2_");
        UUID createClusterLink = createClusterLink(str, destLinkPropsForAutoMirroringWithLinkPrefix2, sourceLinkProps(null), false);
        destCluster().alterClusterLink(str, convertMapToScalaMap(Collections.singletonMap(ClusterLinkConfig.TopicFiltersProp(), this.topicFilter)), destCluster().servers().toSeq());
        Assertions.assertEquals(ClusterLinkFilterJson.parse(this.topicFilter), ((ClusterLinkFactory.ConnectionManager) ((KafkaServer) destCluster().servers().head()).clusterLinkManager().connectionManager(createClusterLink).get()).currentConfig().topicFilters());
        destCluster().deleteClusterLink(linkName(), false, destCluster().servers());
        if (useSourceInitiatedLink()) {
            sourceCluster().deleteClusterLink(linkName(), false, sourceCluster().servers());
            Assertions.assertTrue(sourceCluster().listClusterLinks(false).size() == 1);
        }
        destCluster().alterClusterLink(str, convertMapToScalaMap(Collections.singletonMap(ClusterLinkConfig.TopicFiltersProp(), this.topicFilter)), destCluster().servers().toSeq());
        Assertions.assertEquals(ClusterLinkFilterJson.parse(this.topicFilter), ((ClusterLinkFactory.ConnectionManager) ((KafkaServer) destCluster().servers().head()).clusterLinkManager().connectionManager(createClusterLink).get()).currentConfig().topicFilters());
        destCluster().alterClusterLink(str, convertMapToScalaMap(Collections.singletonMap(ClusterLinkConfig.TopicFiltersProp(), this.topicFilter)), destCluster().servers().toSeq());
        Assertions.assertEquals(ClusterLinkFilterJson.parse(this.topicFilter), ((ClusterLinkFactory.ConnectionManager) ((KafkaServer) destCluster().servers().head()).clusterLinkManager().connectionManager(createClusterLink).get()).currentConfig().topicFilters());
        createClusterLink(linkName(), destLinkPropsForAutoMirroringWithLinkPrefix(this.includeAllTopicsFilter, false), sourceLinkProps(null), false);
        destCluster().alterClusterLink(str, convertMapToScalaMap(Collections.singletonMap(ClusterLinkConfig.TopicFiltersProp(), this.includeAllTopicsFilter)), destCluster().servers().toSeq());
        Assertions.assertEquals(ClusterLinkFilterJson.parse(this.includeAllTopicsFilter), ((ClusterLinkFactory.ConnectionManager) ((KafkaServer) destCluster().servers().head()).clusterLinkManager().connectionManager(createClusterLink).get()).currentConfig().topicFilters());
        destCluster().deleteClusterLink(linkName(), false, destCluster().servers());
        destCluster().deleteClusterLink(str, false, destCluster().servers());
    }

    @Test
    public void testAutoMirroringAllowsLinkConfigUpdateWithLinkPrefix() {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), new Properties(), new Properties());
        produceToSourceCluster(20);
        createClusterLink(linkName(), destLinkPropsForAutoMirroringWithLinkPrefix(this.topicFilter, true), sourceLinkProps(null), false);
        waitForAutoMirrorCreation(this.clusterLinkPrefix + topic());
        destCluster().alterClusterLink(linkName(), convertMapToScalaMap(Collections.singletonMap(ClusterLinkConfig.ClusterLinkPausedProp(), "true")), destCluster().servers().toSeq());
        sourceCluster().createTopic("linkedTopicTwo", numPartitions(), replicationFactor(), new Properties(), new Properties());
        produceToSourceCluster(20);
        String join = String.join("\n", "{", "\"topicFilters\": [", "  {", "     \"name\": \"linkedTopicTwo\",", "     \"patternType\": \"literal\",", "     \"filterType\": \"include\"", "  }", "]}");
        String str = linkName() + "-2";
        Properties destLinkPropsForAutoMirroringWithLinkPrefix = destLinkPropsForAutoMirroringWithLinkPrefix(join, false);
        destLinkPropsForAutoMirroringWithLinkPrefix.setProperty(ClusterLinkConfig.ClusterLinkPrefixProp(), "src_2_");
        createClusterLink(str, destLinkPropsForAutoMirroringWithLinkPrefix, sourceLinkProps(null), false);
        waitForAutoMirrorCreation(this.clusterLinkPrefix + topic());
        waitForAutoMirrorCreation("src_2_linkedTopicTwo");
        destCluster().alterClusterLink(linkName(), convertMapToScalaMap(Collections.singletonMap(ClusterLinkConfig.ClusterLinkPausedProp(), "false")), destCluster().servers().toSeq());
        destCluster().unlinkTopic(this.clusterLinkPrefix + topic(), linkName(), true, false);
        destCluster().unlinkTopic("src_2_linkedTopicTwo", str, true, false);
        destCluster().deleteClusterLink(linkName(), false, destCluster().servers());
        destCluster().deleteClusterLink(str, false, destCluster().servers());
    }

    @Test
    public void testAutoMirroringUpdateExistingLinkWithLinkPrefix() {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), new Properties(), new Properties());
        HashMap hashMap = new HashMap();
        hashMap.put(ClusterLinkConfig.ClusterLinkPrefixProp(), this.clusterLinkPrefix);
        hashMap.put("metadata.max.age.ms", this.syncPeriod.toString());
        createClusterLink(linkName(), destLinkProps(convertMapToScalaMap(hashMap)), sourceLinkProps(null), false);
        destCluster().createTopic(this.clusterLinkPrefix + topic(), numPartitions(), replicationFactor(), new Properties(), new Properties());
        HashMap hashMap2 = new HashMap();
        hashMap2.put(ClusterLinkConfig.AutoMirroringEnableProp(), "true");
        hashMap2.put(ClusterLinkConfig.TopicFiltersProp(), this.topicFilter);
        Assertions.assertThrows(InvalidConfigurationException.class, () -> {
            destCluster().alterClusterLink(linkName(), convertMapToScalaMap(hashMap2), destCluster().servers().toSeq());
        });
        destCluster().deleteTopic(this.clusterLinkPrefix + topic());
        destCluster().alterClusterLink(linkName(), convertMapToScalaMap(hashMap2), destCluster().servers().toSeq());
        waitForAutoMirrorCreation(this.clusterLinkPrefix + topic());
        produceToSourceCluster(10);
        waitForMirror(destCluster().servers(), 15000L, this.clusterLinkPrefix);
        destCluster().unlinkTopic(this.clusterLinkPrefix + topic(), linkName(), true, false);
        destCluster().deleteClusterLink(linkName(), false, destCluster().servers());
    }

    @Test
    public void testAutoMirroringAddingAdditionalTopicWithLinkPrefix() {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), new Properties(), new Properties());
        createClusterLink(linkName(), destLinkPropsForAutoMirroringWithLinkPrefix(this.topicFilter, true), sourceLinkProps(null), false);
        waitForAutoMirrorCreation(this.clusterLinkPrefix + topic());
        String str = this.clusterLinkPrefix + "linkedTopic2";
        sourceCluster().createTopic(str, numPartitions(), replicationFactor(), new Properties(), new Properties());
        destCluster().alterClusterLink(linkName(), convertMapToScalaMap(Collections.singletonMap(ClusterLinkConfig.TopicFiltersProp(), String.join("\n", "{", "\"topicFilters\": [", "  {", "     \"name\": \"" + this.clusterLinkPrefix + topic() + "\",", "     \"patternType\": \"literal\",", "     \"filterType\": \"include\"", "  },", "  {", "     \"name\": \"" + str + "\",", "     \"patternType\": \"literal\",", "     \"filterType\": \"include\"", "  }", "]}"))), destCluster().servers().toSeq());
        waitForAutoMirrorCreation(this.clusterLinkPrefix + str);
        destCluster().unlinkTopic(this.clusterLinkPrefix + topic(), linkName(), false, false);
        destCluster().unlinkTopic(this.clusterLinkPrefix + str, linkName(), true, false);
        destCluster().deleteClusterLink(linkName(), false, destCluster().servers());
    }

    @Test
    public void testAutoMirroringNoExistingTopicWithLinkPrefix() {
        destCluster().createTopic(this.clusterLinkPrefix + topic(), 1, 1, new Properties(), new Properties());
        Properties destLinkPropsForAutoMirroringWithLinkPrefix = destLinkPropsForAutoMirroringWithLinkPrefix(this.topicFilter, true);
        Assertions.assertThrows(InvalidConfigurationException.class, () -> {
            createClusterLink(linkName(), destLinkPropsForAutoMirroringWithLinkPrefix, sourceLinkProps(null), false);
        });
        destLinkPropsForAutoMirroringWithLinkPrefix.setProperty(ClusterLinkConfig.TopicFiltersProp(), this.includeAllTopicsFilter);
        UUID createClusterLink = createClusterLink(linkName(), destLinkPropsForAutoMirroringWithLinkPrefix, sourceLinkProps(null), false);
        java.util.Map singletonMap = Collections.singletonMap(ClusterLinkConfig.TopicFiltersProp(), this.topicFilter);
        Assertions.assertThrows(InvalidConfigurationException.class, () -> {
            destCluster().alterClusterLink(linkName(), convertMapToScalaMap(singletonMap), destCluster().servers().toSeq());
        });
        Assertions.assertEquals(ClusterLinkFilterJson.parse(this.includeAllTopicsFilter), ((ClusterLinkFactory.ConnectionManager) ((KafkaServer) destCluster().servers().head()).clusterLinkManager().connectionManager(createClusterLink).get()).currentConfig().topicFilters());
        destCluster().deleteTopic(this.clusterLinkPrefix + topic());
        destCluster().alterClusterLink(linkName(), convertMapToScalaMap(singletonMap), destCluster().servers().toSeq());
        Assertions.assertEquals(ClusterLinkFilterJson.parse(this.topicFilter), ((ClusterLinkFactory.ConnectionManager) ((KafkaServer) destCluster().servers().head()).clusterLinkManager().connectionManager(createClusterLink).get()).currentConfig().topicFilters());
        destCluster().alterClusterLink(linkName(), convertMapToScalaMap(singletonMap), destCluster().servers().toSeq());
        Assertions.assertEquals(ClusterLinkFilterJson.parse(this.topicFilter), ((ClusterLinkFactory.ConnectionManager) ((KafkaServer) destCluster().servers().head()).clusterLinkManager().connectionManager(createClusterLink).get()).currentConfig().topicFilters());
        destCluster().deleteClusterLink(linkName(), false, destCluster().servers());
    }

    @Test
    public void testDeleteAutoMirroredTopicsWithLinkPrefix() {
        autoMirrorTopicWithLinkPrefix(this.syncPeriod);
        Assertions.assertThrows(TopicDeletionDisabledException.class, () -> {
            destCluster().deleteTopic(this.clusterLinkPrefix + topic(), false);
        });
        Assertions.assertTrue(destCluster().listMirrorTopics(false).contains(this.clusterLinkPrefix + topic()));
        destCluster().unlinkTopic(this.clusterLinkPrefix + topic(), linkName(), true, false);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, this.clusterLinkPrefix + topic());
        destCluster().deleteTopic(this.clusterLinkPrefix + topic(), false);
        TestUtils.waitUntilTrue(() -> {
            return Boolean.valueOf(!destCluster().listMirrorTopics(false).contains(new StringBuilder().append(this.clusterLinkPrefix).append(topic()).toString()));
        }, () -> {
            return "Mirror not stopped";
        }, 15000L, 100L);
        waitForAutoMirrorCreation(this.clusterLinkPrefix + topic());
        waitForMirror(destCluster().servers(), 15000L, this.clusterLinkPrefix);
        sourceCluster().deleteTopic(topic());
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.FAILED, this.clusterLinkPrefix + topic());
        destCluster().deleteTopic(this.clusterLinkPrefix + topic());
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), new Properties(), new Properties());
        produceToSourceCluster(20);
        waitForAutoMirrorCreation(this.clusterLinkPrefix + topic());
        String join = String.join("\n", "{", "\"topicFilters\": [", "  {", "     \"name\": \"*\",", "     \"patternType\": \"literal\",", "     \"filterType\": \"include\"", "  },", "  {", "     \"name\": \"" + topic() + "\",", "     \"patternType\": \"literal\",", "     \"filterType\": \"exclude\"", "  }", "]}");
        HashMap hashMap = new HashMap();
        hashMap.put(ClusterLinkConfig.AutoMirroringEnableProp(), "true");
        hashMap.put(ClusterLinkConfig.TopicFiltersProp(), join);
        hashMap.put("metadata.max.age.ms", this.syncPeriod.toString());
        destCluster().alterClusterLink(linkName(), convertMapToScalaMap(hashMap), destCluster().servers().toSeq());
        destCluster().deleteTopic(this.clusterLinkPrefix + topic());
        Assertions.assertFalse(destCluster().listMirrorTopics(false).contains(this.clusterLinkPrefix + topic()));
        destCluster().deleteClusterLink(linkName(), false, destCluster().servers());
    }

    @Test
    public void testOffsetMigrationWithAddedConsumerGroup() {
        String join = String.join("\n", "{", "\"groupFilters\": [", "  {", "     \"name\": \"" + this.consumerGroup + "\",", "     \"patternType\": \"literal\",", "     \"filterType\": \"include\"", "  },", "  {", "     \"name\": \"testGroup2\",", "     \"patternType\": \"literal\",", "     \"filterType\": \"include\"", "  }", "]}");
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), new Properties(), new Properties());
        HashMap hashMap = new HashMap();
        hashMap.put(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
        hashMap.put(ClusterLinkConfig.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(this.consumerGroup));
        hashMap.put(ClusterLinkConfig.ConsumerOffsetSyncMsProp(), String.valueOf(this.syncPeriod));
        hashMap.put(ClusterLinkConfig.ClusterLinkPrefixProp(), this.clusterLinkPrefix);
        Properties destLinkProps = destLinkProps(convertMapToScalaMap(hashMap));
        UUID createClusterLink = createClusterLink(linkName(), destLinkProps, sourceLinkProps(null), false);
        destCluster().linkTopic(topic(), (short) 2, linkName(), convertMapToScalaMap(Collections.emptyMap()), this.clusterLinkPrefix);
        commitOffsets(sourceCluster(), topic(), 0, this.offsetToCommit.longValue(), this.consumerGroup);
        verifyOffsetMigration(this.clusterLinkPrefix + topic(), 0, this.offsetToCommit.longValue(), this.clusterLinkPrefix + this.consumerGroup);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
        hashMap2.put(ClusterLinkConfig.ConsumerOffsetGroupFiltersProp(), join);
        hashMap2.put(ClusterLinkConfig.ConsumerOffsetSyncMsProp(), String.valueOf(this.syncPeriod));
        destCluster().alterClusterLink(linkName(), convertMapToScalaMap(hashMap2), destCluster().servers().toSeq());
        commitOffsets(sourceCluster(), topic(), 0, 20L, this.consumerGroup);
        commitOffsets(sourceCluster(), topic(), 0, 20L, "testGroup2");
        verifyOffsetMigration(this.clusterLinkPrefix + topic(), 0, 20L, this.clusterLinkPrefix + this.consumerGroup);
        verifyOffsetMigration(this.clusterLinkPrefix + topic(), 0, 20L, this.clusterLinkPrefix + "testGroup2");
        produceToSourceCluster(10);
        waitForMirror(destCluster().servers(), 15000L, this.clusterLinkPrefix);
        verifyBasicLinkMetrics(createClusterLink, destLinkProps, false, this.clusterLinkPrefix);
        verifyConsumerOffsetMigrationMetrics();
        destCluster().unlinkTopic(this.clusterLinkPrefix + topic(), linkName(), true, true);
        destCluster().deleteClusterLink(linkName(), false, destCluster().servers());
    }

    @Test
    public void testOffsetMigrationWithAddedTopic() {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), new Properties(), new Properties());
        sourceCluster().createTopic("linkedTopic2", numPartitions(), replicationFactor(), new Properties(), new Properties());
        HashMap hashMap = new HashMap();
        hashMap.put(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
        hashMap.put(ClusterLinkConfig.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(this.consumerGroup));
        hashMap.put(ClusterLinkConfig.ConsumerOffsetSyncMsProp(), String.valueOf(this.syncPeriod));
        hashMap.put(ClusterLinkConfig.ClusterLinkPrefixProp(), this.clusterLinkPrefix);
        Properties destLinkProps = destLinkProps(convertMapToScalaMap(hashMap));
        UUID createClusterLink = createClusterLink(linkName(), destLinkProps, sourceLinkProps(null), false);
        destCluster().linkTopic(topic(), (short) 2, linkName(), convertMapToScalaMap(Collections.emptyMap()), this.clusterLinkPrefix);
        commitOffsets(sourceCluster(), topic(), 0, this.offsetToCommit.longValue(), this.consumerGroup);
        verifyOffsetMigration(this.clusterLinkPrefix + topic(), 0, this.offsetToCommit.longValue(), this.clusterLinkPrefix + this.consumerGroup);
        destCluster().linkTopic("linkedTopic2", (short) 2, linkName(), convertMapToScalaMap(Collections.emptyMap()), this.clusterLinkPrefix);
        commitOffsets(sourceCluster(), topic(), 0, 20L, this.consumerGroup);
        commitOffsets(sourceCluster(), "linkedTopic2", 0, 20L, this.consumerGroup);
        verifyOffsetMigration(this.clusterLinkPrefix + topic(), 0, 20L, this.clusterLinkPrefix + this.consumerGroup);
        verifyOffsetMigration(this.clusterLinkPrefix + "linkedTopic2", 0, 20L, this.clusterLinkPrefix + this.consumerGroup);
        produceToSourceCluster(10);
        waitForMirror(destCluster().servers(), 15000L, this.clusterLinkPrefix);
        verifyBasicLinkMetrics(createClusterLink, destLinkProps, false, this.clusterLinkPrefix);
        verifyConsumerOffsetMigrationMetrics();
        destCluster().unlinkTopic(this.clusterLinkPrefix + topic(), linkName(), false, true);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, this.clusterLinkPrefix + topic());
        alterClusterLink(linkName(), convertMapToScalaMap(Collections.singletonMap(ClusterLinkConfig.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(this.consumerGroup).replaceAll("include", "exclude"))));
        Properties properties = new Properties();
        properties.setProperty("group.id", this.consumerGroup);
        KafkaConsumer createConsumer = destCluster().createConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), properties, scalaList(new ArrayList()));
        createConsumer.subscribe(Collections.singleton("linkedTopic2"));
        do {
            createConsumer.poll(Duration.ofMillis(10L));
        } while (createConsumer.assignment().isEmpty());
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        newSingleThreadExecutor.submit(() -> {
            return createConsumer.poll(Duration.ofMillis(10L));
        });
        try {
            destCluster().unlinkTopic(this.clusterLinkPrefix + "linkedTopic2", linkName(), true, true);
            waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, this.clusterLinkPrefix + "linkedTopic2");
            newSingleThreadExecutor.shutdownNow();
            destCluster().deleteClusterLink(linkName(), false, destCluster().servers());
        } catch (Throwable th) {
            newSingleThreadExecutor.shutdownNow();
            throw th;
        }
    }

    public static <T> List<T> scalaList(java.util.List<T> list) {
        return ((Iterable) JavaConverters.collectionAsScalaIterableConverter(list).asScala()).toList();
    }

    private void testLastFetchedOffsetStoppedMirrorTopicDescription(boolean z) {
        numPartitions_$eq(1);
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), new Properties(), new Properties());
        createClusterLink(linkName(), destLinkProps(convertMapToScalaMap(Collections.singletonMap(ClusterLinkConfig.ClusterLinkPrefixProp(), this.clusterLinkPrefix))), sourceLinkProps(null), true);
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), convertMapToScalaMap(Collections.emptyMap()), this.clusterLinkPrefix);
        TestUtils.waitUntilTrue(() -> {
            return Boolean.valueOf(destCluster().leaderEpoch(new TopicPartition(new StringBuilder().append(this.clusterLinkPrefix).append(topic()).toString(), 0)) >= 1);
        }, () -> {
            return "Destination leader epoch not updated";
        }, 15000L, 100L);
        produceToSourceCluster(10);
        waitForMirror(destCluster().servers(), 15000L, this.clusterLinkPrefix);
        ReplicaStatus replicaStatus = (ReplicaStatus) ((Iterable) destCluster().replicaStatus(this.clusterLinkPrefix + topic(), 0, false).filter(replicaStatus2 -> {
            return Boolean.valueOf(replicaStatus2.isLeader() && !replicaStatus2.linkName().isPresent());
        })).head();
        Assertions.assertTrue(replicaStatus.mirrorInfo().isPresent());
        ReplicaStatus.MirrorInfo mirrorInfo = (ReplicaStatus.MirrorInfo) replicaStatus.mirrorInfo().get();
        Assertions.assertEquals(ReplicaStatus.MirrorInfo.State.ACTIVE, mirrorInfo.state());
        Assertions.assertEquals(10, mirrorInfo.lastFetchSourceHighWatermark());
        destCluster().unlinkTopic(this.clusterLinkPrefix + topic(), linkName(), z, true);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, this.clusterLinkPrefix + topic());
        ReplicaStatus replicaStatus3 = (ReplicaStatus) ((Iterable) destCluster().replicaStatus(this.clusterLinkPrefix + topic(), 0, false).filter(replicaStatus4 -> {
            return Boolean.valueOf(replicaStatus4.isLeader() && !replicaStatus4.linkName().isPresent());
        })).head();
        Assertions.assertTrue(replicaStatus3.mirrorInfo().isPresent());
        ReplicaStatus.MirrorInfo mirrorInfo2 = (ReplicaStatus.MirrorInfo) replicaStatus3.mirrorInfo().get();
        Assertions.assertEquals(ReplicaStatus.MirrorInfo.State.STOPPED, mirrorInfo2.state());
        Assertions.assertEquals(-1L, mirrorInfo2.lastFetchSourceHighWatermark());
        MirrorTopicDescription describeMirrorTopic = destCluster().describeMirrorTopic(this.clusterLinkPrefix + topic());
        Assertions.assertEquals(describeMirrorTopic.state(), MirrorTopicDescription.State.STOPPED);
        Assertions.assertEquals(1, describeMirrorTopic.stoppedLogEndOffsets().size());
        Assertions.assertEquals(10, (Long) describeMirrorTopic.stoppedLogEndOffsets().get(0));
    }

    @Test
    public void testLastFetchedOffsetPromotedMirrorTopicDescription() {
        testLastFetchedOffsetStoppedMirrorTopicDescription(true);
    }

    @Test
    public void testLastFetchedOffsetFailedOverMirrorTopicDescription() {
        testLastFetchedOffsetStoppedMirrorTopicDescription(false);
    }
}
