/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.link.integration;

import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.UUID;
import kafka.link.ClusterLinkIntegrationTest;
import kafka.server.KafkaBroker;
import kafka.server.link.ClusterLinkConfig;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.Option;
import scala.collection.Iterable;
import scala.collection.JavaConverters;
import scala.collection.Map;
import scala.collection.Seq;

/**
 * Integration tests for cluster links that are configured with a cluster link prefix
 * ({@code ClusterLinkConfig.ClusterLinkPrefixProp()}).
 *
 * <p>The tests run against the source/destination clusters provided by
 * {@link ClusterLinkIntegrationTest} and are parameterized over the {@code "zk"} and
 * {@code "kraft"} quorum values.
 */
@Tag(value="integration")
class ClusterLinkPrefixIntegrationTest
extends ClusterLinkIntegrationTest {
    /** Offset passed to {@code commitOffsets} for partition 0 of the source topic. */
    Long offsetToCommit = 10L;
    /** Base period in milliseconds; used for sync/metadata intervals and as the unit for sleeps. */
    Long syncPeriod = 100L;
    /** Consumer group used for the offset commits and for the offset-sync group filter. */
    String consumerGroup = "testGroup";

    /** Sets the harness-level cluster link prefix to {@code "src_"} for every test in this class. */
    public ClusterLinkPrefixIntegrationTest() {
        this.clusterLinkPrefix_$eq("src_");
    }

    /**
     * Exercises the rejected operations on a prefixed link and then verifies that two links
     * with distinct prefixes can coexist.
     *
     * <p>Steps: create a prefixed link; assert that mirroring a topic under an un-prefixed name
     * and under a prefixed-but-different name both fail; assert that altering the prefix or the
     * consumer-group-prefix flag of an existing link fails; assert that creating a second link
     * with the same prefix or with the prefix {@code "1243.ABCD-876"} fails; finally create a
     * second link with a different prefix and check the prefixed-link count metric.
     *
     * @param quorum quorum mode supplied by the parameterized test source
     */
    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"zk", "kraft"})
    public void testInvalidClusterLinkPrefixActions(String quorum) {
        String clusterLinkPrefixTwo = "src_2_";
        Properties linkProps = this.destLinkProps((Map)this.convertMapToScalaMap(Collections.emptyMap()));
        linkProps.setProperty(ClusterLinkConfig.ClusterLinkPrefixProp(), this.clusterLinkPrefix());
        linkProps.setProperty(ClusterLinkConfig.ConsumerGroupPrefixEnableProp(), "false");
        this.createClusterLink(this.linkName(), linkProps, this.sourceLinkProps((Map)this.convertMapToScalaMap(Collections.emptyMap())), false);

        // NOTE: the third argument of assertThrows is the message reported when NO exception is
        // thrown; it does not check the text of the thrown exception.
        Assertions.assertThrows(
            InvalidConfigurationException.class,
            () -> this.destCluster().linkTopic(this.topic(), this.topic(), this.replicationFactor(), this.linkName(), this.convertMapToScalaMap(Collections.emptyMap())),
            "Mirror topic name should start with cluster link prefix " + this.clusterLinkPrefix());
        // Prefix + source topic name minus its first character: starts with the prefix but is not
        // "prefix + source topic".
        Assertions.assertThrows(
            UnsupportedVersionException.class,
            () -> this.destCluster().linkTopic(this.clusterLinkPrefix() + this.topic().substring(1), this.topic(), this.replicationFactor(), this.linkName(), this.convertMapToScalaMap(Collections.emptyMap())),
            "Topic renaming for mirroring not yet supported.");

        // Altering the prefix of an existing link must be rejected.
        Assertions.assertThrows(
            InvalidRequestException.class,
            () -> this.destCluster().alterClusterLink(this.linkName(), this.convertMapToScalaMap(Collections.singletonMap(ClusterLinkConfig.ClusterLinkPrefixProp(), clusterLinkPrefixTwo)), this.destCluster().brokers().toSeq()));

        // Altering the consumer-group-prefix flag of an existing link must be rejected.
        HashMap<String, String> updatedConfigsTwo = new HashMap<String, String>();
        updatedConfigsTwo.put(ClusterLinkConfig.ConsumerGroupPrefixEnableProp(), "true");
        Assertions.assertThrows(
            InvalidRequestException.class,
            () -> this.destCluster().alterClusterLink(this.linkName(), this.convertMapToScalaMap(updatedConfigsTwo), this.destCluster().brokers().toSeq()));

        String linkNameTwo = this.linkName() + "-2";
        Properties linkPropsTwo = this.destLinkProps((Map)this.convertMapToScalaMap(Collections.emptyMap()));

        // Second link reusing the first link's prefix.
        linkPropsTwo.setProperty(ClusterLinkConfig.ClusterLinkPrefixProp(), this.clusterLinkPrefix());
        Assertions.assertThrows(
            InvalidConfigurationException.class,
            () -> this.createClusterLink(linkNameTwo, linkPropsTwo, this.sourceLinkProps((Map)this.convertMapToScalaMap(Collections.emptyMap())), false));

        // Second link with a prefix value that creation is expected to reject.
        linkPropsTwo.setProperty(ClusterLinkConfig.ClusterLinkPrefixProp(), "1243.ABCD-876");
        Assertions.assertThrows(
            InvalidConfigurationException.class,
            () -> this.createClusterLink(linkNameTwo, linkPropsTwo, this.sourceLinkProps((Map)this.convertMapToScalaMap(Collections.emptyMap())), false));

        // Second link with a distinct prefix is accepted.
        linkPropsTwo.setProperty(ClusterLinkConfig.ClusterLinkPrefixProp(), clusterLinkPrefixTwo);
        this.createClusterLink(linkNameTwo, linkPropsTwo, this.sourceLinkProps((Map)this.convertMapToScalaMap(Collections.emptyMap())), false);
        this.verifyLinkWithClusterLinkPrefixCountMetric(2);

        this.destCluster().deleteClusterLink(this.linkName(), false, this.destCluster().brokers());
        this.destCluster().deleteClusterLink(linkNameTwo, false, this.destCluster().brokers());
    }

    /**
     * With the consumer-group-prefix flag enabled, the first consumer group listed on the
     * destination cluster must start with the link prefix.
     *
     * @param quorum quorum mode supplied by the parameterized test source
     */
    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"zk", "kraft"})
    public void testClusterLinkPrefixAddedToConsumerOffsets(String quorum) {
        this.verifyConsumerGroupPrefixing(true);
    }

    /**
     * With the consumer-group-prefix flag disabled, the first consumer group listed on the
     * destination cluster must NOT start with the link prefix.
     *
     * @param quorum quorum mode supplied by the parameterized test source
     */
    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"zk", "kraft"})
    public void testClusterLinkPrefixNotAddedToConsumerOffsets(String quorum) {
        this.verifyConsumerGroupPrefixing(false);
    }

    /**
     * Two-way auto-mirroring scenario: a destination link (harness prefix) auto-mirrors the source
     * topic, then a second link created on the source cluster (prefix {@code "prefixTwo"})
     * auto-mirrors a regular destination topic. Asserts that neither side mirrors the other
     * side's mirror topic, and checks the {@code auto-mirror-*} and
     * {@code prefixed-auto-mirror-topic-filtered-count} metrics before and after the destination
     * mirror topic is deleted.
     *
     * @param quorum quorum mode; link credentials are only created for {@code "zk"}
     * @throws InterruptedException if a sleep or a wait-for-condition is interrupted
     */
    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"zk", "kraft"})
    public void testAutoMirroringFiltersOutMirrorTopicsAndMetrics(String quorum) throws InterruptedException {
        // --- destination-side link: auto-mirror the source topic under the harness prefix ---
        this.sourceCluster().createTopic(this.topic(), this.numPartitions(), (int)this.replicationFactor(), new Properties(), this.sourceCluster().listenerName());
        this.produceToSourceCluster(20);
        Properties linkProps = this.destLinkPropsForAutoMirroring(this.includeAllTopicsFilter(), true);
        linkProps.setProperty(ClusterLinkConfig.RetryTimeoutMsProp(), String.valueOf(this.syncPeriod * 10L));
        UUID linkId = this.createClusterLink(this.linkName(), linkProps, this.sourceLinkProps((Map)this.convertMapToScalaMap(Collections.emptyMap())), false);
        this.waitForAutoMirrorCreation(this.clusterLinkPrefix() + this.topic());
        this.waitForMirror((Seq)this.destCluster().brokers().toSeq(), 15000L);
        this.verifyBasicLinkMetrics(linkId, new Properties(), false);
        this.verifyAutoMirroringSuccessMetric();

        // NOTE(review): fixed sleeps before metric assertions are timing-sensitive; presumably
        // they give the auto-mirror task a few cycles to run - confirm against the harness.
        Thread.sleep(this.syncPeriod * 5L);
        Assertions.assertEquals(0.0, (double)this.totalKafkaMetricValue((Seq)this.destCluster().aliveServers(), "auto-mirror-create-failed-total", (Map)this.convertMapToScalaMap(Collections.emptyMap()), false, this.linkName()));
        Assertions.assertEquals(1.0, (double)this.totalKafkaMetricValue((Seq)this.destCluster().aliveServers(), "auto-mirror-created-total", (Map)this.convertMapToScalaMap(Collections.emptyMap()), false, this.linkName()));
        String destMirrorTopic = this.clusterLinkPrefix() + this.topic();
        Assertions.assertTrue(this.destCluster().listMirrorTopics(false).contains(destMirrorTopic));

        // --- source-side link: auto-mirror a regular destination topic back to the source ---
        String destClusterId = ((KafkaBroker)this.destCluster().brokers().head()).clusterId();
        String destRegularTopic = "destRegularTopic";
        // The topic lives on the destination cluster, so use the destination cluster's listener
        // (the original passed the source cluster's listener name here).
        this.destCluster().createTopic(destRegularTopic, this.numPartitions(), (int)this.replicationFactor(), new Properties(), this.destCluster().listenerName());
        String sourceLinkName = "sourceLink";
        String sourcePrefix = "prefixTwo";
        Properties sourceProps = new Properties();
        sourceProps.put(ClusterLinkConfig.AutoMirroringEnableProp(), "true");
        sourceProps.setProperty(ClusterLinkConfig.RetryTimeoutMsProp(), String.valueOf(this.syncPeriod * 10L));
        sourceProps.put(ClusterLinkConfig.TopicFiltersProp(), this.includeAllTopicsFilter());
        sourceProps.put(ClusterLinkConfig.ClusterLinkPrefixProp(), sourcePrefix);
        sourceProps.put("bootstrap.servers", this.destCluster().bootstrapServers(this.destCluster().listenerName()));
        sourceProps.put("metadata.max.age.ms", String.valueOf(this.syncPeriod));
        if ("zk".equals(quorum)) {
            // Only the zk variant sets up link credentials and client security properties.
            String linkJaasConfig = this.createLinkCredentials(sourceLinkName, this.destCluster(), Option.empty());
            sourceProps.putAll((java.util.Map<?, ?>)this.destCluster().clientSecurityProps(sourceLinkName));
            sourceProps.put("sasl.jaas.config", linkJaasConfig);
        }
        this.sourceCluster().createClusterLink(sourceLinkName, sourceProps, Option.apply(destClusterId), true);

        String sourceMirrorTopic = sourcePrefix + destRegularTopic;
        TestUtils.waitForCondition(() -> this.sourceCluster().listMirrorTopics(false).contains(sourceMirrorTopic), "Not mirrored");
        Assertions.assertTrue(this.sourceCluster().listMirrorTopics(false).contains(sourceMirrorTopic));

        Thread.sleep(this.syncPeriod * 5L);
        Assertions.assertEquals(1.0, (double)this.totalKafkaMetricValue((Seq)this.sourceCluster().aliveServers(), "auto-mirror-created-total", (Map)this.convertMapToScalaMap(Collections.emptyMap()), false, sourceLinkName));
        Assertions.assertEquals(1.0, (double)this.totalKafkaMetricValue((Seq)this.sourceCluster().aliveServers(), "prefixed-auto-mirror-topic-filtered-count", (Map)this.convertMapToScalaMap(Collections.emptyMap()), false, sourceLinkName));
        // Neither cluster may have mirrored the other cluster's mirror topic.
        Assertions.assertFalse(this.destCluster().listMirrorTopics(false).contains(this.clusterLinkPrefix() + sourceMirrorTopic));
        Assertions.assertFalse(this.sourceCluster().listMirrorTopics(false).contains(sourcePrefix + destMirrorTopic));

        // --- remove the destination mirror topic; the filtered-count metric must drop to 0 ---
        HashMap<String, String> updatedProps = new HashMap<String, String>();
        // Auto-mirroring is switched off before the mirror topic is unlinked and deleted.
        updatedProps.put(ClusterLinkConfig.AutoMirroringEnableProp(), "false");
        this.destCluster().alterClusterLink(this.linkName(), this.convertMapToScalaMap(updatedProps), this.destCluster().brokers().toSeq());
        this.destCluster().unlinkTopic(destMirrorTopic, this.linkName(), true, true, true);
        this.destCluster().deleteTopic(destMirrorTopic, true);
        Thread.sleep(this.syncPeriod * 5L);
        Assertions.assertEquals(0.0, (double)this.totalKafkaMetricValue((Seq)this.sourceCluster().aliveServers(), "prefixed-auto-mirror-topic-filtered-count", (Map)this.convertMapToScalaMap(Collections.emptyMap()), false, sourceLinkName));

        // --- cleanup ---
        this.sourceCluster().unlinkTopic(sourceMirrorTopic, sourceLinkName, true, true, true);
        this.destCluster().deleteClusterLink(this.linkName(), true, this.destCluster().brokers());
        this.sourceCluster().deleteClusterLink(sourceLinkName, true, this.sourceCluster().brokers());
    }

    /**
     * Adapts a Java map to a Scala mutable map via
     * {@code JavaConverters.mapAsScalaMapConverter(map).asScala()}.
     *
     * @param map the Java map to adapt
     * @param <T> key and value type
     * @return the Scala mutable map produced by the converter
     */
    protected <T> scala.collection.mutable.Map<T, T> convertMapToScalaMap(java.util.Map<T, T> map) {
        return (scala.collection.mutable.Map)JavaConverters.mapAsScalaMapConverter(map).asScala();
    }

    /**
     * Shared body of the two consumer-group prefix tests: creates a link with prefix
     * {@code "link1_"} and consumer offset sync enabled, mirrors the source topic, commits an
     * offset on the source cluster and then checks the first topic and the first consumer group
     * listed on the destination cluster.
     *
     * @param groupPrefixEnabled value written to the consumer-group-prefix flag; also the expected
     *                           result of "first destination consumer group starts with the prefix"
     */
    private void verifyConsumerGroupPrefixing(boolean groupPrefixEnabled) {
        String clusterLinkPrefix = "link1_";
        this.sourceCluster().createTopic(this.topic(), this.numPartitions(), (int)this.replicationFactor(), new Properties(), this.sourceCluster().listenerName());
        Properties linkProps = this.destLinkProps((Map)this.convertMapToScalaMap(Collections.emptyMap()));
        linkProps.setProperty(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
        linkProps.setProperty(ClusterLinkConfig.ConsumerOffsetGroupFiltersProp(), this.consumerGroupFilter(this.consumerGroup));
        linkProps.setProperty(ClusterLinkConfig.ConsumerOffsetSyncMsProp(), String.valueOf(this.syncPeriod));
        linkProps.setProperty(ClusterLinkConfig.ClusterLinkPrefixProp(), clusterLinkPrefix);
        linkProps.setProperty(ClusterLinkConfig.ConsumerGroupPrefixEnableProp(), String.valueOf(groupPrefixEnabled));
        Assertions.assertFalse(clusterLinkPrefix.isEmpty());
        this.createClusterLink(this.linkName(), linkProps, this.sourceLinkProps((Map)this.convertMapToScalaMap(Collections.emptyMap())), false);
        this.destCluster().linkTopic(this.topic(), (short)2, this.linkName(), this.convertMapToScalaMap(Collections.emptyMap()), clusterLinkPrefix);
        this.commitOffsets(this.sourceCluster(), this.topic(), 0, this.offsetToCommit, this.consumerGroup);

        // NOTE(review): the listings are read right after the commit without waiting for an
        // offset-sync cycle; presumably the harness helpers block long enough - confirm.
        String firstTopic = firstName(JavaConverters.asJavaCollection((Iterable)this.destCluster().listTopics()), "topic");
        Assertions.assertTrue(firstTopic.startsWith(clusterLinkPrefix));
        String firstGroup = firstName(JavaConverters.asJavaCollection((Iterable)this.destCluster().listConsumerGroups()), "consumer group");
        Assertions.assertEquals(groupPrefixEnabled, firstGroup.startsWith(clusterLinkPrefix),
            "Unexpected prefixing of destination consumer group " + firstGroup);

        this.destCluster().unlinkTopic(clusterLinkPrefix + this.topic(), this.linkName(), true, true, true);
        this.destCluster().deleteClusterLink(this.linkName(), false, this.destCluster().brokers());
    }

    /**
     * Returns the first element of {@code names} as a string, failing the test with a descriptive
     * message (instead of a bare {@code NoSuchElementException}) when the collection is empty.
     *
     * @param names collection of names as returned by a destination-cluster listing
     * @param what  human-readable kind of the listed items, used in the failure message
     * @return the first name in iteration order
     */
    private static String firstName(java.util.Collection<?> names, String what) {
        Assertions.assertFalse(names.isEmpty(), "Expected at least one " + what + " on the destination cluster");
        return (String)names.iterator().next();
    }
}

