/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.KafkaLogicalClusterUtils;
import io.confluent.kafka.multitenant.MultiTenantInterceptor;
import io.confluent.kafka.multitenant.MultiTenantPrincipalBuilder;
import io.confluent.kafka.multitenant.PhysicalClusterMetadata;
import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.multitenant.integration.test.IntegrationTestHarness;
import io.confluent.kafka.multitenant.integration.test.SslEngineFactoryWithCorrectSni;
import io.confluent.kafka.multitenant.integration.test.SslEngineFactoryWithSniStartWithPKC;
import io.confluent.kafka.security.authorizer.MockAuditLogProvider;
import io.confluent.kafka.server.plugins.auth.FileBasedPlainSaslAuthenticatorTest;
import io.confluent.kafka.server.plugins.auth.SniValidationMode;
import java.io.IOException;
import java.lang.invoke.CallSite;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig;
import kafka.test.JarResourceLoader;
import org.apache.kafka.clients.HostResolver;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.network.BrokerFqdnBuilder;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.network.ConnectionMode;
import org.apache.kafka.common.network.ProxyProtocol;
import org.apache.kafka.common.network.ProxyProtocolEngineFactory;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration tests that start a {@link PhysicalCluster} through {@link IntegrationTestHarness}
 * and assert on the broker host names returned by {@code AdminClient.describeCluster()} for the
 * {@code EXTERNAL} and {@code NON_LOCAL_TENANT_SCOPED} listeners under different hostname
 * generation configurations.
 *
 * <p>Every client created here resolves all host names to {@link #LOCAL_HOST_IP} through
 * {@link LocalHostResolver}, so the advertised host names never have to exist in DNS.
 */
@Tags({@Tag("integration"), @Tag("bazel:size:medium")})
public class BrokerDynamicHostnameGenerationTest {
    private static final Logger log = LoggerFactory.getLogger(BrokerDynamicHostnameGenerationTest.class);
    public static final String LOCAL_HOST_IP = "127.0.0.1";

    // Credentials handed to IntegrationTestHarness.clientPlainJaasConfig for every admin client.
    private static final String SERVICE_USER_API_KEY = "APIKEY1";
    private static final String SERVICE_USER_API_KEY_PASSWORD = "pwd1";
    // Topic (before tenant prefixing) the service user is granted ALL on during cluster setup.
    private static final String TEST_TOPIC = "abcd";

    private static final String ACCESS_POINT_ID_VAL = "ap1234";
    private static final String SERVICE_ID_VAL = KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId();
    private static final String REGION_VAL = "test-region";
    private static final String ZONE_ID_VAL = "test-zone";
    private static final String TARGET_ID_VAL = "0000";
    private static final String CLOUD_VAL = "aws";
    private static final String DOMAIN_VAL = "confluent.cloud";
    private static final String BOOTSTRAP_ENDPOINT_SCHEME = "AccessPointV1";
    private static final String BROKER_ENDPOINT_SCHEME = "AccessPointTargetedZonalV1";
    private static final String INVALID_ENDPOINT_SCHEME = "InvalidTemplateId";
    private static final int DEFAULT_NUM_BROKERS = 1;

    private static final String EXTERNAL_LISTENER = "EXTERNAL";
    private static final String NON_LOCAL_TENANT_SCOPED_LISTENER = "NON_LOCAL_TENANT_SCOPED";
    // Advertised host template; "%sep" is the value configured as the subdomain separator variable.
    private static final String SUBDOMAIN_HOST_TEMPLATE = "-g001%sepusw2-az1%sep";
    // Host name the tests expect on NON_LOCAL_TENANT_SCOPED when its separator map is "default:.".
    private static final String NON_LOCAL_TENANT_SCOPED_EXPECTED_HOST =
        "lkc-abc-g001.usw2-az1.us-west-2.aws.glb.confluent.cloud";
    private static final String ACCESS_POINT_HOST_FORMAT = "%s-%s-%s.%s.%s.%s.accesspoint.glb.%s";

    private final String logicalClusterId = KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId();
    private final String path = JarResourceLoader.loadFileFromResource(FileBasedPlainSaslAuthenticatorTest.class, "/file_auth_test_apikeys.json").getPath();
    private IntegrationTestHarness testHarness;
    protected PhysicalCluster physicalCluster;
    private String brokerUUID;
    private PhysicalClusterMetadata metadata;
    private Path tempDir;
    private TestInfo testInfo;
    private Map<String, Object> testCert;

    /**
     * Starts a physical cluster with {@code numBrokers} brokers, creates the logical cluster
     * {@code LC_META_ABC} and grants its service user {@code ALL} on the literal test topic.
     *
     * @param brokerOverrideProps properties applied on top of the common broker configuration
     * @param numBrokers number of brokers handed to the harness
     * @param encodedAdvertisedListeners advertised-listener strings handed to the harness
     *     (presumably one per broker; an empty list is passed for the single-broker case)
     * @throws Exception if the cluster cannot be started or configured
     */
    public void setUpClusterWithMultipleBrokers(Properties brokerOverrideProps, int numBrokers, List<String> encodedAdvertisedListeners) throws Exception {
        MockAuditLogProvider.reset();
        this.testHarness = new IntegrationTestHarness(this.testInfo, numBrokers, Collections.emptyList(), Collections.emptyList(), encodedAdvertisedListeners);
        Properties brokerOverrides = this.setUpMetadata(this.commonBrokerProps(brokerOverrideProps));
        this.physicalCluster = this.testHarness.start(brokerOverrides, this.controllerProps());

        int serviceUserId = 1;
        // NOTE(review): the meaning of the literal 100 is defined by createLogicalCluster - confirm.
        LogicalCluster logicalCluster = this.physicalCluster.createLogicalCluster(this.logicalClusterId, 100, serviceUserId);
        LogicalClusterUser testUser = logicalCluster.user(serviceUserId);
        this.testHarness.newAclCommand()
            .addTopicAclArgs(testUser.prefixedKafkaPrincipal(), testUser.withPrefix(TEST_TOPIC), AclOperation.ALL, PatternType.LITERAL)
            .execute();
    }

    /**
     * Starts a cluster with {@link #DEFAULT_NUM_BROKERS} broker(s) and no per-broker advertised listeners.
     *
     * @param brokerOverrideProps properties applied on top of the common broker configuration
     * @throws Exception if the cluster cannot be started or configured
     */
    public void setUpCluster(Properties brokerOverrideProps) throws Exception {
        this.setUpClusterWithMultipleBrokers(brokerOverrideProps, DEFAULT_NUM_BROKERS, Collections.emptyList());
    }

    /** Records the JUnit {@link TestInfo} and creates the temporary directory used for logical-cluster metadata. */
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        this.testInfo = testInfo;
        this.tempDir = TestUtils.tempDirectory().toPath();
    }

    /**
     * Shuts down the harness and closes the cluster metadata.
     *
     * <p>Both resources are null-checked and the metadata is closed in a {@code finally} block, so
     * a test that failed part-way through setup reports its own failure instead of a
     * {@link NullPointerException} from here, and a failing shutdown does not skip the metadata close.
     */
    @AfterEach
    public void tearDown() throws Exception {
        try {
            if (this.testHarness != null) {
                this.testHarness.shutdown();
            }
        } finally {
            if (this.metadata != null) {
                this.metadata.close(this.brokerUUID);
            }
        }
    }

    /**
     * Initialises {@link PhysicalClusterMetadata} on {@link #tempDir}, writes the {@code LC_META_ABC}
     * logical cluster file and waits until it is visible in the metadata cache.
     *
     * @param brokerProps broker properties; the broker session UUID is added to them in place
     * @return the same {@code brokerProps} instance
     */
    private Properties setUpMetadata(Properties brokerProps) throws IOException, InterruptedException {
        this.brokerUUID = "uuid";
        brokerProps.put(KafkaConfig.BrokerSessionUuidProp(), this.brokerUUID);

        Map<String, Object> configs = new HashMap<>();
        configs.put(KafkaConfig.BrokerSessionUuidProp(), this.brokerUUID);
        configs.put("multitenant.metadata.dir", this.tempDir.toRealPath().toString());
        configs.put("node.id", brokerProps.get("node.id"));
        this.metadata = Utils.initiatePhysicalClusterMetadata(configs);

        Utils.createLogicalClusterFile(KafkaLogicalClusterUtils.LC_META_ABC, this.tempDir);
        TestUtils.waitForCondition(
            () -> this.metadata.metadata(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()) != null,
            "Expected metadata of new logical cluster to be present in metadata cache");
        return brokerProps;
    }

    /**
     * Builds the broker configuration shared by all tests: four listeners bound to
     * {@link #LOCAL_HOST_IP}, SASL_SSL on the two tenant-facing listeners, freshly generated
     * key/trust stores (also kept in {@link #testCert} for the clients) and finally the
     * caller's overrides, which win over everything set here.
     */
    private Properties commonBrokerProps(Properties brokerOverrideProps) throws Exception {
        Properties props = this.nodeProps();
        props.put("listener.name.external.principal.builder.class", MultiTenantPrincipalBuilder.class.getName());
        props.put("listener.name.external.confluent.security.event.logger.authentication.enable", true);
        props.put("listener.name.non_local_tenant_scoped.principal.builder.class", MultiTenantPrincipalBuilder.class.getName());
        props.put("listener.name.non_local_tenant_scoped.confluent.security.event.logger.authentication.enable", true);
        props.put("listener.security.protocol.map", "INTERNAL:PLAINTEXT,REPLICATION:PLAINTEXT,EXTERNAL:SASL_SSL,NON_LOCAL_TENANT_SCOPED:SASL_SSL");

        CertStores certStores = new CertStores(true, LOCAL_HOST_IP, InetAddress.getByName(LOCAL_HOST_IP));
        this.testCert = certStores.keyStoreProps();
        this.testCert.putAll(certStores.trustStoreProps());

        props.put("listeners", "INTERNAL://127.0.0.1:0,REPLICATION://127.0.0.1:0,EXTERNAL://127.0.0.1:0,NON_LOCAL_TENANT_SCOPED://127.0.0.1:0");
        props.put("inter.broker.listener.name", "REPLICATION");

        // Properties is a Hashtable and rejects null values, so drop them before copying.
        this.testCert.values().removeIf(Objects::isNull);
        props.putAll(this.testCert);

        props.put("ce.broker.plugins.test.audit.provider.config", "TEST");
        props.put("confluent.multitenant.parse.lkc.id.enable", true);
        props.put("listener.name.external.confluent.proxy.protocol.version", ProxyProtocol.NONE.name);
        props.put("listener.name.non_local_tenant_scoped.confluent.proxy.protocol.version", ProxyProtocol.NONE.name);
        props.setProperty("listener.name.non_local_tenant_scoped.broker.interceptor.class", MultiTenantInterceptor.class.getName());
        props.putAll(brokerOverrideProps);
        return props;
    }

    /** Returns the properties common to broker and controller nodes. */
    private Properties nodeProps() {
        Properties props = new Properties();
        props.put("node.id", "0");
        props.put("sasl.enabled.mechanisms", Collections.singletonList("PLAIN"));
        props.put("authorizer.class.name", MultiTenantAuthorizer.class.getName());
        props.put("confluent.security.event.logger.multitenant.enable", "true");
        props.put("ce.broker.plugins.test.audit.provider.config", "TEST");
        props.put("confluent.close.connections.on.credential.delete", "true");
        return props;
    }

    /** Returns the controller configuration, which is exactly {@link #nodeProps()}. */
    private Properties controllerProps() {
        return this.nodeProps();
    }

    /** Single-host convenience overload of {@link #getBootStrapResolvableToLocalIp(String, List)}. */
    private String getBootStrapResolvableToLocalIp(String listenerName, String hostToBeReplaced) {
        return this.getBootStrapResolvableToLocalIp(listenerName, Collections.singletonList(hostToBeReplaced));
    }

    /**
     * Returns the bootstrap servers of {@code listenerName} with every occurrence of each given
     * host replaced by {@link #LOCAL_HOST_IP}.
     */
    private String getBootStrapResolvableToLocalIp(String listenerName, List<String> hostsToBeReplaced) {
        String bootstrapServers = this.physicalCluster.bootstrapServers(listenerName);
        for (String host : hostsToBeReplaced) {
            bootstrapServers = bootstrapServers.replace(host, LOCAL_HOST_IP);
        }
        return bootstrapServers;
    }

    /**
     * Both tenant-facing listeners advertise the subdomain host template, but are configured with
     * different subdomain prefixes and separator maps, and are expected to report different host names.
     */
    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft"})
    public void testMultiListenersUseDynamicHostnameGeneration(String quorum) throws Exception {
        Properties brokerOverrideProps = new Properties();
        brokerOverrideProps.put("advertised.listeners", String.format("INTERNAL://127.0.0.1:0,REPLICATION://127.0.0.1:0,EXTERNAL://%s:0,NON_LOCAL_TENANT_SCOPED://%s:0", SUBDOMAIN_HOST_TEMPLATE, SUBDOMAIN_HOST_TEMPLATE));
        brokerOverrideProps.put("confluent.multitenant.listener.hostname.cluster.prefix.enable", "true");
        brokerOverrideProps.put("confluent.multitenant.parse.sni.host.name.enable", "true");
        brokerOverrideProps.put("confluent.multitenant.listener.hostname.subdomain.suffix.enable", "true");
        brokerOverrideProps.put("confluent.subdomain.separator.variable", "%sep");
        brokerOverrideProps.put("confluent.subdomain.prefix", "us-west-2");
        brokerOverrideProps.put("confluent.subdomain.separator.map", "us-west-2.aws.glb.confluent.cloud:-,default:.");
        brokerOverrideProps.put("listener.name.external.plain.sasl.jaas.config", this.fileBasedJaasConfig(SniValidationMode.STRICT));
        this.putNonLocalTenantScopedDynamicHostnameProps(brokerOverrideProps);
        this.setUpCluster(brokerOverrideProps);

        AdminClient nonLocalClient = this.createAdminClient(SslEngineFactoryWithCorrectSni.class,
            this.getBootStrapResolvableToLocalIp(NON_LOCAL_TENANT_SCOPED_LISTENER, SUBDOMAIN_HOST_TEMPLATE));
        assertSingleNodeHost(NON_LOCAL_TENANT_SCOPED_EXPECTED_HOST, describeNodes(nonLocalClient));

        AdminClient externalClient = this.createAdminClient(SslEngineFactoryWithCorrectSni.class,
            this.getBootStrapResolvableToLocalIp(EXTERNAL_LISTENER, SUBDOMAIN_HOST_TEMPLATE));
        assertSingleNodeHost("lkc-abc-g001-usw2-az1-us-west-2.aws.glb.confluent.cloud", describeNodes(externalClient));
    }

    /**
     * Only {@code NON_LOCAL_TENANT_SCOPED} has the hostname generation options enabled;
     * {@code EXTERNAL} advertises {@link #LOCAL_HOST_IP} and is expected to report it unchanged.
     */
    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft"})
    public void testOneListenerGeneratesHostnamesDynamically_WhileOtherListenerDoesnt(String quorum) throws Exception {
        Properties brokerOverrideProps = new Properties();
        brokerOverrideProps.put("advertised.listeners", String.format("INTERNAL://127.0.0.1:0,REPLICATION://127.0.0.1:0,EXTERNAL://127.0.0.1:0,NON_LOCAL_TENANT_SCOPED://%s:0", SUBDOMAIN_HOST_TEMPLATE));
        brokerOverrideProps.put("confluent.multitenant.listener.hostname.cluster.prefix.enable", "false");
        brokerOverrideProps.put("confluent.multitenant.parse.sni.host.name.enable", "false");
        brokerOverrideProps.put("confluent.multitenant.listener.hostname.subdomain.suffix.enable", "false");
        brokerOverrideProps.put("listener.name.external.plain.sasl.jaas.config", this.fileBasedJaasConfig(SniValidationMode.OPTIONAL_VALIDATION));
        this.putNonLocalTenantScopedDynamicHostnameProps(brokerOverrideProps);
        this.setUpCluster(brokerOverrideProps);

        AdminClient nonLocalClient = this.createAdminClient(SslEngineFactoryWithCorrectSni.class,
            this.getBootStrapResolvableToLocalIp(NON_LOCAL_TENANT_SCOPED_LISTENER, SUBDOMAIN_HOST_TEMPLATE));
        assertSingleNodeHost(NON_LOCAL_TENANT_SCOPED_EXPECTED_HOST, describeNodes(nonLocalClient));

        AdminClient externalClient = this.createAdminClient(SslEngineFactoryWithSniStartWithPKC.class,
            this.getBootStrapResolvableToLocalIp(EXTERNAL_LISTENER, SUBDOMAIN_HOST_TEMPLATE));
        assertSingleNodeHost(LOCAL_HOST_IP, describeNodes(externalClient));
    }

    /**
     * Four-broker cluster whose {@code EXTERNAL} listener has the PPv2 endpoint scheme enabled.
     * Clients announcing either known scheme must see access-point host names for every broker;
     * a client announcing an unknown scheme must fail.
     *
     * @param mapBrokerZoneToGatewayZoneEnable when {@code true}, the broker-zone-to-gateway-zone
     *     option is enabled and the expected zone of each host name is taken from
     *     {@code BrokerFqdnBuilder.createBrokerToGatewayZoneMapping}
     */
    @ParameterizedTest(name="{displayName}.quorum={0}")
    @CsvSource(value={"kraft,false", "kraft,true"})
    public void testHostnameGenerationBasedOnPPV2EndpointScheme(String quorum, boolean mapBrokerZoneToGatewayZoneEnable) throws Exception {
        Properties brokerOverrideProps = new Properties();
        brokerOverrideProps.put("confluent.multitenant.listener.hostname.cluster.prefix.enable", "false");
        brokerOverrideProps.put("confluent.multitenant.parse.sni.host.name.enable", "true");
        brokerOverrideProps.put("confluent.multitenant.listener.hostname.subdomain.suffix.enable", "false");
        this.putExternalPpv2EndpointSchemeProps(brokerOverrideProps);
        if (mapBrokerZoneToGatewayZoneEnable) {
            brokerOverrideProps.put("listener.name.external.confluent.ppv2.endpoint.scheme.map.broker.zone.to.gateway.zone", "true");
        }
        brokerOverrideProps.put("listener.name.non_local_tenant_scoped.plain.sasl.jaas.config", this.fileBasedJaasConfig(SniValidationMode.STRICT));

        int numBrokers = 4;
        List<String> encodedAdvertisedListeners = new ArrayList<>();
        List<String> hostsToBeReplaced = new ArrayList<>();
        Set<String> brokerZones = new HashSet<>();
        for (int nodeId = 0; nodeId < numBrokers; ++nodeId) {
            String brokerZone = ZONE_ID_VAL + nodeId;
            String hostTemplate = String.format("%s.%s", this.padNodeIdToTargetId(nodeId), brokerZone);
            brokerZones.add(brokerZone);
            hostsToBeReplaced.add(hostTemplate);
            encodedAdvertisedListeners.add(String.format("INTERNAL://127.0.0.1:0,REPLICATION://127.0.0.1:0,EXTERNAL://%s:0,NON_LOCAL_TENANT_SCOPED://127.0.0.1:0", hostTemplate));
        }
        log.info("Broker zones: {}", brokerZones);

        Set<String> gatewayZones = null;
        // NOTE(review): value type assumed to be String; confirm against BrokerFqdnBuilder.
        Map<String, String> brokerToGatewayZoneMap = null;
        if (mapBrokerZoneToGatewayZoneEnable) {
            // Two of the broker zones plus one zone no broker lives in. Collected into an explicit
            // HashSet because the set is mutated right after and Collectors.toSet() does not
            // promise a mutable result.
            gatewayZones = brokerZones.stream().limit(2L).collect(Collectors.toCollection(HashSet::new));
            gatewayZones.add(ZONE_ID_VAL + numBrokers);
            log.info("Gateway zones: {}", gatewayZones);
            brokerToGatewayZoneMap = BrokerFqdnBuilder.createBrokerToGatewayZoneMapping(gatewayZones, brokerZones);
            log.info("Broker to gateway zone mapping: {}", brokerToGatewayZoneMap);
        }

        this.setUpClusterWithMultipleBrokers(brokerOverrideProps, numBrokers, encodedAdvertisedListeners);
        log.info("Cluster Setup.");
        String bootstrapServers = this.getBootStrapResolvableToLocalIp(EXTERNAL_LISTENER, hostsToBeReplaced);

        // The same broker host names are expected whichever of the two known schemes the client announces.
        for (String endpointScheme : Arrays.asList(BOOTSTRAP_ENDPOINT_SCHEME, BROKER_ENDPOINT_SCHEME)) {
            AdminClient client = this.createProxiedAdminClient(new Properties(), bootstrapServers, endpointScheme, gatewayZones);
            this.assertAccessPointHostnames(describeNodesWithoutFailure(client), brokerToGatewayZoneMap);
        }

        this.assertDescribeClusterFailsForInvalidScheme(bootstrapServers, gatewayZones);
        if (mapBrokerZoneToGatewayZoneEnable) {
            this.assertDescribeClusterFailsForInvalidScheme(bootstrapServers, null);
        }
    }

    /** Formats a node id as the four-digit, zero-padded target id used in advertised host names. */
    private String padNodeIdToTargetId(int nodeId) {
        return String.format("%04d", nodeId);
    }

    /**
     * {@code EXTERNAL} uses the PPv2 endpoint scheme while {@code NON_LOCAL_TENANT_SCOPED} keeps the
     * subdomain-template configuration; each listener is expected to report its own style of host name.
     */
    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft"})
    public void testHostnameGenerationBasedOnPPV2EndpointScheme_OtherListenerUsesOldHostnameGenerationScheme(String quorum) throws Exception {
        String hostTemplateXENI = String.format("%s.%s", TARGET_ID_VAL, ZONE_ID_VAL);
        String hostTemplateEWPath = SUBDOMAIN_HOST_TEMPLATE;

        Properties brokerOverrideProps = new Properties();
        brokerOverrideProps.put("advertised.listeners", String.format("INTERNAL://127.0.0.1:0,REPLICATION://127.0.0.1:0,EXTERNAL://%s:0,NON_LOCAL_TENANT_SCOPED://%s:0", hostTemplateXENI, hostTemplateEWPath));
        brokerOverrideProps.put("listener.name.external.confluent.multitenant.parse.sni.host.name.enable", "false");
        this.putExternalPpv2EndpointSchemeProps(brokerOverrideProps);
        this.putNonLocalTenantScopedDynamicHostnameProps(brokerOverrideProps);
        this.setUpCluster(brokerOverrideProps);
        log.info("Cluster Setup.");

        String expectedHostname = String.format(ACCESS_POINT_HOST_FORMAT, SERVICE_ID_VAL, TARGET_ID_VAL, ACCESS_POINT_ID_VAL, ZONE_ID_VAL, REGION_VAL, CLOUD_VAL, DOMAIN_VAL);
        log.info("Expected broker hostname: {}", expectedHostname);

        AdminClient nonLocalClient = this.createAdminClient(SslEngineFactoryWithCorrectSni.class,
            this.getBootStrapResolvableToLocalIp(NON_LOCAL_TENANT_SCOPED_LISTENER, hostTemplateEWPath));
        assertSingleNodeHost(NON_LOCAL_TENANT_SCOPED_EXPECTED_HOST, describeNodes(nonLocalClient));

        AdminClient externalClient = this.createProxiedAdminClient(new Properties(),
            this.getBootStrapResolvableToLocalIp(EXTERNAL_LISTENER, hostTemplateXENI),
            BOOTSTRAP_ENDPOINT_SCHEME, new HashSet<>(Arrays.asList(ZONE_ID_VAL)));
        Collection<Node> nodes = describeNodesWithoutFailure(externalClient);
        Assertions.assertEquals(expectedHostname, nodes.iterator().next().host());
    }

    /**
     * Builds a client-mode {@link ProxyProtocolEngineFactory} for proxy protocol V2 that carries the
     * access point id, the given endpoint scheme and, when non-null, the gateway zones.
     *
     * @param endpointScheme value for {@code confluent.ccloud.fqdn.template}
     * @param gatewayZones zones joined with commas into {@code confluent.ccloud.gateway.zones};
     *     the key is omitted when this is {@code null}
     */
    private ProxyProtocolEngineFactory proxyProtocolEngineFactory(String endpointScheme, Set<String> gatewayZones) {
        Map<String, Object> clientConfigs = new HashMap<>();
        clientConfigs.put("confluent.proxy.protocol.client.address", "1.1.1.1");
        clientConfigs.put("confluent.proxy.protocol.client.port", 1111);
        clientConfigs.put("confluent.proxy.protocol.client.mode", ConfluentConfigs.PROXY_PROTOCOL_CLIENT_MODE_DEFAULT);
        clientConfigs.put("confluent.ccloud.access.point.id", ACCESS_POINT_ID_VAL);
        clientConfigs.put("confluent.ccloud.fqdn.template", endpointScheme);
        if (gatewayZones != null) {
            clientConfigs.put("confluent.ccloud.gateway.zones", String.join(",", gatewayZones));
        }
        return new ProxyProtocolEngineFactory(ProxyProtocol.V2, clientConfigs, ConnectionMode.CLIENT, new LogContext());
    }

    /** Returns the {@code FileBasedLoginModule} JAAS entry for {@link #path} with the given SNI validation mode. */
    private String fileBasedJaasConfig(SniValidationMode sniValidationMode) {
        return "io.confluent.kafka.server.plugins.auth.FileBasedLoginModule required config_path=\"" + this.path
            + "\"refresh_ms=\"1000\"sni_host_name_validation_mode=\"" + sniValidationMode.getText() + "\";";
    }

    /** Adds the subdomain-template hostname options and a STRICT JAAS entry for {@code NON_LOCAL_TENANT_SCOPED}. */
    private void putNonLocalTenantScopedDynamicHostnameProps(Properties props) {
        props.put("listener.name.non_local_tenant_scoped.confluent.multitenant.listener.hostname.cluster.prefix.enable", "true");
        props.put("listener.name.non_local_tenant_scoped.confluent.multitenant.parse.sni.host.name.enable", "true");
        props.put("listener.name.non_local_tenant_scoped.confluent.multitenant.listener.hostname.subdomain.suffix.enable", "true");
        props.put("listener.name.non_local_tenant_scoped.confluent.subdomain.prefix", "us-west-2.aws.glb.confluent.cloud");
        props.put("listener.name.non_local_tenant_scoped.confluent.subdomain.separator.map", "default:.");
        props.put("listener.name.non_local_tenant_scoped.confluent.subdomain.separator.variable", "%sep");
        props.put("listener.name.non_local_tenant_scoped.plain.sasl.jaas.config", this.fileBasedJaasConfig(SniValidationMode.STRICT));
    }

    /**
     * Adds the PPv2 endpoint scheme options for {@code EXTERNAL}: scheme templates and variables,
     * an OPTIONAL_VALIDATION JAAS entry, and proxy protocol V2 with fallback enabled.
     */
    private void putExternalPpv2EndpointSchemeProps(Properties props) {
        props.put("listener.name.external.confluent.ppv2.endpoint.scheme.enable", "true");
        props.put("listener.name.external.confluent.ppv2.endpoint.scheme.templates", "AccessPointV1:$svcId-$accessPointId.$region.$cloud.accesspoint.glb.$domain,AccessPointTargetedZonalV1:$svcId-$targetId-$accessPointId.$zoneId.$region.$cloud.accesspoint.glb.$domain");
        props.put("listener.name.external.confluent.ppv2.endpoint.scheme.bootstrap.broker.template.mappings", "AccessPointV1:AccessPointTargetedZonalV1");
        props.put("listener.name.external.confluent.ppv2.endpoint.scheme.template.variables", "$svcId,$accessPointId,$region,$targetId,$zoneId,$cloud");
        props.put("listener.name.external.confluent.ppv2.endpoint.scheme.template.variable.region", REGION_VAL);
        props.put("listener.name.external.confluent.ppv2.endpoint.scheme.template.variable.cloud", CLOUD_VAL);
        props.put("listener.name.external.confluent.ppv2.endpoint.scheme.template.variable.domain", DOMAIN_VAL);
        props.put("listener.name.external.plain.sasl.jaas.config", this.fileBasedJaasConfig(SniValidationMode.OPTIONAL_VALIDATION));
        props.put("listener.name.external.confluent.proxy.protocol.version", ProxyProtocol.V2.name);
        props.put("listener.name.external.confluent.proxy.protocol.fallback.enabled", "true");
    }

    /** Creates an admin client without a proxy protocol engine, using the given SSL engine factory class. */
    private AdminClient createAdminClient(Class<?> sslEngineFactoryClass, String bootstrapServers) throws Exception {
        return this.testHarness.createSSLAuthAdminClient(
            IntegrationTestHarness.clientPlainJaasConfig(SERVICE_USER_API_KEY, SERVICE_USER_API_KEY_PASSWORD),
            this.testCert, sslEngineFactoryClass.getCanonicalName(), LocalHostResolver.class,
            new Properties(), bootstrapServers, null);
    }

    /** Creates an admin client whose connections use {@link #proxyProtocolEngineFactory(String, Set)}. */
    private AdminClient createProxiedAdminClient(Properties clientProps, String bootstrapServers, String endpointScheme, Set<String> gatewayZones) throws Exception {
        return this.testHarness.createSSLAuthAdminClient(
            IntegrationTestHarness.clientPlainJaasConfig(SERVICE_USER_API_KEY, SERVICE_USER_API_KEY_PASSWORD),
            this.testCert, SslEngineFactoryWithCorrectSni.class.getCanonicalName(), LocalHostResolver.class,
            clientProps, bootstrapServers, null, this.proxyProtocolEngineFactory(endpointScheme, gatewayZones));
    }

    /** Describes the cluster and returns its nodes, propagating any failure to the caller. */
    private static Collection<Node> describeNodes(AdminClient client) throws Exception {
        return client.describeCluster(new DescribeClusterOptions()).nodes().get();
    }

    /** Describes the cluster, failing the test if the nodes future throws, and returns the nodes. */
    private static Collection<Node> describeNodesWithoutFailure(AdminClient client) {
        DescribeClusterResult result = client.describeCluster(new DescribeClusterOptions());
        return Assertions.assertDoesNotThrow(() -> result.nodes().get());
    }

    /** Asserts that exactly one node was returned and that it reports {@code expectedHost}. */
    private static void assertSingleNodeHost(String expectedHost, Collection<Node> nodes) {
        Assertions.assertEquals(1, nodes.size());
        Assertions.assertEquals(expectedHost, nodes.iterator().next().host());
    }

    /**
     * Asserts that every node reports the access-point host name derived from its id.
     *
     * @param brokerToGatewayZoneMap broker zone to gateway zone; {@code null} means the broker
     *     zone itself is expected in the host name
     */
    private void assertAccessPointHostnames(Collection<Node> nodes, Map<String, String> brokerToGatewayZoneMap) {
        for (Node node : nodes) {
            String brokerZone = ZONE_ID_VAL + node.id();
            String expectedZone = brokerToGatewayZoneMap != null ? brokerToGatewayZoneMap.get(brokerZone) : brokerZone;
            String expectedHostname = String.format(ACCESS_POINT_HOST_FORMAT, SERVICE_ID_VAL, this.padNodeIdToTargetId(node.id()), ACCESS_POINT_ID_VAL, expectedZone, REGION_VAL, CLOUD_VAL, DOMAIN_VAL);
            log.info("Expected broker hostname: {}", expectedHostname);
            Assertions.assertEquals(expectedHostname, node.host());
        }
    }

    /**
     * Asserts that describing the cluster fails with {@link ExecutionException} when the client
     * announces an endpoint scheme that is not among the configured templates. Client timeouts
     * are shortened to 5 seconds so the expected failure surfaces quickly.
     */
    private void assertDescribeClusterFailsForInvalidScheme(String bootstrapServers, Set<String> gatewayZones) throws Exception {
        Properties clientProps = new Properties();
        clientProps.put("request.timeout.ms", "5000");
        clientProps.put("default.api.timeout.ms", "5000");
        AdminClient client = this.createProxiedAdminClient(clientProps, bootstrapServers, INVALID_ENDPOINT_SCHEME, gatewayZones);
        DescribeClusterResult result = client.describeCluster(new DescribeClusterOptions());
        Assertions.assertThrows(ExecutionException.class, () -> result.nodes().get());
    }

    /** {@link HostResolver} that ignores the requested host and always resolves to {@link #LOCAL_HOST_IP}. */
    public static class LocalHostResolver
    implements HostResolver {
        @Override
        public InetAddress[] resolve(String host) throws UnknownHostException {
            return InetAddress.getAllByName(BrokerDynamicHostnameGenerationTest.LOCAL_HOST_IP);
        }
    }
}

