package io.confluent.kafka.link.integration;

import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import kafka.link.ClusterLinkAutoCreateMirrorIntegrationTest;
import kafka.server.KafkaBroker;
import kafka.server.link.ClusterLinkConfig;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import scala.Option;

/**
 * Runs the auto-create-mirror integration suite with a cluster link prefix configured.
 *
 * <p>The constructor sets the cluster link prefix to {@code "src_"}, so mirror topics created
 * through the link under test are expected to be named {@code clusterLinkPrefix() + topic()}.
 * Three parameterized scenarios are declared here with empty bodies and {@code @Disabled};
 * the only scenario with a body checks prefix-based filtering of mirror topics and the
 * related metrics.
 */
@Tags({@Tag("integration"), @Tag("bazel:size:large")})
public class ClusterLinkPrefixAutoMirroringIntegrationTest extends ClusterLinkAutoCreateMirrorIntegrationTest {
    /** Prefix configured on the link under test (set in the constructor). */
    private static final String DEST_LINK_PREFIX = "src_";

    /** Name of the second link, created on the source cluster and pointing at the destination cluster. */
    private static final String REVERSE_LINK_NAME = "sourceLink";

    /** Prefix configured on the second link. */
    private static final String REVERSE_LINK_PREFIX = "prefixTwo";

    /** Non-mirror topic created directly on the destination cluster. */
    private static final String DEST_REGULAR_TOPIC = "destRegularTopic";

    private static final String METRIC_CREATE_FAILED_TOTAL = "auto-mirror-create-failed-total";
    private static final String METRIC_CREATED_TOTAL = "auto-mirror-created-total";
    private static final String METRIC_PREFIXED_FILTERED_COUNT = "prefixed-auto-mirror-topic-filtered-count";

    /**
     * Base period in milliseconds. Multiples of it are used below for the link retry timeout,
     * the reverse link's {@code metadata.max.age.ms}, and the fixed settle sleeps.
     */
    Long syncPeriod = 100L;

    public ClusterLinkPrefixAutoMirroringIntegrationTest() {
        clusterLinkPrefix_$eq(DEST_LINK_PREFIX);
    }

    /**
     * Intentionally empty and disabled for this class.
     * NOTE(review): presumably overrides an inherited scenario of the same name to switch it
     * off for the prefixed variant - confirm against the parent class.
     *
     * @param str first argument supplied by {@code allCombinations} (shown as {@code quorum} in the display name)
     * @param z second argument supplied by {@code allCombinations} (shown as {@code coordinator} in the display name)
     */
    @Disabled
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testAutoMirroringTaskStateManagementVariousScenarios(String str, boolean z) {
    }

    /**
     * Intentionally empty and disabled for this class.
     * NOTE(review): presumably overrides an inherited scenario of the same name to switch it
     * off for the prefixed variant - confirm against the parent class.
     *
     * @param str first argument supplied by {@code allCombinations} (shown as {@code quorum} in the display name)
     * @param z second argument supplied by {@code allCombinations} (shown as {@code coordinator} in the display name)
     */
    @Disabled
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testAutoMirroringNoOverlappingTopicFilters(String str, boolean z) {
    }

    /**
     * Intentionally empty and disabled for this class.
     * NOTE(review): presumably overrides an inherited scenario of the same name to switch it
     * off for the prefixed variant - confirm against the parent class.
     *
     * @param str first argument supplied by {@code allCombinations} (shown as {@code quorum} in the display name)
     * @param z second argument supplied by {@code allCombinations} (shown as {@code coordinator} in the display name)
     */
    @Disabled
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testAutoMirroringAllowsLinkConfigUpdate(String str, boolean z) {
    }

    /**
     * Checks that auto-mirroring over two prefixed links does not re-mirror mirror topics, and
     * that the creation and filter metrics report the expected values.
     *
     * <p>Sequence:
     * <ol>
     *   <li>Create a topic on the source cluster, produce to it, and create the link under test
     *       with auto-mirroring enabled; wait for the prefixed mirror topic on the destination.</li>
     *   <li>Create a regular topic on the destination and a second, differently prefixed
     *       auto-mirroring link on the source cluster that points at the destination.</li>
     *   <li>Assert that only the regular topic is mirrored back, that one topic is counted as
     *       filtered, and that neither cluster holds a doubly prefixed mirror.</li>
     *   <li>Disable auto-mirroring on the first link, remove its mirror topic, and assert the
     *       filtered count falls to zero; then unlink and delete both links.</li>
     * </ol>
     *
     * @param str first argument supplied by {@code allCombinations} (shown as {@code quorum} in the display name); not read by this method
     * @param z second argument supplied by {@code allCombinations} (shown as {@code coordinator} in the display name); not read by this method
     * @throws InterruptedException if a settle sleep or a wait is interrupted
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testAutoMirroringFiltersOutMirrorTopicsAndMetrics(String str, boolean z) throws InterruptedException {
        final String retryTimeoutMs = String.valueOf(this.syncPeriod.longValue() * 10);
        final long settleMs = this.syncPeriod.longValue() * 5;

        // --- Source -> destination: link under test, prefixed with clusterLinkPrefix() ---
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), new Properties(),
                sourceCluster().listenerName(), sourceCluster().adminClientConfig());
        produceToSourceCluster(20);

        Properties destLinkProps = destLinkPropsForAutoMirroring(includeAllTopicsFilter(), true);
        destLinkProps.setProperty(ClusterLinkConfig.RetryTimeoutMsProp(), retryTimeoutMs);
        Uuid linkId = createClusterLink(linkName(), destLinkProps,
                sourceLinkProps(ClusterLinkTestUtils.convertMapToScalaMap(Collections.emptyMap())), false, true);

        waitForAutoMirrorCreation(clusterLinkPrefix() + topic());
        waitForMirror(destCluster().brokers().toSeq(), 15000L);
        verifyBasicLinkMetrics(linkId, new Properties(), false);
        verifyAutoMirroringSuccessMetric();

        // Fixed pause before sampling the counters (five sync periods).
        Thread.sleep(settleMs);
        Assertions.assertEquals(0.0d, totalKafkaMetricValue(destCluster().aliveServers(), METRIC_CREATE_FAILED_TOTAL,
                ClusterLinkTestUtils.convertMapToScalaMap(Collections.emptyMap()), false, linkName()));
        Assertions.assertEquals(1.0d, totalKafkaMetricValue(destCluster().aliveServers(), METRIC_CREATED_TOTAL,
                ClusterLinkTestUtils.convertMapToScalaMap(Collections.emptyMap()), false, linkName()));

        final String destMirrorTopic = clusterLinkPrefix() + topic();
        Assertions.assertTrue(destCluster().listMirrorTopics(false).contains(destMirrorTopic));

        // --- Destination -> source: second link with its own prefix ---
        String destClusterId = ((KafkaBroker) destCluster().brokers().head()).clusterId();
        destCluster().createTopic(DEST_REGULAR_TOPIC, numPartitions(), replicationFactor(), new Properties(),
                destCluster().listenerName(), destCluster().adminClientConfig());

        Properties reverseLinkProps = new Properties();
        reverseLinkProps.put(ClusterLinkConfig.AutoMirroringEnableProp(), "true");
        reverseLinkProps.setProperty(ClusterLinkConfig.RetryTimeoutMsProp(), retryTimeoutMs);
        reverseLinkProps.put(ClusterLinkConfig.TopicFiltersProp(), includeAllTopicsFilter());
        reverseLinkProps.put(ClusterLinkConfig.ClusterLinkPrefixProp(), REVERSE_LINK_PREFIX);
        reverseLinkProps.put("bootstrap.servers", destCluster().bootstrapServers(destCluster().listenerName()));
        reverseLinkProps.put("metadata.max.age.ms", String.valueOf(this.syncPeriod));
        String jaasConfig = createLinkCredentials(REVERSE_LINK_NAME, destCluster(), Option.empty());
        reverseLinkProps.putAll(destCluster().clientSecurityProps(REVERSE_LINK_NAME));
        // Written after putAll so the freshly created credentials win over any value copied in above.
        reverseLinkProps.put("sasl.jaas.config", jaasConfig);
        sourceCluster().createClusterLinkWithAllOptions(REVERSE_LINK_NAME, reverseLinkProps, Option.apply(destClusterId),
                true, Option.empty(), true);

        final String sourceMirrorTopic = REVERSE_LINK_PREFIX + DEST_REGULAR_TOPIC;
        TestUtils.waitForCondition(
                () -> sourceCluster().listMirrorTopics(false).contains(sourceMirrorTopic),
                "Not mirrored");
        Assertions.assertTrue(sourceCluster().listMirrorTopics(false).contains(sourceMirrorTopic));

        Thread.sleep(settleMs);
        Assertions.assertEquals(1.0d, totalKafkaMetricValue(sourceCluster().aliveServers(), METRIC_CREATED_TOTAL,
                ClusterLinkTestUtils.convertMapToScalaMap(Collections.emptyMap()), false, REVERSE_LINK_NAME));
        Assertions.assertEquals(1.0d, totalKafkaMetricValue(sourceCluster().aliveServers(), METRIC_PREFIXED_FILTERED_COUNT,
                ClusterLinkTestUtils.convertMapToScalaMap(Collections.emptyMap()), false, REVERSE_LINK_NAME));

        // Neither cluster may hold a mirror of a mirror (a doubly prefixed topic).
        Assertions.assertFalse(destCluster().listMirrorTopics(false).contains(clusterLinkPrefix() + sourceMirrorTopic));
        Assertions.assertFalse(sourceCluster().listMirrorTopics(false).contains(REVERSE_LINK_PREFIX + destMirrorTopic));

        // --- Remove the destination mirror topic and expect the filtered count to drop ---
        HashMap<String, String> disableAutoMirroring = new HashMap<>();
        disableAutoMirroring.put(ClusterLinkConfig.AutoMirroringEnableProp(), "false");
        destCluster().alterClusterLink(linkName(), ClusterLinkTestUtils.convertMapToScalaMap(disableAutoMirroring),
                destCluster().brokers().toSeq(), ClusterLinkTestUtils.convertSetToScalaSet(Collections.emptySet()), true);
        destCluster().unlinkTopic(destMirrorTopic, linkName(), true, true, true, numPartitions());
        destCluster().deleteTopic(destMirrorTopic, true);

        Thread.sleep(settleMs);
        Assertions.assertEquals(0.0d, totalKafkaMetricValue(sourceCluster().aliveServers(), METRIC_PREFIXED_FILTERED_COUNT,
                ClusterLinkTestUtils.convertMapToScalaMap(Collections.emptyMap()), false, REVERSE_LINK_NAME));

        // --- Cleanup ---
        sourceCluster().unlinkTopic(sourceMirrorTopic, REVERSE_LINK_NAME, true, true, true, numPartitions());
        destCluster().deleteClusterLink(linkName(), true, destCluster().brokers().toSeq());
        sourceCluster().deleteClusterLink(REVERSE_LINK_NAME, true, sourceCluster().brokers().toSeq());
    }
}
