package io.confluent.kafka.multitenant.integration.test;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.clients.plugins.auth.oauth.OAuthBearerLoginCallbackHandler;
import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.security.audit.event.ConfluentAuthenticationEvent;
import io.confluent.kafka.security.authorizer.MockAuditLogProvider;
import io.confluent.kafka.server.plugins.auth.TrafficNetworkIdValidationMode;
import io.confluent.kafka.server.plugins.auth.oauth.MockBasicAuthStore;
import io.confluent.kafka.server.plugins.auth.oauth.MockTrustCache;
import io.confluent.kafka.server.plugins.auth.oauth.OAuthUtils;
import io.confluent.kafka.test.cluster.EmbeddedKafkaCluster;
import io.confluent.kafka.traffic.TrafficNetworkIdAllowedRoutes;
import io.confluent.security.auth.provider.oauth.EnhancedOAuthBearerValidatorCallbackHandler;
import io.confluent.security.auth.store.data.JwtIssuerKey;
import io.confluent.security.auth.store.data.JwtIssuerValue;
import java.io.IOException;
import java.io.StringWriter;
import java.security.interfaces.RSAPublicKey;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import kafka.test.JarResourceLoader;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.network.CCloudTrafficType;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.network.ProxyProtocol;
import org.apache.kafka.common.network.ProxyProtocolEngineFactory;
import org.apache.kafka.common.network.RequestCallback;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.SaslAuthenticateRequest;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.audit.AuditEventStatus;
import org.apache.kafka.server.audit.AuthenticationErrorInfo;
import org.apache.kafka.server.traffic.TrafficNetworkIdRoutesStore;
import org.apache.kafka.test.TestUtils;
import org.jose4j.jwk.JsonWebKey;
import org.jose4j.jwk.JsonWebKeySet;
import org.jose4j.jwk.RsaJsonWebKey;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

@Tags({@Tag("integration"), @Tag("bazel:shard_count:2")})
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/TopicBasedNetworkIdAuthIntegrationTest.class */
public class TopicBasedNetworkIdAuthIntegrationTest extends AbstractTopicBasedPlainSaslAuthIntegrationTest {
    public static final String TEST_WITH_PARAMETERIZED_NAMES = "{displayName}.quorum={0},validationMode={1},routesEnabled={2}";
    private static Map<String, Object> pp2ValidateTrafficHeader = Collections.singletonMap("confluent.ccloud.traffic.type", CCloudTrafficType.PL_PUBLIC_IP_NLB);
    private static Map<String, Object> pp2NoValidateTrafficHeader = Collections.singletonMap("confluent.ccloud.traffic.type", CCloudTrafficType.PL_PRIVATE_LINK_NLB);
    private static final String CONFLUENT_JWT_ISSUER = "confluent";
    private final String orgResourceId = Utils.LC_META_ABC.organizationId();
    private final Properties adminProperties = new Properties();
    private static final String BROKER_NETWORK_ID = "NE-Broker";
    private static final String CLIENT_NETWORK_ID = "NE-Client";
    private final String networkIdTopic = "_confluent-network_id_routes";
    private ObjectMapper objectMapper;
    private boolean useProxyProtocol;
    private TrafficNetworkIdValidationMode validationMode;
    private boolean routesEnabled;
    private boolean createNetworkIdTopic;
    private String allowedNetworkId;
    private SaslMechanism saslMechanism;
    private OAuthUtils.JwsContainer jwsContainer;
    private MockBasicAuthStore authStore;
    private String brokerSessionUUID;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/TopicBasedNetworkIdAuthIntegrationTest$SaslAuthenticateRequestCallback.class */
    /**
     * Request callback that counts SASL authenticate requests and, when configured with a
     * non-null network id, writes that id into each such request before it is sent.
     */
    public static class SaslAuthenticateRequestCallback implements RequestCallback {
        /** Number of {@link SaslAuthenticateRequest}s seen by {@link #onRequest}. */
        AtomicInteger callCount = new AtomicInteger();
        /** Network id to stamp on SASL authenticate requests; {@code null} leaves them untouched. */
        String networkId;

        public SaslAuthenticateRequestCallback(String networkId) {
            this.networkId = networkId;
        }

        public void onRequest(KafkaPrincipal principal, AbstractRequest request, Optional<String> optional, Optional<String> optional2) {
            // Only SASL authenticate requests are of interest; everything else passes through.
            if (!(request instanceof SaslAuthenticateRequest)) {
                return;
            }
            SaslAuthenticateRequest authenticateRequest = (SaslAuthenticateRequest) request;
            if (this.networkId != null) {
                authenticateRequest.data().setNetworkId(this.networkId);
            }
            // Counted whether or not a network id was injected.
            this.callCount.incrementAndGet();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/TopicBasedNetworkIdAuthIntegrationTest$SaslMechanism.class */
    /** SASL mechanism a test runs with; selects the broker JAAS config and the admin client type. */
    public enum SaslMechanism {
        // Broker is configured with the TopicBasedLoginModule JAAS entry; clients use a plain-auth admin client.
        PLAIN,
        // Broker is configured with the OAuthBearerLoginModule JAAS entry; clients use an OAuth admin client.
        OAUTHBEARER
    }

    /**
     * Registers the OAuth login callback handler in the admin-client properties and defaults
     * to creating the network-id routes topic.
     */
    public TopicBasedNetworkIdAuthIntegrationTest() {
        this.adminProperties.put("sasl.login.callback.handler.class", OAuthBearerLoginCallbackHandler.class.getName());
        // networkIdTopic is a final field that already has an initializer at its declaration;
        // assigning it again here is a compile error, so the duplicate assignment was removed.
        this.createNetworkIdTopic = true;
    }

    /**
     * Resets the per-test defaults (single broker, PLAIN SASL, no proxy protocol, STRICT
     * validation, routes enabled) and seeds the mock trust cache with the verification key of
     * a freshly built JWS token.
     *
     * @param testInfo JUnit test information, forwarded to the superclass
     * @throws Exception if the superclass set-up or the token/key preparation fails
     */
    @Override
    @BeforeEach
    public void setUp(TestInfo testInfo) throws Exception {
        // These two are assigned before the superclass set-up runs.
        this.numBrokers = 1;
        this.brokerSessionUUID = UUID.randomUUID().toString();
        super.setUp(testInfo);

        // Per-test defaults; individual tests override them before starting the cluster.
        this.objectMapper = new ObjectMapper();
        this.saslMechanism = SaslMechanism.PLAIN;
        this.useProxyProtocol = false;
        this.validationMode = TrafficNetworkIdValidationMode.STRICT;
        this.routesEnabled = true;
        this.allowedNetworkId = CLIENT_NETWORK_ID;

        this.jwsContainer = new OAuthUtils.Builder(100000, CONFLUENT_JWT_ISSUER, "1", this.orgResourceId)
                .jku("https://localhost/keys")
                .withKid(true)
                .build();
        this.authStore = MockBasicAuthStore.create(this.brokerSessionUUID);

        // Publish the token's public key, tagged with its key id, under the issuer's trust-cache entry.
        JsonWebKey verificationJwk = new RsaJsonWebKey((RSAPublicKey) this.jwsContainer.verificationKey());
        verificationJwk.setKeyId(this.jwsContainer.getKid());
        JwtIssuerKey issuerKey = new JwtIssuerKey(CONFLUENT_JWT_ISSUER, (String) null, "");
        JwtIssuerValue issuerValue = new JwtIssuerValue(new JsonWebKeySet(new JsonWebKey[]{verificationJwk}));
        MockTrustCache trustCache = this.authStore.trustCache();
        trustCache.put(issuerKey, issuerValue);
    }

    /**
     * Builds the allow-list published to the routes topic: the currently allowed client
     * network id placed between two filler ids.
     */
    private List<String> getAllowedNetworkIds() {
        String[] networkIds = {"foo", this.allowedNetworkId, "bar"};
        return Arrays.asList(networkIds);
    }

    /**
     * Tears down the superclass fixture, closes the mock auth store and resets the mock audit
     * log provider.
     *
     * <p>The clean-up steps are chained with {@code finally} so that a failure in one step does
     * not skip the remaining ones, and the store is only closed if {@code setUp} got far enough
     * to create it.
     *
     * @throws Exception if any of the tear-down steps fails
     */
    @Override
    @AfterEach
    public void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            try {
                // authStore stays null when setUp failed before creating it.
                if (this.authStore != null) {
                    this.authStore.close();
                }
            } finally {
                MockAuditLogProvider.reset();
            }
        }
    }

    /**
     * Lists the topics to create up front; the network-id routes topic is included only when
     * {@code createNetworkIdTopic} is set.
     */
    @Override
    protected List<String> getInitialTopics() {
        if (!this.createNetworkIdTopic) {
            return Arrays.asList("_confluent-apikey", "_confluent-logical_clusters");
        }
        return Arrays.asList("_confluent-apikey", "_confluent-logical_clusters", "_confluent-network_id_routes");
    }

    /**
     * Delegates to the superclass {@code startWithTopic(Optional)} overload, passing
     * {@link Time#SYSTEM} wrapped in an {@link Optional}.
     *
     * <p>Decompiler note: the access modifier was changed from {@code protected}.
     *
     * @throws Exception if the superclass start-up fails
     */
    @Override // io.confluent.kafka.multitenant.integration.test.AbstractTopicBasedPlainSaslAuthIntegrationTest
    public void startWithTopic() throws Exception {
        super.startWithTopic(Optional.of(Time.SYSTEM));
    }

    /**
     * Extends the superclass broker properties with the session UUID, the SASL JAAS config for
     * the selected mechanism, optional proxy-protocol listener settings and the network-id
     * routes configuration.
     *
     * <p>Decompiler note: the access modifier was changed from {@code protected}.
     *
     * @param j forwarded unchanged to the superclass
     * @param z forwarded unchanged to the superclass
     * @return the augmented broker properties
     */
    @Override
    public Properties brokerProps(long j, boolean z) throws IOException, InterruptedException {
        Properties props = super.brokerProps(j, z);
        props.put("broker.session.uuid", this.brokerSessionUUID);

        // With proxy protocol the JAAS entry carries no validation-mode option; otherwise the
        // configured validation mode is passed as a JAAS option.
        String validationOption;
        if (!this.useProxyProtocol) {
            validationOption = "traffic_network_id_validation_mode=\"" + this.validationMode.name() + "\"";
        } else {
            validationOption = "";
            props.put("listener.name.external.confluent.proxy.protocol.version", ProxyProtocol.V2.name());
            props.put("listener.name.external.confluent.proxy.protocol.fallback.enabled", "true");
        }

        if (this.saslMechanism != SaslMechanism.OAUTHBEARER) {
            String plainJaasConfig = String.format("io.confluent.kafka.server.plugins.auth.TopicBasedLoginModule required %s;", validationOption);
            props.put("listener.name.external.plain.sasl.jaas.config", plainJaasConfig);
        } else {
            // OAuth defaults go in first so the listener-specific entries below win on conflict.
            props.putAll(IntegrationTestHarness.defaultOAuthBrokerProps());
            try {
                String oauthJaasConfig = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required "
                        + validationOption
                        + " publicKeyPath=\"" + this.jwsContainer.getPublicKeyFile().toPath()
                        + "\"authenticator.jwt.config.url=\"" + JarResourceLoader.loadFileFromResourceWithClassLoader(getClass(), "AuthConfigEnhanced.yaml").toPath()
                        + "\";";
                props.put("listener.name.external.oauthbearer.sasl.jaas.config", oauthJaasConfig);
                props.put("listener.name.external.oauthbearer.sasl.server.callback.handler.class", EnhancedOAuthBearerValidatorCallbackHandler.class.getName());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        props.put("confluent.traffic.cdc.network.id.routes.enable", Boolean.valueOf(this.routesEnabled));
        props.put("confluent.traffic.cdc.network.id.routes.topic.name", "_confluent-network_id_routes");
        props.put("confluent.traffic.network.id", BROKER_NETWORK_ID);
        props.put("confluent.traffic.cdc.network.id.routes.periodic.start.task.ms", 100);
        return props;
    }

    /**
     * Serializes the given network ids, wrapped in a {@link TrafficNetworkIdAllowedRoutes}
     * whose second constructor argument is {@code null}, using the test's object mapper.
     *
     * @param list network ids to encode
     * @return the serialized routes value
     * @throws IOException if serialization fails
     */
    private String encodeValue(List<String> list) throws IOException {
        TrafficNetworkIdAllowedRoutes routes = new TrafficNetworkIdAllowedRoutes(list, (List) null);
        return this.objectMapper.writeValueAsString(routes);
    }

    /**
     * Publishes the given allow-list to the network-id routes topic and then, for each listed
     * network id, waits until the routes store of every broker session allows it.
     *
     * <p>Note that an empty list is published but nothing is awaited afterwards, because there
     * is no id to poll for.
     *
     * @param list network ids that should be allowed from now on
     * @throws IOException if the routes value cannot be encoded
     */
    private void loadAllowedNetworkIds(List<String> list) throws IOException {
        List<String> brokerSessionUuids = this.physicalCluster.brokerSessionUuids();
        EmbeddedKafkaCluster kafkaCluster = this.physicalCluster.kafkaCluster();
        // Consume one sequence id for this record.
        long sequenceId = this.baseSequenceId;
        this.baseSequenceId = sequenceId + 1;
        kafkaCluster.produceData("_confluent-network_id_routes", sequenceId, getAllowedNetworkIdsKey(), encodeValue(list));

        for (String networkId : list) {
            try {
                // NOTE(review): the decompiled source used the same name for both lambda
                // parameters; the store is presumably keyed by broker session UUID and asked
                // about the network id - confirm against TrafficNetworkIdRoutesStore.
                TestUtils.waitForCondition(
                        () -> brokerSessionUuids.stream().allMatch(sessionUuid ->
                                TrafficNetworkIdRoutesStore.getRoutes(sessionUuid) != null
                                        && TrafficNetworkIdRoutesStore.getRoutes(sessionUuid).allows(networkId)),
                        "Expected metadata to get consumed");
            } catch (InterruptedException e) {
                // Preserve the interrupt for callers further up before converting to unchecked.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    /** Returns the record key used for the allow-list: the broker network id followed by {@code :pkc-foo}. */
    private String getAllowedNetworkIdsKey() {
        // Compile-time constant; evaluates to "NE-Broker:pkc-foo".
        return BROKER_NETWORK_ID + ":pkc-foo";
    }

    /**
     * With STRICT validation and routes enabled but no routes topic created, authentication
     * with the otherwise allowed network id must fail.
     *
     * <p>The display-name pattern only references {@code {0}} because this test has a single
     * argument; the three-placeholder pattern would leave {@code {1}} and {@code {2}}
     * unresolved in the reported name.
     *
     * @param quorum quorum flavour; not read by the method body, it only appears in the
     *               parameterized display name
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testValidNetworkIdWithMissingRoutesTopic(String quorum) throws Exception {
        this.validationMode = TrafficNetworkIdValidationMode.STRICT;
        this.routesEnabled = true;
        // Skip creation of the routes topic so validation has nothing to read from.
        this.createNetworkIdTopic = false;
        startWithTopic();
        loadApiKeys();
        loadLKCMetadata();
        assertAuthFailure(this.allowedNetworkId);
    }

    /**
     * OAUTHBEARER variant of {@link #testValidNetworkIdWithMissingRoutesTopic(String)}.
     *
     * <p>The display-name pattern only references {@code {0}} because this test has a single
     * argument.
     *
     * @param quorum quorum flavour; forwarded to the delegate and shown in the display name
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testValidNetworkIdWithMissingRoutesTopicOAuth(String quorum) throws Exception {
        this.saslMechanism = SaslMechanism.OAUTHBEARER;
        testValidNetworkIdWithMissingRoutesTopic(quorum);
    }

    /**
     * Authenticates with the allowed network id under every combination of validation mode
     * and routes flag. Failure is expected only for STRICT validation with routes disabled.
     *
     * @param quorum             quorum flavour; not read by the method body
     * @param validationModeName validation mode, parsed via {@code TrafficNetworkIdValidationMode.fromString}
     * @param routesEnabled      whether network-id routes are enabled on the broker
     */
    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_NAMES)
    @CsvSource({"zk,strict,true", "zk,strict,false", "zk,none,true", "zk,none,false", "kraft,strict,true", "kraft,strict,false", "kraft,none,true", "kraft,none,false"})
    public void testValidNetworkId(String quorum, String validationModeName, boolean routesEnabled) throws Exception {
        this.validationMode = TrafficNetworkIdValidationMode.fromString(validationModeName);
        this.routesEnabled = routesEnabled;
        startWithTopic();
        loadApiKeys();
        loadLKCMetadata();
        if (routesEnabled) {
            loadAllowedNetworkIds(getAllowedNetworkIds());
        }

        boolean strictWithoutRoutes = this.validationMode == TrafficNetworkIdValidationMode.STRICT && !routesEnabled;
        if (strictWithoutRoutes) {
            assertAuthFailure(this.allowedNetworkId);
        } else {
            assertAuthSuccess(this.allowedNetworkId);
        }
    }

    /**
     * Proxy-protocol variant of {@link #testValidNetworkId(String, String, boolean)}.
     *
     * @param quorum             quorum flavour, forwarded to the delegate
     * @param validationModeName validation mode name, forwarded to the delegate
     * @param routesEnabled      routes flag, forwarded to the delegate
     */
    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_NAMES)
    @CsvSource({"zk,strict,true", "zk,strict,false", "zk,none,true", "zk,none,false", "kraft,strict,true", "kraft,strict,false", "kraft,none,true", "kraft,none,false"})
    public void testValidNetworkIdPP2(String quorum, String validationModeName, boolean routesEnabled) throws Exception {
        // Must be set before the delegate starts the cluster and creates clients.
        this.useProxyProtocol = true;
        testValidNetworkId(quorum, validationModeName, routesEnabled);
    }

    /**
     * OAUTHBEARER variant of {@link #testValidNetworkId(String, String, boolean)}; the
     * argument source covers the {@code zk} rows only.
     *
     * @param quorum             quorum flavour, forwarded to the delegate
     * @param validationModeName validation mode name, forwarded to the delegate
     * @param routesEnabled      routes flag, forwarded to the delegate
     */
    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_NAMES)
    @CsvSource({"zk,strict,true", "zk,strict,false", "zk,none,true", "zk,none,false"})
    public void testValidNetworkIdOAuth(String quorum, String validationModeName, boolean routesEnabled) throws Exception {
        this.saslMechanism = SaslMechanism.OAUTHBEARER;
        testValidNetworkId(quorum, validationModeName, routesEnabled);
    }

    /**
     * OAUTHBEARER plus proxy-protocol variant of
     * {@link #testValidNetworkId(String, String, boolean)}; the argument source covers the
     * {@code kraft} rows only.
     *
     * @param quorum             quorum flavour, forwarded to the delegate
     * @param validationModeName validation mode name, forwarded to the delegate
     * @param routesEnabled      routes flag, forwarded to the delegate
     */
    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_NAMES)
    @CsvSource({"kraft,strict,true", "kraft,strict,false", "kraft,none,true", "kraft,none,false"})
    public void testValidNetworkIdOAuthPP2(String quorum, String validationModeName, boolean routesEnabled) throws Exception {
        this.useProxyProtocol = true;
        this.saslMechanism = SaslMechanism.OAUTHBEARER;
        testValidNetworkId(quorum, validationModeName, routesEnabled);
    }

    /**
     * Proxy-protocol variant of {@link #testInvalidNetworkId(String, String, boolean)}.
     *
     * @param quorum             quorum flavour, forwarded to the delegate
     * @param validationModeName validation mode name, forwarded to the delegate
     * @param routesEnabled      routes flag, forwarded to the delegate
     */
    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_NAMES)
    @CsvSource({"zk,strict,true", "zk,strict,false", "zk,none,true", "zk,none,false", "kraft,strict,true", "kraft,strict,false", "kraft,none,true", "kraft,none,false"})
    public void testInvalidNetworkIdPP2(String quorum, String validationModeName, boolean routesEnabled) throws Exception {
        // Must be set before the delegate starts the cluster and creates clients.
        this.useProxyProtocol = true;
        testInvalidNetworkId(quorum, validationModeName, routesEnabled);
    }

    /**
     * Authenticates with a network id that is never put on the allow-list. Success is
     * expected only when validation is disabled (mode NONE); any other mode must reject it.
     *
     * @param quorum             quorum flavour; not read by the method body
     * @param validationModeName validation mode, parsed via {@code TrafficNetworkIdValidationMode.fromString}
     * @param routesEnabled      whether network-id routes are enabled on the broker
     */
    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_NAMES)
    @CsvSource({"zk,strict,true", "zk,strict,false", "zk,none,true", "zk,none,false", "kraft,strict,true", "kraft,strict,false", "kraft,none,true", "kraft,none,false"})
    public void testInvalidNetworkId(String quorum, String validationModeName, boolean routesEnabled) throws Exception {
        this.validationMode = TrafficNetworkIdValidationMode.fromString(validationModeName);
        this.routesEnabled = routesEnabled;
        startWithTopic();
        loadApiKeys();
        loadLKCMetadata();
        if (routesEnabled) {
            loadAllowedNetworkIds(getAllowedNetworkIds());
        }

        String unknownNetworkId = "InvalidNetworkIdFoo";
        if (this.validationMode != TrafficNetworkIdValidationMode.NONE) {
            assertAuthFailure(unknownNetworkId);
        } else {
            assertAuthSuccess(unknownNetworkId);
        }
    }

    /**
     * Toggles the published allow-list between one that contains the client network id and
     * ones that do not, checking after each change that authentication succeeds or fails
     * accordingly. A fresh sample topic is used for every round after the first.
     *
     * @param quorum quorum flavour; not read by the method body
     */
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    @ValueSource(strings = {"zk", "kraft"})
    public void testRouteChanges(String quorum) throws Exception {
        // allowedNetworkId is not modified in this test, so the allow-list can be built once.
        List<String> allowedNetworkIds = getAllowedNetworkIds();

        startWithTopic();
        loadApiKeys();
        loadLKCMetadata();

        // Round 1: client id is on the list.
        loadAllowedNetworkIds(allowedNetworkIds);
        assertAuthSuccess(this.allowedNetworkId);

        // Round 2: list replaced by ids that exclude the client.
        resetAuditEvents();
        this.sampleTopics = Collections.singletonList(new NewTopic("abcd1", 3, (short) 1));
        loadAllowedNetworkIds(Arrays.asList("nr1", "nr2"));
        assertAuthFailure(this.allowedNetworkId);

        // Round 3: client id is allowed again.
        resetAuditEvents();
        this.sampleTopics = Collections.singletonList(new NewTopic("abcd2", 3, (short) 1));
        loadAllowedNetworkIds(allowedNetworkIds);
        assertAuthSuccess(this.allowedNetworkId);

        // Round 4: empty list published.
        resetAuditEvents();
        this.sampleTopics = Collections.singletonList(new NewTopic("abcd3", 3, (short) 1));
        loadAllowedNetworkIds(Collections.emptyList());
        assertAuthFailure(this.allowedNetworkId);
    }

    /**
     * OAUTHBEARER variant of {@link #testRouteChangesShouldForceDisconnectClient(String)}.
     *
     * @param quorum quorum flavour, forwarded to the delegate
     */
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    @ValueSource(strings = {"zk", "kraft"})
    public void testRouteChangesShouldForceDisconnectClientOAuth(String quorum) throws Exception {
        this.saslMechanism = SaslMechanism.OAUTHBEARER;
        testRouteChangesShouldForceDisconnectClient(quorum);
    }

    /**
     * Verifies that a connected client is cut off once its network id is taken off the
     * allow-list, first by replacing the list with a different id and then, after switching
     * to a new client network id, by publishing an empty list.
     *
     * @param quorum quorum flavour; not read by the method body
     */
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    @ValueSource(strings = {"zk", "kraft"})
    public void testRouteChangesShouldForceDisconnectClient(String quorum) throws Exception {
        startWithTopic();
        loadApiKeys();
        loadLKCMetadata();

        // First pass: allow the default client id, then replace the list with "nr1".
        loadAllowedNetworkIds(getAllowedNetworkIds());
        List<String> replacementRoutes = Collections.singletonList("nr1");
        assertForceDisconnectOnDisallowNetwork(replacementRoutes);

        // Second pass: a new client id and a new sample topic, revoked via an empty list.
        this.allowedNetworkId = "NE-Client-new";
        loadAllowedNetworkIds(getAllowedNetworkIds());
        this.sampleTopics = Collections.singletonList(new NewTopic("abcd1", 3, (short) 1));
        List<String> noRoutes = Collections.emptyList();
        assertForceDisconnectOnDisallowNetwork(noRoutes);
    }

    /**
     * Connects with the currently allowed network id, creates the sample topics, then
     * publishes {@code list} as the new allow-list and waits until the same admin client's
     * requests fail with {@link SaslAuthenticationException}.
     *
     * <p>The admin client is managed by try-with-resources, replacing the decompiler-expanded
     * close logic (which contained dead branches and an {@code addSuppressed} call on a
     * reference that was always {@code null}).
     *
     * @param list allow-list to publish after the client has connected
     */
    private void assertForceDisconnectOnDisallowNetwork(List<String> list) throws InterruptedException, ExecutionException, IOException {
        SaslAuthenticateRequestCallback callback = new SaslAuthenticateRequestCallback(this.allowedNetworkId);
        try (AdminClient adminClient = createAuthAdminClient(callback)) {
            adminClient.createTopics(this.sampleTopics).all().get();
            List<String> topicNames = this.sampleTopics.stream().map(NewTopic::name).collect(Collectors.toList());
            TestUtils.waitForCondition(
                    () -> adminClient.listTopics().names().get().containsAll(topicNames),
                    "Expected Topics should get created");
            // The callback must have seen at least one SASL authenticate request.
            Assertions.assertTrue(callback.callCount.get() > 0);

            loadAllowedNetworkIds(list);
            // Resolves to this class's two-argument assertFutureError, which returns false
            // while the call still succeeds and true once it fails with the expected type.
            TestUtils.waitForCondition(
                    () -> assertFutureError(adminClient.listTopics().names(), SaslAuthenticationException.class),
                    "Client should be disconnected on disallow network");
        }
    }

    /**
     * Waits for {@code future} and reports whether it failed.
     *
     * <p>Decompiler note: the access modifier was changed from {@code private}.
     *
     * @param future future to wait on
     * @param cls    exact class the failure cause must have
     * @return {@code false} if the future completed normally; {@code true} if it failed and
     *         the cause's class equals {@code cls} (any other cause class fails the assertion)
     * @throws InterruptedException if interrupted while waiting
     */
    public static boolean assertFutureError(Future<?> future, Class<? extends Throwable> cls) throws InterruptedException {
        Throwable cause;
        try {
            future.get();
            // Completed without error: let the caller poll again.
            return false;
        } catch (ExecutionException executionFailure) {
            cause = executionFailure.getCause();
        }
        String mismatchMessage = "Expected a " + cls.getSimpleName() + " exception, but got " + cause.getClass().getSimpleName();
        Assertions.assertEquals(cls, cause.getClass(), mismatchMessage);
        return true;
    }

    /**
     * Connects with the given network id, creates the sample topics and checks that the last
     * audit authentication event reports a successful login for principal {@code User:1}
     * carrying that network id.
     *
     * <p>The admin client is managed by try-with-resources, replacing the decompiler-expanded
     * close logic (which contained dead branches and an {@code addSuppressed} call on a
     * reference that was always {@code null}).
     *
     * @param networkId network id the client sends in its SASL authenticate requests
     */
    private void assertAuthSuccess(String networkId) throws Exception {
        SaslAuthenticateRequestCallback callback = new SaslAuthenticateRequestCallback(networkId);
        try (AdminClient adminClient = createAuthAdminClient(callback)) {
            adminClient.createTopics(this.sampleTopics).all().get();
            List<String> topicNames = this.sampleTopics.stream().map(NewTopic::name).collect(Collectors.toList());
            TestUtils.retryOnExceptionWithTimeout(() ->
                    Assertions.assertTrue(adminClient.listTopics().names().get().containsAll(topicNames)));
            // The callback must have seen at least one SASL authenticate request.
            Assertions.assertTrue(callback.callCount.get() > 0);

            ConfluentAuthenticationEvent event = getLastAuthenticationEvent();
            Assertions.assertTrue(event.principal().isPresent());
            Assertions.assertEquals("User", ((KafkaPrincipal) event.principal().get()).getPrincipalType());
            Assertions.assertEquals("1", ((KafkaPrincipal) event.principal().get()).getName());
            Assertions.assertEquals(AuditEventStatus.SUCCESS, event.status());
            assertEventNetworkId(networkId, event);
            Assertions.assertEquals("1", event.authenticationContext().server().getAuthorizationID());
        }
    }

    /**
     * Asserts that the server side of the event's authentication context reports the expected
     * network id.
     *
     * <p>The original selected between two identical expressions based on the SASL mechanism;
     * the redundant conditional has been collapsed.
     *
     * @param expectedNetworkId          network id the client sent
     * @param confluentAuthenticationEvent audit event to inspect
     */
    private void assertEventNetworkId(String expectedNetworkId, ConfluentAuthenticationEvent confluentAuthenticationEvent) {
        Assertions.assertEquals(
                Optional.of(expectedNetworkId),
                confluentAuthenticationEvent.authenticationContext().server().networkId());
    }

    /**
     * Builds the client-side proxy-protocol V2 engine factory, or returns {@code null} when
     * the test does not use proxy protocol.
     *
     * <p>The traffic-type entry depends on the validation mode: STRICT uses
     * {@code pp2ValidateTrafficHeader}, every other mode uses {@code pp2NoValidateTrafficHeader}.
     * The config map is typed instead of raw to avoid unchecked operations.
     *
     * @return the factory, or {@code null} if proxy protocol is disabled
     */
    private ProxyProtocolEngineFactory proxyProtocolEngineFactory() {
        if (!this.useProxyProtocol) {
            return null;
        }
        Map<String, Object> clientConfigs = new HashMap<>();
        clientConfigs.put("confluent.proxy.protocol.client.address", "1.1.1.1");
        clientConfigs.put("confluent.proxy.protocol.client.port", 1111);
        clientConfigs.put("confluent.proxy.protocol.client.mode", ConfluentConfigs.PROXY_PROTOCOL_CLIENT_MODE_DEFAULT);
        clientConfigs.putAll(this.validationMode == TrafficNetworkIdValidationMode.STRICT
                ? pp2ValidateTrafficHeader
                : pp2NoValidateTrafficHeader);
        return new ProxyProtocolEngineFactory(ProxyProtocol.V2, clientConfigs, Mode.CLIENT, new LogContext());
    }

    /**
     * Connects with the given network id and checks that topic creation fails with
     * {@link SaslAuthenticationException}, and that the last audit authentication event is
     * UNAUTHENTICATED, has no principal, carries the network id and an error message
     * containing the expected text.
     *
     * <p>The expected text is the "isn't allowed to communicate" message when routes are
     * enabled and the routes topic was created, and the "internal error" message otherwise.
     *
     * <p>The admin client is managed by try-with-resources, replacing the decompiler-expanded
     * close logic (which contained dead branches and an {@code addSuppressed} call on a
     * reference that was always {@code null}).
     *
     * @param networkId network id the client sends in its SASL authenticate requests
     */
    private void assertAuthFailure(String networkId) throws Exception {
        String expectedMessage = (this.routesEnabled && this.createNetworkIdTopic)
                ? "NetworkId: " + networkId + " isn't allowed to communicate"
                : "NetworkId: " + networkId + " validation failed due to an internal error";
        SaslAuthenticateRequestCallback callback = new SaslAuthenticateRequestCallback(networkId);
        try (AdminClient adminClient = createAuthAdminClient(callback)) {
            TestUtils.assertFutureError(adminClient.createTopics(this.sampleTopics).all(), SaslAuthenticationException.class);
            // The callback must have seen at least one SASL authenticate request.
            Assertions.assertTrue(callback.callCount.get() > 0);

            ConfluentAuthenticationEvent event = getLastAuthenticationEvent();
            Assertions.assertFalse(event.principal().isPresent());
            Assertions.assertEquals(AuditEventStatus.UNAUTHENTICATED, event.status());
            Assertions.assertTrue(event.authenticationException().isPresent());
            assertEventNetworkId(networkId, event);
            AuthenticationErrorInfo errorInfo = ((AuthenticationException) event.authenticationException().get()).errorInfo();
            // The actual message is passed as the failure text to ease diagnosis.
            Assertions.assertTrue(errorInfo.errorMessage().contains(expectedMessage), errorInfo.errorMessage());
        }
    }

    /**
     * Creates an admin client for the configured SASL mechanism, wired with the given request
     * callback and the proxy-protocol engine factory (which may be {@code null}).
     *
     * @param saslAuthenticateRequestCallback callback invoked for the client's requests
     * @return an OAuth admin client for OAUTHBEARER, otherwise a plain-auth admin client
     */
    private AdminClient createAuthAdminClient(SaslAuthenticateRequestCallback saslAuthenticateRequestCallback) {
        if (this.saslMechanism != SaslMechanism.OAUTHBEARER) {
            return this.testHarness.createPlainAuthAdminClient(
                    IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"),
                    new Properties(),
                    saslAuthenticateRequestCallback,
                    proxyProtocolEngineFactory());
        }
        return this.testHarness.createOAuthAdminClient(
                IntegrationTestHarness.clientOAuthJaasConfig(this.jwsContainer.getJwsToken(), this.logicalClusterId),
                this.adminProperties,
                saslAuthenticateRequestCallback,
                proxyProtocolEngineFactory());
    }
}
