package io.confluent.kafka.server.plugins.policy;

import java.util.Collections;
import java.util.HashMap;
import kafka.server.link.ClusterLinkConfig;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:io/confluent/kafka/server/plugins/policy/ClusterLinkPolicyConfigTest.class */
public class ClusterLinkPolicyConfigTest {
    @Test
    public void testValidateSecurityProtocol() {
        testValidateSecurityProtocol(null, null, null);
        testValidateSecurityProtocol("localhost:9071", null, null);
        testValidateSecurityProtocol("localhost:9071", "OUTBOUND", null);
        testValidateSecurityProtocol("localhost:9071", "INBOUND", SecurityProtocol.PLAINTEXT.name);
        testValidateSecurityProtocol("localhost:9071", "OUTBOUND", SecurityProtocol.PLAINTEXT.name);
        testValidateSecurityProtocol("test.confluent.cloud:9071", "OUTBOUND", SecurityProtocol.SASL_SSL.name);
    }

    @Test
    public void testValidateJaasConfig() {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", "test.confluent.cloud:9071");
        hashMap.put(ClusterLinkConfig.ConnectionModeProp(), "OUTBOUND");
        hashMap.put("security.protocol", SecurityProtocol.SASL_SSL.name);
        hashMap.put("sasl.mechanism", "PLAIN");
        hashMap.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxxx\" password=\"\"xxxxx\"\"");
        ClusterLinkPolicyConfig clusterLinkPolicyConfig = new ClusterLinkPolicyConfig(hashMap);
        Assertions.assertEquals("sasl.jaas.config could not be loaded.", Assertions.assertThrows(InvalidConfigurationException.class, () -> {
            clusterLinkPolicyConfig.validateSecurityConfigs(hashMap);
        }).getMessage());
    }

    @Test
    public void testValidateSecurityProtocolFailure() {
        PolicyViolationException assertThrows = Assertions.assertThrows(PolicyViolationException.class, () -> {
            testValidateSecurityProtocol("test.confluent.cloud:9071", "OUTBOUND", SecurityProtocol.PLAINTEXT.name);
        });
        Assertions.assertTrue(assertThrows.getMessage().contains("Invalid security protocol"), assertThrows.getMessage());
        PolicyViolationException assertThrows2 = Assertions.assertThrows(PolicyViolationException.class, () -> {
            testValidateSecurityProtocol("test.confluent.cloud:9071", "OUTBOUND", SecurityProtocol.SASL_PLAINTEXT.name);
        });
        Assertions.assertTrue(assertThrows2.getMessage().contains("Invalid security protocol"), assertThrows2.getMessage());
        PolicyViolationException assertThrows3 = Assertions.assertThrows(PolicyViolationException.class, () -> {
            testValidateSecurityProtocol("test.confluent.cloud:9071", "OUTBOUND", SecurityProtocol.SSL.name);
        });
        Assertions.assertTrue(assertThrows3.getMessage().contains("Invalid security protocol"), assertThrows3.getMessage());
    }

    private void testValidateSecurityProtocol(String str, String str2, String str3) {
        HashMap hashMap = new HashMap();
        if (str != null) {
            hashMap.put("bootstrap.servers", str);
        }
        if (str2 != null) {
            hashMap.put(ClusterLinkConfig.ConnectionModeProp(), str2);
        }
        if (str3 != null) {
            hashMap.put("security.protocol", str3);
        }
        new ClusterLinkPolicyConfig(new HashMap()).validateSecurityConfigs(hashMap);
    }

    @Test
    public void testMirrorTopicSyncConfigs() {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", "localhost:1234");
        ClusterLinkPolicyConfig clusterLinkPolicyConfig = new ClusterLinkPolicyConfig(Collections.emptyMap());
        clusterLinkPolicyConfig.validateClusterLinkConfigs(hashMap);
        hashMap.put(ClusterLinkConfig.TopicConfigSyncIncludeProp(), "flush.ms");
        Assertions.assertThrows(PolicyViolationException.class, () -> {
            clusterLinkPolicyConfig.validateClusterLinkConfigs(hashMap);
        });
        hashMap.put(ClusterLinkConfig.TopicConfigSyncIncludeProp(), "retention.ms,retention.bytes,delete.retention.ms");
        clusterLinkPolicyConfig.validateClusterLinkConfigs(hashMap);
        hashMap.put(ClusterLinkConfig.TopicConfigSyncIncludeProp(), "retention.ms,flush.ms");
        Assertions.assertThrows(PolicyViolationException.class, () -> {
            clusterLinkPolicyConfig.validateClusterLinkConfigs(hashMap);
        });
    }

    @Test
    public void testValidateIntranetBootstrap() {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", "localhost:1234");
        ClusterLinkPolicyConfig.validateBootstrap(hashMap);
        hashMap.put("bootstrap.servers", "pkc-xxx.intranet.confluent.cloud:1234");
        PolicyViolationException assertThrows = Assertions.assertThrows(PolicyViolationException.class, () -> {
            ClusterLinkPolicyConfig.validateBootstrap(hashMap);
        });
        Assertions.assertTrue(assertThrows.getMessage().contains("Invalid bootstrap addresses or ports that cannot be used for cluster links on Confluent Cloud"), assertThrows.getMessage());
        hashMap.put("confluent.ccloud.intranet.host.suffixes", ".intranet.fedramp.confluent");
        hashMap.put("bootstrap.servers", "pkc-xxx.intranet.fedramp.confluent:1234");
        PolicyViolationException assertThrows2 = Assertions.assertThrows(PolicyViolationException.class, () -> {
            ClusterLinkPolicyConfig.validateBootstrap(hashMap);
        });
        Assertions.assertTrue(assertThrows2.getMessage().contains("Invalid bootstrap addresses or ports that cannot be used for cluster links on Confluent Cloud"), assertThrows2.getMessage());
    }
}
