package org.apache.kafka.common.security.authenticator;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import javax.security.sasl.SaslException;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.message.ApiVersionsRequestData;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.NetworkTestUtils;
import org.apache.kafka.common.network.NioEchoServer;
import org.apache.kafka.common.network.SaslChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.requests.SaslAuthenticateRequest;
import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.requests.SaslHandshakeResponse;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.Login;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.AbstractLogin;
import org.apache.kafka.common.security.authenticator.TestDigestLoginModule;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerConfigException;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerIllegalTokenException;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredJws;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
import org.apache.kafka.common.security.scram.internals.ScramFormatter;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.security.token.delegation.TokenInformation;
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.class */
public class SaslAuthenticatorTest {
    private static final long CONNECTIONS_MAX_REAUTH_MS_VALUE = 100;
    private static final int BUFFER_SIZE = 4096;
    private static Time time = Time.SYSTEM;
    private NioEchoServer server;
    private Selector selector;
    private ChannelBuilder channelBuilder;
    private CertStores serverCertStores;
    private CertStores clientCertStores;
    private Map<String, Object> saslClientConfigs;
    private Map<String, Object> saslServerConfigs;
    private CredentialCache credentialCache;
    private int nextCorrelationId;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest$4, reason: invalid class name */
    /* loaded from: input_file:org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest$4.class */
    public static /* synthetic */ class AnonymousClass4 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$common$security$auth$SecurityProtocol = new int[SecurityProtocol.values().length];

        static {
            try {
                $SwitchMap$org$apache$kafka$common$security$auth$SecurityProtocol[SecurityProtocol.SASL_PLAINTEXT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$security$auth$SecurityProtocol[SecurityProtocol.SASL_SSL.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest$AlternateLoginCallbackHandler.class */
    public static class AlternateLoginCallbackHandler implements AuthenticateCallbackHandler {
        private static final String QUOTE = "\"";
        private static final OAuthBearerUnsecuredLoginCallbackHandler DELEGATE = new OAuthBearerUnsecuredLoginCallbackHandler();
        private static int numInvocations = 0;

        public void handle(Callback[] callbackArr) throws IOException, UnsupportedCallbackException {
            OAuthBearerTokenCallback oAuthBearerTokenCallback;
            OAuthBearerToken oAuthBearerToken;
            DELEGATE.handle(callbackArr);
            if (callbackArr.length > 0) {
                for (Callback callback : callbackArr) {
                    if ((callback instanceof OAuthBearerTokenCallback) && (oAuthBearerToken = (oAuthBearerTokenCallback = (OAuthBearerTokenCallback) callback).token()) != null) {
                        StringBuilder append = new StringBuilder().append(oAuthBearerToken.principalName());
                        int i = numInvocations + 1;
                        numInvocations = i;
                        String sb = append.append(String.valueOf(i)).toString();
                        String str = "{" + claimOrHeaderJsonText("alg", "none") + "}";
                        try {
                            String format = String.format("{%s,%s,%s}", expClaimText(Long.parseLong("1")), claimOrHeaderJsonText("iat", Double.valueOf(SaslAuthenticatorTest.time.milliseconds() / 1000.0d)), claimOrHeaderJsonText("sub", sb));
                            try {
                                Base64.Encoder withoutPadding = Base64.getUrlEncoder().withoutPadding();
                                oAuthBearerTokenCallback.token(new OAuthBearerUnsecuredJws(String.format("%s.%s.", withoutPadding.encodeToString(str.getBytes(StandardCharsets.UTF_8)), withoutPadding.encodeToString(format.getBytes(StandardCharsets.UTF_8))), "sub", "scope"));
                            } catch (OAuthBearerIllegalTokenException e) {
                                throw new OAuthBearerConfigException(e.getMessage(), e);
                            }
                        } catch (NumberFormatException e2) {
                            throw new OAuthBearerConfigException(e2.getMessage());
                        }
                    }
                }
            }
        }

        private static String claimOrHeaderJsonText(String str, String str2) {
            return QUOTE + str + QUOTE + ":" + QUOTE + str2 + QUOTE;
        }

        private static String claimOrHeaderJsonText(String str, Number number) {
            return QUOTE + str + QUOTE + ":" + number;
        }

        private static String expClaimText(long j) {
            return claimOrHeaderJsonText("exp", Double.valueOf((SaslAuthenticatorTest.time.milliseconds() / 1000.0d) + j));
        }

        public void configure(Map<String, ?> map, String str, List<AppConfigurationEntry> list) {
            DELEGATE.configure(map, str, list);
        }

        public void close() {
            DELEGATE.close();
        }
    }

    /* loaded from: input_file:org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest$AlternateSaslChannelBuilder.class */
    private static class AlternateSaslChannelBuilder extends SaslChannelBuilder {
        private int numInvocations;

        public AlternateSaslChannelBuilder(Mode mode, Map<String, JaasContext> map, SecurityProtocol securityProtocol, ListenerName listenerName, boolean z, String str, boolean z2, CredentialCache credentialCache, DelegationTokenCache delegationTokenCache, Time time) {
            super(mode, map, securityProtocol, listenerName, z, str, z2, credentialCache, delegationTokenCache, time);
            this.numInvocations = 0;
        }

        protected SaslClientAuthenticator buildClientAuthenticator(Map<String, ?> map, AuthenticateCallbackHandler authenticateCallbackHandler, String str, String str2, String str3, TransportLayer transportLayer, Subject subject) {
            int i = this.numInvocations + 1;
            this.numInvocations = i;
            return i == 1 ? new SaslClientAuthenticator(map, authenticateCallbackHandler, str, subject, str3, str2, "DIGEST-MD5", true, transportLayer, SaslAuthenticatorTest.time) : new SaslClientAuthenticator(map, authenticateCallbackHandler, str, subject, str3, str2, "PLAIN", true, transportLayer, SaslAuthenticatorTest.time) { // from class: org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.AlternateSaslChannelBuilder.1
                protected SaslHandshakeRequest createSaslHandshakeRequest(short s) {
                    return new SaslHandshakeRequest.Builder(new SaslHandshakeRequestData().setMechanism("PLAIN")).build(s);
                }
            };
        }
    }

    /* loaded from: input_file:org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest$InvalidScramServerCallbackHandler.class */
    public static class InvalidScramServerCallbackHandler implements AuthenticateCallbackHandler {
        static volatile IOException sensitiveException;
        static volatile SaslAuthenticationException clientFriendlyException;

        public void configure(Map<String, ?> map, String str, List<AppConfigurationEntry> list) {
        }

        public void handle(Callback[] callbackArr) throws IOException {
            if (sensitiveException != null) {
                throw sensitiveException;
            }
            if (clientFriendlyException != null) {
                throw clientFriendlyException;
            }
        }

        public void close() {
            reset();
        }

        static void reset() {
            sensitiveException = null;
            clientFriendlyException = null;
        }
    }

    /* loaded from: input_file:org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest$TestClientCallbackHandler.class */
    public static class TestClientCallbackHandler implements AuthenticateCallbackHandler {
        static final String USERNAME = "TestClientCallbackHandler-user";
        static final String PASSWORD = "TestClientCallbackHandler-password";
        private volatile boolean configured;

        public void configure(Map<String, ?> map, String str, List<AppConfigurationEntry> list) {
            if (this.configured) {
                throw new IllegalStateException("Client callback handler configured twice");
            }
            this.configured = true;
        }

        public void handle(Callback[] callbackArr) throws UnsupportedCallbackException {
            if (!this.configured) {
                throw new IllegalStateException("Client callback handler not configured");
            }
            for (Callback callback : callbackArr) {
                if (callback instanceof NameCallback) {
                    ((NameCallback) callback).setName(USERNAME);
                } else {
                    if (!(callback instanceof PasswordCallback)) {
                        throw new UnsupportedCallbackException(callback);
                    }
                    ((PasswordCallback) callback).setPassword(PASSWORD.toCharArray());
                }
            }
        }

        public void close() {
        }
    }

    /* loaded from: input_file:org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest$TestLogin.class */
    public static class TestLogin implements Login {
        static AtomicInteger loginCount = new AtomicInteger();
        private String contextName;
        private Configuration configuration;
        private Subject subject;

        public void configure(Map<String, ?> map, String str, Configuration configuration, AuthenticateCallbackHandler authenticateCallbackHandler) {
            Assert.assertEquals(1L, configuration.getAppConfigurationEntry(str).length);
            this.contextName = str;
            this.configuration = configuration;
        }

        public LoginContext login() throws LoginException {
            LoginContext loginContext = new LoginContext(this.contextName, (Subject) null, new AbstractLogin.DefaultLoginCallbackHandler(), this.configuration);
            loginContext.login();
            this.subject = loginContext.getSubject();
            this.subject.getPublicCredentials().clear();
            this.subject.getPrivateCredentials().clear();
            this.subject.getPublicCredentials().add("myuser");
            this.subject.getPrivateCredentials().add("mypassword");
            loginCount.incrementAndGet();
            return loginContext;
        }

        public Subject subject() {
            return this.subject;
        }

        public String serviceName() {
            return "kafka";
        }

        public void close() {
        }
    }

    /* loaded from: input_file:org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest$TestLoginCallbackHandler.class */
    public static class TestLoginCallbackHandler implements AuthenticateCallbackHandler {
        private volatile boolean configured = false;

        public void configure(Map<String, ?> map, String str, List<AppConfigurationEntry> list) {
            if (this.configured) {
                throw new IllegalStateException("Login callback handler configured twice");
            }
            this.configured = true;
        }

        public void handle(Callback[] callbackArr) {
            if (!this.configured) {
                throw new IllegalStateException("Login callback handler not configured");
            }
            for (Callback callback : callbackArr) {
                if (callback instanceof NameCallback) {
                    ((NameCallback) callback).setName("myuser");
                } else if (callback instanceof PasswordCallback) {
                    ((PasswordCallback) callback).setPassword("mypassword".toCharArray());
                }
            }
        }

        public void close() {
        }
    }

    /* loaded from: input_file:org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest$TestPlainLoginModule.class */
    public static final class TestPlainLoginModule extends PlainLoginModule {
        public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> map, Map<String, ?> map2) {
            try {
                Callback nameCallback = new NameCallback("name:");
                PasswordCallback passwordCallback = new PasswordCallback("password:", false);
                callbackHandler.handle(new Callback[]{nameCallback, passwordCallback});
                subject.getPublicCredentials().add(nameCallback.getName());
                subject.getPrivateCredentials().add(new String(passwordCallback.getPassword()));
            } catch (Exception e) {
                throw new SaslAuthenticationException("Login initialization failed", e);
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest$TestServerCallbackHandler.class */
    public static class TestServerCallbackHandler extends PlainServerCallbackHandler {
        static final String USERNAME = "TestServerCallbackHandler-user";
        static final String PASSWORD = "TestServerCallbackHandler-password";
        private volatile boolean configured;

        public void configure(Map<String, ?> map, String str, List<AppConfigurationEntry> list) {
            if (this.configured) {
                throw new IllegalStateException("Server callback handler configured twice");
            }
            this.configured = true;
            super.configure(map, str, list);
        }

        protected boolean authenticate(String str, char[] cArr) {
            if (this.configured) {
                return USERNAME.equals(str) && new String(cArr).equals(PASSWORD);
            }
            throw new IllegalStateException("Server callback handler not configured");
        }
    }

    @Before
    public void setup() throws Exception {
        LoginManager.closeAll();
        time = Time.SYSTEM;
        this.serverCertStores = new CertStores(true, "localhost");
        this.clientCertStores = new CertStores(false, "localhost");
        this.saslServerConfigs = this.serverCertStores.getTrustingConfig(this.clientCertStores);
        this.saslClientConfigs = this.clientCertStores.getTrustingConfig(this.serverCertStores);
        this.credentialCache = new CredentialCache();
        TestLogin.loginCount.set(0);
    }

    @After
    public void teardown() throws Exception {
        if (this.server != null) {
            this.server.close();
        }
        if (this.selector != null) {
            this.selector.close();
        }
    }

    @Test
    public void testValidSaslPlainOverSsl() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = createEchoServer(securityProtocol);
        checkAuthenticationAndReauthentication(securityProtocol, "0");
    }

    @Test
    public void testValidSaslPlainOverPlaintext() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = createEchoServer(securityProtocol);
        checkAuthenticationAndReauthentication(securityProtocol, "0");
    }

    @Test
    public void testInvalidPasswordSaslPlain() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        configureMechanisms("PLAIN", Arrays.asList("PLAIN")).setClientOptions("PLAIN", "myuser", "invalidpassword");
        this.server = createEchoServer(securityProtocol);
        createAndCheckClientAuthenticationFailure(securityProtocol, "0", "PLAIN", "Authentication failed: Invalid username or password");
        this.server.verifyAuthenticationMetrics(0, 1);
        this.server.verifyReauthenticationMetrics(0, 0);
    }

    @Test
    public void testInvalidUsernameSaslPlain() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        configureMechanisms("PLAIN", Arrays.asList("PLAIN")).setClientOptions("PLAIN", "invaliduser", "mypassword");
        this.server = createEchoServer(securityProtocol);
        createAndCheckClientAuthenticationFailure(securityProtocol, "0", "PLAIN", "Authentication failed: Invalid username or password");
        this.server.verifyAuthenticationMetrics(0, 1);
        this.server.verifyReauthenticationMetrics(0, 0);
    }

    @Test
    public void testMissingUsernameSaslPlain() throws Exception {
        configureMechanisms("PLAIN", Arrays.asList("PLAIN")).setClientOptions("PLAIN", null, "mypassword");
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.server = createEchoServer(securityProtocol);
        createSelector(securityProtocol, this.saslClientConfigs);
        try {
            this.selector.connect("0", new InetSocketAddress("127.0.0.1", this.server.port()), BUFFER_SIZE, BUFFER_SIZE);
            Assert.fail("SASL/PLAIN channel created without username");
        } catch (IOException e) {
            Assert.assertTrue("Channels not closed", this.selector.channels().isEmpty());
            Iterator it = this.selector.keys().iterator();
            while (it.hasNext()) {
                Assert.assertFalse("Key not cancelled", ((SelectionKey) it.next()).isValid());
            }
        }
    }

    @Test
    public void testMissingPasswordSaslPlain() throws Exception {
        configureMechanisms("PLAIN", Arrays.asList("PLAIN")).setClientOptions("PLAIN", "myuser", null);
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.server = createEchoServer(securityProtocol);
        createSelector(securityProtocol, this.saslClientConfigs);
        try {
            this.selector.connect("0", new InetSocketAddress("127.0.0.1", this.server.port()), BUFFER_SIZE, BUFFER_SIZE);
            Assert.fail("SASL/PLAIN channel created without password");
        } catch (IOException e) {
        }
    }

    @Test
    public void testClientExceptionDoesNotContainSensitiveData() throws Exception {
        InvalidScramServerCallbackHandler.reset();
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        configureMechanisms("SCRAM-SHA-256", Collections.singletonList("SCRAM-SHA-256")).createOrUpdateEntry("KafkaServer", PlainLoginModule.class.getName(), new HashMap());
        this.saslServerConfigs.put(ListenerName.forSecurityProtocol(securityProtocol).saslMechanismConfigPrefix("SCRAM-SHA-256") + "sasl.server.callback.handler.class", InvalidScramServerCallbackHandler.class.getName());
        this.server = createEchoServer(securityProtocol);
        try {
            InvalidScramServerCallbackHandler.sensitiveException = new IOException("Could not connect to password database locahost:8000");
            createAndCheckClientAuthenticationFailure(securityProtocol, "1", "SCRAM-SHA-256", null);
            InvalidScramServerCallbackHandler.sensitiveException = new SaslException("Password for existing user TestServerCallbackHandler-user is invalid");
            createAndCheckClientAuthenticationFailure(securityProtocol, "1", "SCRAM-SHA-256", null);
            InvalidScramServerCallbackHandler.reset();
            InvalidScramServerCallbackHandler.clientFriendlyException = new SaslAuthenticationException("Credential verification failed");
            createAndCheckClientAuthenticationFailure(securityProtocol, "1", "SCRAM-SHA-256", InvalidScramServerCallbackHandler.clientFriendlyException.getMessage());
        } finally {
            InvalidScramServerCallbackHandler.reset();
        }
    }

    @Test
    public void testMechanismPluggability() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        configureMechanisms("DIGEST-MD5", Arrays.asList("DIGEST-MD5"));
        configureDigestMd5ServerCallback(securityProtocol);
        this.server = createEchoServer(securityProtocol);
        createAndCheckClientConnection(securityProtocol, "0");
    }

    @Test
    public void testMultipleServerMechanisms() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        configureMechanisms("DIGEST-MD5", Arrays.asList("DIGEST-MD5", "PLAIN", "SCRAM-SHA-256"));
        configureDigestMd5ServerCallback(securityProtocol);
        this.server = createEchoServer(securityProtocol);
        updateScramCredentialCache("myuser", "mypassword");
        this.saslClientConfigs.put("sasl.mechanism", "PLAIN");
        createAndCheckClientConnection(securityProtocol, "1");
        this.server.verifyAuthenticationMetrics(1, 0);
        Selector selector = null;
        Selector selector2 = null;
        try {
            this.saslClientConfigs.put("sasl.mechanism", "DIGEST-MD5");
            createSelector(securityProtocol, this.saslClientConfigs);
            selector = this.selector;
            this.selector.connect("2", new InetSocketAddress("127.0.0.1", this.server.port()), BUFFER_SIZE, BUFFER_SIZE);
            NetworkTestUtils.checkClientConnection(this.selector, "2", 100, 10);
            this.selector = null;
            this.server.verifyAuthenticationMetrics(2, 0);
            this.saslClientConfigs.put("sasl.mechanism", "SCRAM-SHA-256");
            createSelector(securityProtocol, this.saslClientConfigs);
            selector2 = this.selector;
            this.selector.connect("3", new InetSocketAddress("127.0.0.1", this.server.port()), BUFFER_SIZE, BUFFER_SIZE);
            NetworkTestUtils.checkClientConnection(this.selector, "3", 100, 10);
            this.server.verifyAuthenticationMetrics(3, 0);
            delay(110L);
            this.server.verifyReauthenticationMetrics(0, 0);
            NetworkTestUtils.checkClientConnection(selector, "2", 100, 10);
            this.server.verifyReauthenticationMetrics(1, 0);
            NetworkTestUtils.checkClientConnection(selector2, "3", 100, 10);
            this.server.verifyReauthenticationMetrics(2, 0);
            if (selector != null) {
                selector.close();
            }
            if (selector2 != null) {
                selector2.close();
            }
        } catch (Throwable th) {
            if (selector != null) {
                selector.close();
            }
            if (selector2 != null) {
                selector2.close();
            }
            throw th;
        }
    }

    @Test
    public void testValidSaslScramSha256() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        configureMechanisms("SCRAM-SHA-256", Arrays.asList("SCRAM-SHA-256"));
        this.server = createEchoServer(securityProtocol);
        updateScramCredentialCache("myuser", "mypassword");
        checkAuthenticationAndReauthentication(securityProtocol, "0");
    }

    @Test
    public void testValidSaslScramMechanisms() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        configureMechanisms("SCRAM-SHA-256", new ArrayList(ScramMechanism.mechanismNames()));
        this.server = createEchoServer(securityProtocol);
        updateScramCredentialCache("myuser", "mypassword");
        for (String str : ScramMechanism.mechanismNames()) {
            this.saslClientConfigs.put("sasl.mechanism", str);
            createAndCheckClientConnection(securityProtocol, "node-" + str);
        }
    }

    @Test
    public void testInvalidPasswordSaslScram() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        TestJaasConfig configureMechanisms = configureMechanisms("SCRAM-SHA-256", Arrays.asList("SCRAM-SHA-256"));
        HashMap hashMap = new HashMap();
        hashMap.put("username", "myuser");
        hashMap.put("password", "invalidpassword");
        configureMechanisms.createOrUpdateEntry("KafkaClient", ScramLoginModule.class.getName(), hashMap);
        this.server = createEchoServer(securityProtocol);
        updateScramCredentialCache("myuser", "mypassword");
        createAndCheckClientAuthenticationFailure(securityProtocol, "0", "SCRAM-SHA-256", null);
        this.server.verifyAuthenticationMetrics(0, 1);
        this.server.verifyReauthenticationMetrics(0, 0);
    }

    @Test
    public void testUnknownUserSaslScram() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        TestJaasConfig configureMechanisms = configureMechanisms("SCRAM-SHA-256", Arrays.asList("SCRAM-SHA-256"));
        HashMap hashMap = new HashMap();
        hashMap.put("username", "unknownUser");
        hashMap.put("password", "mypassword");
        configureMechanisms.createOrUpdateEntry("KafkaClient", ScramLoginModule.class.getName(), hashMap);
        this.server = createEchoServer(securityProtocol);
        updateScramCredentialCache("myuser", "mypassword");
        createAndCheckClientAuthenticationFailure(securityProtocol, "0", "SCRAM-SHA-256", null);
        this.server.verifyAuthenticationMetrics(0, 1);
        this.server.verifyReauthenticationMetrics(0, 0);
    }

    @Test
    public void testUserCredentialsUnavailableForScramMechanism() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        configureMechanisms("SCRAM-SHA-256", new ArrayList(ScramMechanism.mechanismNames()));
        this.server = createEchoServer(securityProtocol);
        updateScramCredentialCache("myuser", "mypassword");
        this.server.credentialCache().cache(ScramMechanism.SCRAM_SHA_256.mechanismName(), ScramCredential.class).remove("myuser");
        this.saslClientConfigs.put("sasl.mechanism", "SCRAM-SHA-256");
        createAndCheckClientAuthenticationFailure(securityProtocol, "1", "SCRAM-SHA-256", null);
        this.server.verifyAuthenticationMetrics(0, 1);
        this.saslClientConfigs.put("sasl.mechanism", "SCRAM-SHA-512");
        createAndCheckClientConnection(securityProtocol, "2");
        this.server.verifyAuthenticationMetrics(1, 1);
        this.server.verifyReauthenticationMetrics(0, 0);
    }

    @Test
    public void testScramUsernameWithSpecialCharacters() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        String str = "special user= test,scram-password";
        TestJaasConfig configureMechanisms = configureMechanisms("SCRAM-SHA-256", Arrays.asList("SCRAM-SHA-256"));
        HashMap hashMap = new HashMap();
        hashMap.put("username", "special user= test,scram");
        hashMap.put("password", str);
        configureMechanisms.createOrUpdateEntry("KafkaClient", ScramLoginModule.class.getName(), hashMap);
        this.server = createEchoServer(securityProtocol);
        updateScramCredentialCache("special user= test,scram", str);
        createAndCheckClientConnection(securityProtocol, "0");
    }

    @Test
    public void testTokenAuthenticationOverSaslScram() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        TestJaasConfig configureMechanisms = configureMechanisms("SCRAM-SHA-256", Arrays.asList("SCRAM-SHA-256"));
        HashMap hashMap = new HashMap();
        hashMap.put("username", "token1");
        hashMap.put("password", "abcdefghijkl");
        hashMap.put("tokenauth", "true");
        configureMechanisms.createOrUpdateEntry("KafkaClient", ScramLoginModule.class.getName(), hashMap);
        this.server = createEchoServer(securityProtocol);
        createAndCheckClientConnectionFailure(securityProtocol, "0");
        this.server.verifyAuthenticationMetrics(0, 1);
        this.server.tokenCache().addToken("token1", new TokenInformation("token1", SecurityUtils.parseKafkaPrincipal("User:Owner"), Collections.singleton(SecurityUtils.parseKafkaPrincipal("User:Renewer1")), System.currentTimeMillis(), System.currentTimeMillis(), System.currentTimeMillis()));
        createAndCheckClientConnectionFailure(securityProtocol, "0");
        this.server.verifyAuthenticationMetrics(0, 2);
        updateTokenCredentialCache("token1", "abcdefghijkl");
        createAndCheckClientConnection(securityProtocol, "0");
        this.server.verifyAuthenticationMetrics(1, 2);
        this.server.verifyReauthenticationMetrics(0, 0);
    }

    @Test
    public void testTokenReauthenticationOverSaslScram() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        TestJaasConfig configureMechanisms = configureMechanisms("SCRAM-SHA-256", Arrays.asList("SCRAM-SHA-256"));
        HashMap hashMap = new HashMap();
        hashMap.put("username", "token1");
        hashMap.put("password", "abcdefghijkl");
        hashMap.put("tokenauth", "true");
        configureMechanisms.createOrUpdateEntry("KafkaClient", ScramLoginModule.class.getName(), hashMap);
        this.saslServerConfigs.put("connections.max.reauth.ms", Long.MAX_VALUE);
        final Function function = num -> {
            return Long.valueOf(10 * num.intValue() * 100);
        };
        this.server = createEchoServer(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol, new DelegationTokenCache(ScramMechanism.mechanismNames()) { // from class: org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.1
            int callNum = 0;

            public TokenInformation token(String str) {
                TokenInformation tokenInformation = super.token(str);
                long currentTimeMillis = System.currentTimeMillis();
                Function function2 = function;
                int i = this.callNum + 1;
                this.callNum = i;
                long longValue = currentTimeMillis + ((Long) function2.apply(Integer.valueOf(i))).longValue();
                return new TokenInformation(tokenInformation.tokenId(), tokenInformation.owner(), tokenInformation.renewers(), tokenInformation.issueTimestamp(), longValue, longValue);
            }
        });
        this.server.tokenCache().addToken("token1", new TokenInformation("token1", SecurityUtils.parseKafkaPrincipal("User:Owner"), Collections.singleton(SecurityUtils.parseKafkaPrincipal("User:Renewer1")), System.currentTimeMillis(), System.currentTimeMillis(), System.currentTimeMillis()));
        updateTokenCredentialCache("token1", "abcdefghijkl");
        createClientConnection(securityProtocol, "0");
        checkClientConnection("0");
        this.server.verifyAuthenticationMetrics(1, 0);
        this.server.verifyReauthenticationMetrics(0, 0);
        delay(((Long) function.apply(1)).longValue());
        checkClientConnection("0");
        this.server.verifyReauthenticationMetrics(1, 0);
    }

    @Test
    public void testUnauthenticatedApiVersionsRequestOverPlaintextHandshakeVersion0() throws Exception {
        testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_PLAINTEXT, (short) 0);
    }

    @Test
    public void testUnauthenticatedApiVersionsRequestOverPlaintextHandshakeVersion1() throws Exception {
        testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_PLAINTEXT, (short) 1);
    }

    @Test
    public void testUnauthenticatedApiVersionsRequestOverSslHandshakeVersion0() throws Exception {
        testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_SSL, (short) 0);
    }

    @Test
    public void testUnauthenticatedApiVersionsRequestOverSslHandshakeVersion1() throws Exception {
        testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_SSL, (short) 1);
    }

    @Test
    public void testApiVersionsRequestWithUnsupportedVersion() throws Exception {
        short latestVersion = ApiKeys.SASL_HANDSHAKE.latestVersion();
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = createEchoServer(securityProtocol);
        createClientConnection(SecurityProtocol.PLAINTEXT, "1");
        this.selector.send(new ApiVersionsRequest.Builder().build().toSend("1", new RequestHeader(new RequestHeaderData().setRequestApiKey(ApiKeys.API_VERSIONS.id).setRequestApiVersion(Short.MAX_VALUE).setClientId("someclient").setCorrelationId(1), (short) 2)));
        ByteBuffer waitForResponse = waitForResponse();
        ResponseHeader.parse(waitForResponse, ApiKeys.API_VERSIONS.responseHeaderVersion((short) 0));
        ApiVersionsResponse parse = ApiVersionsResponse.parse(waitForResponse, (short) 0);
        Assert.assertEquals(Errors.UNSUPPORTED_VERSION.code(), parse.data.errorCode());
        Assert.assertNotNull(parse.data.apiKeys().find(ApiKeys.API_VERSIONS.id));
        Assert.assertEquals(ApiKeys.API_VERSIONS.id, r0.apiKey());
        Assert.assertEquals(ApiKeys.API_VERSIONS.oldestVersion(), r0.minVersion());
        Assert.assertEquals(ApiKeys.API_VERSIONS.latestVersion(), r0.maxVersion());
        sendVersionRequestReceiveResponse("1");
        sendHandshakeRequestReceiveResponse("1", latestVersion);
        authenticateUsingSaslPlainAndCheckConnection("1", latestVersion > 0);
    }

    @Test
    public void testInvalidApiVersionsRequest() throws Exception {
        short latestVersion = ApiKeys.SASL_HANDSHAKE.latestVersion();
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = createEchoServer(securityProtocol);
        short latestVersion2 = ApiKeys.API_VERSIONS.latestVersion();
        createClientConnection(SecurityProtocol.PLAINTEXT, "1");
        this.selector.send(new ApiVersionsRequest(new ApiVersionsRequestData().setClientSoftwareName("  ").setClientSoftwareVersion("   "), latestVersion2).toSend("1", new RequestHeader(ApiKeys.API_VERSIONS, latestVersion2, "someclient", 1)));
        ResponseHeader.parse(waitForResponse(), ApiKeys.API_VERSIONS.responseHeaderVersion(latestVersion2));
        Assert.assertEquals(Errors.INVALID_REQUEST.code(), ApiVersionsResponse.parse(r0, latestVersion2).data.errorCode());
        sendVersionRequestReceiveResponse("1");
        sendHandshakeRequestReceiveResponse("1", latestVersion);
        authenticateUsingSaslPlainAndCheckConnection("1", latestVersion > 0);
    }

    @Test
    public void testValidApiVersionsRequest() throws Exception {
        short latestVersion = ApiKeys.SASL_HANDSHAKE.latestVersion();
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = createEchoServer(securityProtocol);
        short latestVersion2 = ApiKeys.API_VERSIONS.latestVersion();
        createClientConnection(SecurityProtocol.PLAINTEXT, "1");
        this.selector.send(new ApiVersionsRequest.Builder().build(latestVersion2).toSend("1", new RequestHeader(ApiKeys.API_VERSIONS, latestVersion2, "someclient", 1)));
        ResponseHeader.parse(waitForResponse(), ApiKeys.API_VERSIONS.responseHeaderVersion(latestVersion2));
        Assert.assertEquals(Errors.NONE.code(), ApiVersionsResponse.parse(r0, latestVersion2).data.errorCode());
        sendHandshakeRequestReceiveResponse("1", latestVersion);
        authenticateUsingSaslPlainAndCheckConnection("1", latestVersion > 0);
    }

    @Test
    public void testSaslHandshakeRequestWithUnsupportedVersion() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = createEchoServer(securityProtocol);
        createClientConnection(SecurityProtocol.PLAINTEXT, "invalid1");
        this.selector.send(buildSaslHandshakeRequest("PLAIN", ApiKeys.SASL_HANDSHAKE.latestVersion()).toSend("invalid1", new RequestHeader(ApiKeys.SASL_HANDSHAKE, Short.MAX_VALUE, "someclient", 2)));
        NetworkTestUtils.waitForChannelClose(this.selector, "invalid1", ChannelState.READY.state());
        this.selector.close();
        createAndCheckClientConnection(securityProtocol, "good1");
    }

    @Test
    public void testInvalidSaslPacket() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = createEchoServer(securityProtocol);
        createClientConnection(SecurityProtocol.PLAINTEXT, "invalid1");
        sendHandshakeRequestReceiveResponse("invalid1", (short) 1);
        Random random = new Random();
        byte[] bArr = new byte[1024];
        random.nextBytes(bArr);
        this.selector.send(new NetworkSend("invalid1", ByteBuffer.wrap(bArr)));
        NetworkTestUtils.waitForChannelClose(this.selector, "invalid1", ChannelState.READY.state());
        this.selector.close();
        createAndCheckClientConnection(securityProtocol, "good1");
        createClientConnection(SecurityProtocol.PLAINTEXT, "invalid2");
        random.nextBytes(bArr);
        this.selector.send(new NetworkSend("invalid2", ByteBuffer.wrap(bArr)));
        NetworkTestUtils.waitForChannelClose(this.selector, "invalid2", ChannelState.READY.state());
        this.selector.close();
        createAndCheckClientConnection(securityProtocol, "good2");
    }

    @Test
    public void testInvalidApiVersionsRequestSequence() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = createEchoServer(securityProtocol);
        createClientConnection(SecurityProtocol.PLAINTEXT, "invalid1");
        sendHandshakeRequestReceiveResponse("invalid1", (short) 1);
        ApiVersionsRequest createApiVersionsRequestV0 = createApiVersionsRequestV0();
        this.selector.send(createApiVersionsRequestV0.toSend("invalid1", new RequestHeader(ApiKeys.API_VERSIONS, createApiVersionsRequestV0.version(), "someclient", 2)));
        NetworkTestUtils.waitForChannelClose(this.selector, "invalid1", ChannelState.READY.state());
        this.selector.close();
        createAndCheckClientConnection(securityProtocol, "good1");
    }

    @Test
    public void testPacketSizeTooBig() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = createEchoServer(securityProtocol);
        createClientConnection(SecurityProtocol.PLAINTEXT, "invalid1");
        sendHandshakeRequestReceiveResponse("invalid1", (short) 1);
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        allocate.putInt(Integer.MAX_VALUE);
        allocate.put(new byte[allocate.capacity() - 4]);
        allocate.rewind();
        this.selector.send(new NetworkSend("invalid1", allocate));
        NetworkTestUtils.waitForChannelClose(this.selector, "invalid1", ChannelState.READY.state());
        this.selector.close();
        createAndCheckClientConnection(securityProtocol, "good1");
        createClientConnection(SecurityProtocol.PLAINTEXT, "invalid2");
        allocate.clear();
        allocate.putInt(Integer.MAX_VALUE);
        allocate.put(new byte[allocate.capacity() - 4]);
        allocate.rewind();
        this.selector.send(new NetworkSend("invalid2", allocate));
        NetworkTestUtils.waitForChannelClose(this.selector, "invalid2", ChannelState.READY.state());
        this.selector.close();
        createAndCheckClientConnection(securityProtocol, "good2");
    }

    @Test
    public void testDisallowedKafkaRequestsBeforeAuthentication() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = createEchoServer(securityProtocol);
        createClientConnection(SecurityProtocol.PLAINTEXT, "invalid1");
        MetadataRequest build = new MetadataRequest.Builder(Collections.singletonList("sometopic"), true).build();
        this.selector.send(build.toSend("invalid1", new RequestHeader(ApiKeys.METADATA, build.version(), "someclient", 1)));
        NetworkTestUtils.waitForChannelClose(this.selector, "invalid1", ChannelState.READY.state());
        this.selector.close();
        createAndCheckClientConnection(securityProtocol, "good1");
        createClientConnection(SecurityProtocol.PLAINTEXT, "invalid2");
        sendHandshakeRequestReceiveResponse("invalid2", (short) 1);
        MetadataRequest build2 = new MetadataRequest.Builder(Collections.singletonList("sometopic"), true).build();
        this.selector.send(build2.toSend("invalid2", new RequestHeader(ApiKeys.METADATA, build2.version(), "someclient", 2)));
        NetworkTestUtils.waitForChannelClose(this.selector, "invalid2", ChannelState.READY.state());
        this.selector.close();
        createAndCheckClientConnection(securityProtocol, "good2");
    }

    @Test
    public void testInvalidLoginModule() throws Exception {
        configureMechanisms("PLAIN", Arrays.asList("PLAIN")).createOrUpdateEntry("KafkaClient", "InvalidLoginModule", TestJaasConfig.defaultClientOptions());
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.server = createEchoServer(securityProtocol);
        try {
            createSelector(securityProtocol, this.saslClientConfigs);
            Assert.fail("SASL/PLAIN channel created without valid login module");
        } catch (KafkaException e) {
        }
    }

    @Test
    public void testClientAuthenticateCallbackHandler() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        TestJaasConfig configureMechanisms = configureMechanisms("PLAIN", Collections.singletonList("PLAIN"));
        this.saslClientConfigs.put("sasl.client.callback.handler.class", TestClientCallbackHandler.class.getName());
        configureMechanisms.setClientOptions("PLAIN", "", "");
        HashMap hashMap = new HashMap();
        hashMap.put("user_TestClientCallbackHandler-user", "TestClientCallbackHandler-password");
        configureMechanisms.createOrUpdateEntry("KafkaServer", PlainLoginModule.class.getName(), hashMap);
        this.server = createEchoServer(securityProtocol);
        createAndCheckClientConnection(securityProtocol, "good");
        hashMap.clear();
        hashMap.put("user_TestClientCallbackHandler-user", "invalid-password");
        configureMechanisms.createOrUpdateEntry("KafkaServer", PlainLoginModule.class.getName(), hashMap);
        createAndCheckClientConnectionFailure(securityProtocol, "invalid");
    }

    @Test
    public void testServerAuthenticateCallbackHandler() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        TestJaasConfig configureMechanisms = configureMechanisms("PLAIN", Collections.singletonList("PLAIN"));
        configureMechanisms.createOrUpdateEntry("KafkaServer", PlainLoginModule.class.getName(), new HashMap());
        this.saslServerConfigs.put(ListenerName.forSecurityProtocol(securityProtocol).saslMechanismConfigPrefix("PLAIN") + "sasl.server.callback.handler.class", TestServerCallbackHandler.class.getName());
        this.server = createEchoServer(securityProtocol);
        configureMechanisms.setClientOptions("PLAIN", "TestServerCallbackHandler-user", "TestServerCallbackHandler-password");
        createAndCheckClientConnection(securityProtocol, "good");
        configureMechanisms.setClientOptions("PLAIN", "myuser", "invalid-password");
        createAndCheckClientConnectionFailure(securityProtocol, "invalid");
    }

    @Test
    public void testAuthenticateCallbackHandlerMechanisms() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        TestJaasConfig configureMechanisms = configureMechanisms("DIGEST-MD5", Arrays.asList("DIGEST-MD5", "PLAIN"));
        this.saslServerConfigs.put("plain.sasl.server.callback.handler.class", TestServerCallbackHandler.class);
        this.saslServerConfigs.put("digest-md5.sasl.server.callback.handler.class", TestDigestLoginModule.DigestServerCallbackHandler.class);
        this.server = createEchoServer(securityProtocol);
        createAndCheckClientConnectionFailure(securityProtocol, "invalid");
        ListenerName forSecurityProtocol = ListenerName.forSecurityProtocol(securityProtocol);
        this.saslServerConfigs.remove("plain.sasl.server.callback.handler.class");
        this.saslServerConfigs.remove("digest-md5.sasl.server.callback.handler.class");
        this.saslServerConfigs.put(forSecurityProtocol.saslMechanismConfigPrefix("plain") + "sasl.server.callback.handler.class", TestServerCallbackHandler.class);
        this.saslServerConfigs.put(forSecurityProtocol.saslMechanismConfigPrefix("digest-md5") + "sasl.server.callback.handler.class", TestDigestLoginModule.DigestServerCallbackHandler.class);
        this.server = createEchoServer(securityProtocol);
        createAndCheckClientConnection(securityProtocol, "good-digest-md5");
        configureMechanisms.setClientOptions("PLAIN", "TestServerCallbackHandler-user", "TestServerCallbackHandler-password");
        this.saslClientConfigs.put("sasl.mechanism", "PLAIN");
        createAndCheckClientConnection(securityProtocol, "good-plain");
    }

    @Test
    public void testClientLoginOverride() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        configureMechanisms("PLAIN", Collections.singletonList("PLAIN")).setClientOptions("PLAIN", "invaliduser", "invalidpassword");
        this.server = createEchoServer(securityProtocol);
        this.saslClientConfigs.put("sasl.login.class", TestLogin.class.getName());
        createAndCheckClientConnection(securityProtocol, "1");
        Assert.assertEquals(1L, TestLogin.loginCount.get());
        this.saslClientConfigs.remove("sasl.login.class");
        createAndCheckClientConnectionFailure(securityProtocol, "invalid");
        Assert.assertEquals(1L, TestLogin.loginCount.get());
    }

    @Test
    public void testServerLoginOverride() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        configureMechanisms("PLAIN", Collections.singletonList("PLAIN"));
        this.saslServerConfigs.put(ListenerName.forSecurityProtocol(securityProtocol).saslMechanismConfigPrefix("PLAIN") + "sasl.login.class", TestLogin.class.getName());
        this.server = createEchoServer(securityProtocol);
        Assert.assertEquals(1L, TestLogin.loginCount.get());
        createAndCheckClientConnection(securityProtocol, "1");
        Assert.assertEquals(1L, TestLogin.loginCount.get());
    }

    @Test
    public void testClientLoginCallbackOverride() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        configureMechanisms("PLAIN", Collections.singletonList("PLAIN")).createOrUpdateEntry("KafkaClient", TestPlainLoginModule.class.getName(), Collections.emptyMap());
        this.server = createEchoServer(securityProtocol);
        this.saslClientConfigs.put("sasl.login.callback.handler.class", TestLoginCallbackHandler.class.getName());
        createAndCheckClientConnection(securityProtocol, "1");
        this.saslClientConfigs.remove("sasl.login.callback.handler.class");
        try {
            createClientConnection(securityProtocol, "invalid");
        } catch (Exception e) {
            Assert.assertTrue("Unexpected exception " + e.getCause(), e.getCause() instanceof LoginException);
        }
    }

    @Test
    public void testServerLoginCallbackOverride() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        TestJaasConfig configureMechanisms = configureMechanisms("PLAIN", Collections.singletonList("PLAIN"));
        configureMechanisms.createOrUpdateEntry("KafkaServer", TestPlainLoginModule.class.getName(), Collections.emptyMap());
        configureMechanisms.setClientOptions("PLAIN", "TestServerCallbackHandler-user", "TestServerCallbackHandler-password");
        ListenerName forSecurityProtocol = ListenerName.forSecurityProtocol(securityProtocol);
        String saslMechanismConfigPrefix = forSecurityProtocol.saslMechanismConfigPrefix("PLAIN");
        this.saslServerConfigs.put(saslMechanismConfigPrefix + "sasl.server.callback.handler.class", TestServerCallbackHandler.class);
        try {
            createEchoServer(securityProtocol);
            Assert.fail("Should have failed to create server with default login handler");
        } catch (KafkaException e) {
        }
        try {
            this.saslServerConfigs.put("sasl.login.callback.handler.class", TestLoginCallbackHandler.class);
            createEchoServer(securityProtocol);
            Assert.fail("Should have failed to create server with login handler config without listener+mechanism prefix");
        } catch (KafkaException e2) {
            this.saslServerConfigs.remove("sasl.login.callback.handler.class");
        }
        try {
            this.saslServerConfigs.put("plain.sasl.login.callback.handler.class", TestLoginCallbackHandler.class);
            createEchoServer(securityProtocol);
            Assert.fail("Should have failed to create server with login handler config without listener prefix");
        } catch (KafkaException e3) {
            this.saslServerConfigs.remove("plain.sasl.login.callback.handler.class");
        }
        try {
            this.saslServerConfigs.put(forSecurityProtocol.configPrefix() + "sasl.login.callback.handler.class", TestLoginCallbackHandler.class);
            createEchoServer(securityProtocol);
            Assert.fail("Should have failed to create server with login handler config without mechanism prefix");
        } catch (KafkaException e4) {
            this.saslServerConfigs.remove("plain.sasl.login.callback.handler.class");
        }
        this.saslServerConfigs.put(saslMechanismConfigPrefix + "sasl.login.callback.handler.class", TestLoginCallbackHandler.class);
        this.server = createEchoServer(securityProtocol);
        createAndCheckClientConnection(securityProtocol, "1");
    }

    @Test
    public void testDisabledMechanism() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        configureMechanisms("PLAIN", Arrays.asList("DIGEST-MD5"));
        this.server = createEchoServer(securityProtocol);
        createAndCheckClientConnectionFailure(securityProtocol, "0");
        this.server.verifyAuthenticationMetrics(0, 1);
        this.server.verifyReauthenticationMetrics(0, 0);
    }

    @Test
    public void testInvalidMechanism() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.saslClientConfigs.put("sasl.mechanism", "INVALID");
        this.server = createEchoServer(securityProtocol);
        createAndCheckClientConnectionFailure(securityProtocol, "0");
        this.server.verifyAuthenticationMetrics(0, 1);
        this.server.verifyReauthenticationMetrics(0, 0);
    }

    @Test
    public void testClientDynamicJaasConfiguration() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.saslClientConfigs.put("sasl.mechanism", "PLAIN");
        this.saslServerConfigs.put("sasl.enabled.mechanisms", Arrays.asList("PLAIN"));
        HashMap hashMap = new HashMap();
        hashMap.put("user_user1", "user1-secret");
        hashMap.put("user_user2", "user2-secret");
        TestJaasConfig testJaasConfig = new TestJaasConfig();
        testJaasConfig.createOrUpdateEntry("KafkaServer", PlainLoginModule.class.getName(), hashMap);
        testJaasConfig.setClientOptions("PLAIN", "user1", "invalidpassword");
        Configuration.setConfiguration(testJaasConfig);
        this.server = createEchoServer(securityProtocol);
        createAndCheckClientConnectionFailure(securityProtocol, "1");
        this.saslClientConfigs.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("PLAIN", "user1", "user1-secret"));
        createAndCheckClientConnection(securityProtocol, "2");
        this.saslClientConfigs.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("PLAIN", "user1", "user2-secret"));
        createAndCheckClientConnectionFailure(securityProtocol, "3");
        this.saslClientConfigs.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("PLAIN", "user2", "user2-secret"));
        createAndCheckClientConnection(securityProtocol, "4");
        this.saslClientConfigs.put("sasl.jaas.config", new Password(TestJaasConfig.jaasConfigProperty("PLAIN", "user1", "user1-secret").value() + " " + TestJaasConfig.jaasConfigProperty("PLAIN", "user2", "user2-secret").value()));
        try {
            createClientConnection(securityProtocol, "1");
            Assert.fail("Connection created with multiple login modules in sasl.jaas.config");
        } catch (IllegalArgumentException e) {
        }
    }

    @Test
    public void testServerDynamicJaasConfiguration() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.saslClientConfigs.put("sasl.mechanism", "PLAIN");
        this.saslServerConfigs.put("sasl.enabled.mechanisms", Arrays.asList("PLAIN"));
        HashMap hashMap = new HashMap();
        hashMap.put("user_user1", "user1-secret");
        hashMap.put("user_user2", "user2-secret");
        this.saslServerConfigs.put("listener.name.sasl_ssl.plain.sasl.jaas.config", TestJaasConfig.jaasConfigProperty("PLAIN", hashMap));
        TestJaasConfig testJaasConfig = new TestJaasConfig();
        testJaasConfig.createOrUpdateEntry("KafkaServer", PlainLoginModule.class.getName(), Collections.emptyMap());
        testJaasConfig.setClientOptions("PLAIN", "user1", "user1-secret");
        Configuration.setConfiguration(testJaasConfig);
        this.server = createEchoServer(securityProtocol);
        createAndCheckClientConnection(securityProtocol, "1");
        this.saslClientConfigs.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("PLAIN", "user2", "user2-secret"));
        createAndCheckClientConnection(securityProtocol, "2");
    }

    @Test
    public void testJaasConfigurationForListener() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        this.saslClientConfigs.put("sasl.mechanism", "PLAIN");
        this.saslServerConfigs.put("sasl.enabled.mechanisms", Arrays.asList("PLAIN"));
        TestJaasConfig testJaasConfig = new TestJaasConfig();
        HashMap hashMap = new HashMap();
        hashMap.put("user_global1", "gsecret1");
        hashMap.put("user_global2", "gsecret2");
        testJaasConfig.createOrUpdateEntry("KafkaServer", PlainLoginModule.class.getName(), hashMap);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("user_client1", "csecret1");
        hashMap2.put("user_client2", "csecret2");
        testJaasConfig.createOrUpdateEntry("client.KafkaServer", PlainLoginModule.class.getName(), hashMap2);
        Configuration.setConfiguration(testJaasConfig);
        this.server = createEchoServer(new ListenerName("client"), securityProtocol);
        this.saslClientConfigs.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("PLAIN", "client1", "csecret1"));
        createAndCheckClientConnection(securityProtocol, "1");
        this.saslClientConfigs.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("PLAIN", "global1", "gsecret1"));
        createAndCheckClientConnectionFailure(securityProtocol, "2");
        this.server.close();
        this.server = createEchoServer(new ListenerName("other"), securityProtocol);
        this.saslClientConfigs.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("PLAIN", "global1", "gsecret1"));
        createAndCheckClientConnection(securityProtocol, "3");
        this.saslClientConfigs.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("PLAIN", "client1", "csecret1"));
        createAndCheckClientConnectionFailure(securityProtocol, "4");
    }

    @Test
    public void oldSaslPlainPlaintextServerWithoutSaslAuthenticateHeader() throws Exception {
        verifySaslAuthenticateHeaderInterop(false, true, SecurityProtocol.SASL_PLAINTEXT, "PLAIN");
    }

    @Test
    public void oldSaslPlainPlaintextClientWithoutSaslAuthenticateHeader() throws Exception {
        verifySaslAuthenticateHeaderInterop(true, false, SecurityProtocol.SASL_PLAINTEXT, "PLAIN");
    }

    @Test
    public void oldSaslScramPlaintextServerWithoutSaslAuthenticateHeader() throws Exception {
        verifySaslAuthenticateHeaderInterop(false, true, SecurityProtocol.SASL_PLAINTEXT, "SCRAM-SHA-256");
    }

    @Test
    public void oldSaslScramPlaintextClientWithoutSaslAuthenticateHeader() throws Exception {
        verifySaslAuthenticateHeaderInterop(true, false, SecurityProtocol.SASL_PLAINTEXT, "SCRAM-SHA-256");
    }

    @Test
    public void oldSaslPlainSslServerWithoutSaslAuthenticateHeader() throws Exception {
        verifySaslAuthenticateHeaderInterop(false, true, SecurityProtocol.SASL_SSL, "PLAIN");
    }

    @Test
    public void oldSaslPlainSslClientWithoutSaslAuthenticateHeader() throws Exception {
        verifySaslAuthenticateHeaderInterop(true, false, SecurityProtocol.SASL_SSL, "PLAIN");
    }

    @Test
    public void oldSaslScramSslServerWithoutSaslAuthenticateHeader() throws Exception {
        verifySaslAuthenticateHeaderInterop(false, true, SecurityProtocol.SASL_SSL, "SCRAM-SHA-512");
    }

    @Test
    public void oldSaslScramSslClientWithoutSaslAuthenticateHeader() throws Exception {
        verifySaslAuthenticateHeaderInterop(true, false, SecurityProtocol.SASL_SSL, "SCRAM-SHA-512");
    }

    @Test
    public void oldSaslPlainPlaintextServerWithoutSaslAuthenticateHeaderFailure() throws Exception {
        verifySaslAuthenticateHeaderInteropWithFailure(false, true, SecurityProtocol.SASL_PLAINTEXT, "PLAIN");
    }

    @Test
    public void oldSaslPlainPlaintextClientWithoutSaslAuthenticateHeaderFailure() throws Exception {
        verifySaslAuthenticateHeaderInteropWithFailure(true, false, SecurityProtocol.SASL_PLAINTEXT, "PLAIN");
    }

    @Test
    public void oldSaslScramPlaintextServerWithoutSaslAuthenticateHeaderFailure() throws Exception {
        verifySaslAuthenticateHeaderInteropWithFailure(false, true, SecurityProtocol.SASL_PLAINTEXT, "SCRAM-SHA-256");
    }

    @Test
    public void oldSaslScramPlaintextClientWithoutSaslAuthenticateHeaderFailure() throws Exception {
        verifySaslAuthenticateHeaderInteropWithFailure(true, false, SecurityProtocol.SASL_PLAINTEXT, "SCRAM-SHA-256");
    }

    @Test
    public void oldSaslPlainSslServerWithoutSaslAuthenticateHeaderFailure() throws Exception {
        verifySaslAuthenticateHeaderInteropWithFailure(false, true, SecurityProtocol.SASL_SSL, "PLAIN");
    }

    @Test
    public void oldSaslPlainSslClientWithoutSaslAuthenticateHeaderFailure() throws Exception {
        verifySaslAuthenticateHeaderInteropWithFailure(true, false, SecurityProtocol.SASL_SSL, "PLAIN");
    }

    @Test
    public void oldSaslScramSslServerWithoutSaslAuthenticateHeaderFailure() throws Exception {
        verifySaslAuthenticateHeaderInteropWithFailure(false, true, SecurityProtocol.SASL_SSL, "SCRAM-SHA-512");
    }

    @Test
    public void oldSaslScramSslClientWithoutSaslAuthenticateHeaderFailure() throws Exception {
        verifySaslAuthenticateHeaderInteropWithFailure(true, false, SecurityProtocol.SASL_SSL, "SCRAM-SHA-512");
    }

    @Test
    public void testValidSaslOauthBearerMechanism() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        configureMechanisms("OAUTHBEARER", Arrays.asList("OAUTHBEARER"));
        this.server = createEchoServer(securityProtocol);
        createAndCheckClientConnection(securityProtocol, "0");
    }

    @Test
    public void testCannotReauthenticateWithDifferentPrincipal() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.saslClientConfigs.put("sasl.login.callback.handler.class", AlternateLoginCallbackHandler.class.getName());
        configureMechanisms("OAUTHBEARER", Arrays.asList("OAUTHBEARER"));
        this.server = createEchoServer(securityProtocol);
        createClientConnection(securityProtocol, "0");
        checkClientConnection("0");
        this.server.verifyAuthenticationMetrics(1, 0);
        this.server.verifyReauthenticationMetrics(0, 0);
        delay(1000L);
        try {
            checkClientConnection("0");
            Assert.fail("Re-authentication with a different principal should have failed but did not");
        } catch (AssertionError e) {
            this.server.verifyReauthenticationMetrics(0, 1);
        }
    }

    @Test
    public void testCannotReauthenticateWithDifferentMechanism() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        configureMechanisms("DIGEST-MD5", Arrays.asList("DIGEST-MD5", "PLAIN"));
        configureDigestMd5ServerCallback(securityProtocol);
        this.server = createEchoServer(securityProtocol);
        String str = (String) this.saslClientConfigs.get("sasl.mechanism");
        Map values = new TestSecurityConfig(this.saslClientConfigs).values();
        this.channelBuilder = new AlternateSaslChannelBuilder(Mode.CLIENT, Collections.singletonMap(str, JaasContext.loadClientContext(values)), securityProtocol, null, false, str, true, this.credentialCache, null, time);
        this.channelBuilder.configure(values);
        this.selector = NetworkTestUtils.createSelector(this.channelBuilder, time);
        this.selector.connect("0", new InetSocketAddress("localhost", this.server.port()), BUFFER_SIZE, BUFFER_SIZE);
        checkClientConnection("0");
        this.server.verifyAuthenticationMetrics(1, 0);
        this.server.verifyReauthenticationMetrics(0, 0);
        delay(110L);
        try {
            checkClientConnection("0");
            Assert.fail("Re-authentication with a different mechanism should have failed but did not");
        } catch (AssertionError e) {
            this.server.verifyAuthenticationMetrics(1, 0);
            this.server.verifyReauthenticationMetrics(0, 1);
        }
    }

    @Test
    public void testCannotReauthenticateAgainFasterThanOneSecond() throws Exception {
        time = new MockTime();
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        configureMechanisms("OAUTHBEARER", Arrays.asList("OAUTHBEARER"));
        this.server = createEchoServer(securityProtocol);
        try {
            try {
                createClientConnection(securityProtocol, "0");
                checkClientConnection("0");
                this.server.verifyAuthenticationMetrics(1, 0);
                this.server.verifyReauthenticationMetrics(0, 0);
                time.sleep(110L);
                checkClientConnection("0");
                this.server.verifyAuthenticationMetrics(1, 0);
                this.server.verifyReauthenticationMetrics(1, 0);
                time.sleep(110L);
                NetworkTestUtils.checkClientConnection(this.selector, "0", 1, 1);
                Assert.fail("Expected a failure when trying to re-authenticate to quickly, but that did not occur");
                this.selector.close();
                this.selector = null;
            } catch (AssertionError e) {
                Assert.assertTrue("Should have received the SaslHandshakeRequest bytes back since we re-authenticated too quickly, but instead we got our generated message echoed back, implying re-auth succeeded when it should not have: " + e, e.getMessage().matches(".*\\<\\[" + ("\\w-0") + "]>.*\\<\\[.*OAUTHBEARER]>"));
                this.server.verifyReauthenticationMetrics(1, 0);
                this.selector.close();
                this.selector = null;
            }
        } catch (Throwable th) {
            this.selector.close();
            this.selector = null;
            throw th;
        }
    }

    @Test
    public void testRepeatedValidSaslPlainOverSsl() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.saslServerConfigs.put("connections.max.reauth.ms", Long.valueOf(new Double(1294.1176470588236d).longValue()));
        this.server = createEchoServer(securityProtocol);
        createClientConnection(securityProtocol, "0");
        checkClientConnection("0");
        this.server.verifyAuthenticationMetrics(1, 0);
        this.server.verifyReauthenticationMetrics(0, 0);
        double d = 0.0d;
        long milliseconds = Time.SYSTEM.milliseconds() + TestUtils.DEFAULT_MAX_WAIT_MS;
        while (d < 5 && Time.SYSTEM.milliseconds() < milliseconds) {
            checkClientConnection("0");
            d = this.server.metricValue("successful-reauthentication-total");
        }
        this.server.verifyReauthenticationMetrics(5, 0);
    }

    @Test
    public void testValidSaslOauthBearerMechanismWithoutServerTokens() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.saslClientConfigs.put("sasl.mechanism", "OAUTHBEARER");
        this.saslServerConfigs.put("sasl.enabled.mechanisms", Arrays.asList("OAUTHBEARER"));
        this.saslClientConfigs.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("OAUTHBEARER", Collections.singletonMap("unsecuredLoginStringClaim_sub", "myuser")));
        this.saslServerConfigs.put("listener.name.sasl_ssl.oauthbearer.sasl.jaas.config", TestJaasConfig.jaasConfigProperty("OAUTHBEARER", Collections.emptyMap()));
        this.server = createEchoServer(securityProtocol);
        createAndCheckClientConnection(securityProtocol, "0");
        this.saslClientConfigs.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("OAUTHBEARER", Collections.emptyMap()));
        createAndCheckClientConnectionFailure(securityProtocol, "0");
        this.saslServerConfigs.put("listener.name.sasl_ssl.oauthbearer.sasl.jaas.config", TestJaasConfig.jaasConfigProperty("OAUTHBEARER", Collections.singletonMap("unsecuredLoginExtension_test", "something")));
        try {
            createEchoServer(securityProtocol);
            Assert.fail("Server created with invalid login config containing extensions without a token");
        } catch (Throwable th) {
            Assert.assertTrue("Unexpected exception " + Utils.stackTrace(th), th.getCause() instanceof LoginException);
        }
    }

    @Test
    public void testInsufficientScopeSaslOauthBearerMechanism() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        TestJaasConfig configureMechanisms = configureMechanisms("OAUTHBEARER", Arrays.asList("OAUTHBEARER"));
        Map<String, Object> defaultServerOptions = TestJaasConfig.defaultServerOptions("OAUTHBEARER");
        defaultServerOptions.put("unsecuredValidatorRequiredScope", "LOGIN_TO_KAFKA");
        configureMechanisms.createOrUpdateEntry("KafkaServer", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule", defaultServerOptions);
        this.server = createEchoServer(securityProtocol);
        createAndCheckClientAuthenticationFailure(securityProtocol, "node-OAUTHBEARER", "OAUTHBEARER", "{\"status\":\"insufficient_scope\", \"scope\":\"[LOGIN_TO_KAFKA]\"}");
    }

    private void verifySaslAuthenticateHeaderInterop(boolean z, boolean z2, SecurityProtocol securityProtocol, String str) throws Exception {
        configureMechanisms(str, Arrays.asList(str));
        createServer(securityProtocol, str, z);
        createClientConnection(securityProtocol, str, "0", z2);
        NetworkTestUtils.checkClientConnection(this.selector, "0", 100, 10);
    }

    private void verifySaslAuthenticateHeaderInteropWithFailure(boolean z, boolean z2, SecurityProtocol securityProtocol, String str) throws Exception {
        configureMechanisms(str, Arrays.asList(str)).setClientOptions(str, "myuser", "invalidpassword");
        createServer(securityProtocol, str, z);
        createClientConnection(securityProtocol, str, "0", z2);
        NetworkTestUtils.waitForChannelClose(this.selector, "0", ChannelState.State.AUTHENTICATE);
    }

    private void createServer(SecurityProtocol securityProtocol, String str, boolean z) throws Exception {
        if (z) {
            this.server = createEchoServer(securityProtocol);
        } else {
            this.server = startServerWithoutSaslAuthenticateHeader(securityProtocol, str);
        }
        updateScramCredentialCache("myuser", "mypassword");
    }

    private void createClientConnection(SecurityProtocol securityProtocol, String str, String str2, boolean z) throws Exception {
        if (z) {
            createClientConnection(securityProtocol, str2);
        } else {
            createClientConnectionWithoutSaslAuthenticateHeader(securityProtocol, str, str2);
        }
    }

    private NioEchoServer startServerWithoutSaslAuthenticateHeader(final SecurityProtocol securityProtocol, String str) throws Exception {
        final ListenerName forSecurityProtocol = ListenerName.forSecurityProtocol(securityProtocol);
        Map singletonMap = Collections.singletonMap(str, JaasContext.loadServerContext(forSecurityProtocol, str, Collections.emptyMap()));
        if (ScramMechanism.isScram(str)) {
            ScramCredentialUtils.createCache(this.credentialCache, Arrays.asList(str));
        }
        SaslChannelBuilder saslChannelBuilder = new SaslChannelBuilder(Mode.SERVER, singletonMap, securityProtocol, forSecurityProtocol, false, str, true, this.credentialCache, null, time) { // from class: org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.2
            protected SaslServerAuthenticator buildServerAuthenticator(Map<String, ?> map, Map<String, AuthenticateCallbackHandler> map2, String str2, TransportLayer transportLayer, Map<String, Subject> map3, Map<String, Long> map4) {
                return new SaslServerAuthenticator(map, map2, str2, map3, null, forSecurityProtocol, securityProtocol, transportLayer, map4, SaslAuthenticatorTest.time) { // from class: org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.2.1
                    protected ApiVersionsResponse apiVersionsResponse() {
                        ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE;
                        ApiVersionsResponseData.ApiVersionsResponseKeyCollection apiVersionsResponseKeyCollection = new ApiVersionsResponseData.ApiVersionsResponseKeyCollection();
                        Iterator it = apiVersionsResponse.data.apiKeys().iterator();
                        while (it.hasNext()) {
                            ApiVersionsResponseData.ApiVersionsResponseKey apiVersionsResponseKey = (ApiVersionsResponseData.ApiVersionsResponseKey) it.next();
                            if (apiVersionsResponseKey.apiKey() != ApiKeys.SASL_AUTHENTICATE.id) {
                                apiVersionsResponseKeyCollection.add(new ApiVersionsResponseData.ApiVersionsResponseKey().setApiKey(apiVersionsResponseKey.apiKey()).setMinVersion(apiVersionsResponseKey.minVersion()).setMaxVersion(apiVersionsResponseKey.maxVersion()));
                            }
                        }
                        return new ApiVersionsResponse(new ApiVersionsResponseData().setErrorCode(Errors.NONE.code()).setThrottleTimeMs(0).setApiKeys(apiVersionsResponseKeyCollection));
                    }

                    protected void enableKafkaSaslAuthenticateHeaders(boolean z) {
                    }
                };
            }
        };
        saslChannelBuilder.configure(this.saslServerConfigs);
        this.server = new NioEchoServer(forSecurityProtocol, securityProtocol, new TestSecurityConfig(this.saslServerConfigs), "localhost", saslChannelBuilder, this.credentialCache, time);
        this.server.start();
        return this.server;
    }

    private void createClientConnectionWithoutSaslAuthenticateHeader(SecurityProtocol securityProtocol, final String str, String str2) throws Exception {
        ListenerName forSecurityProtocol = ListenerName.forSecurityProtocol(securityProtocol);
        SaslChannelBuilder saslChannelBuilder = new SaslChannelBuilder(Mode.CLIENT, Collections.singletonMap(str, JaasContext.loadClientContext(Collections.emptyMap())), securityProtocol, forSecurityProtocol, false, str, true, null, null, time) { // from class: org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.3
            protected SaslClientAuthenticator buildClientAuthenticator(Map<String, ?> map, AuthenticateCallbackHandler authenticateCallbackHandler, String str3, String str4, String str5, TransportLayer transportLayer, Subject subject) {
                return new SaslClientAuthenticator(map, authenticateCallbackHandler, str3, subject, str5, str4, str, true, transportLayer, SaslAuthenticatorTest.time) { // from class: org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.3.1
                    protected SaslHandshakeRequest createSaslHandshakeRequest(short s) {
                        return SaslAuthenticatorTest.this.buildSaslHandshakeRequest(str, (short) 0);
                    }

                    protected void saslAuthenticateVersion(ApiVersionsResponse apiVersionsResponse) {
                    }
                };
            }
        };
        saslChannelBuilder.configure(this.saslClientConfigs);
        this.selector = NetworkTestUtils.createSelector(saslChannelBuilder, time);
        this.selector.connect(str2, new InetSocketAddress("127.0.0.1", this.server.port()), BUFFER_SIZE, BUFFER_SIZE);
    }

    private void testUnauthenticatedApiVersionsRequest(SecurityProtocol securityProtocol, short s) throws Exception {
        SecurityProtocol securityProtocol2;
        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = createEchoServer(securityProtocol);
        switch (AnonymousClass4.$SwitchMap$org$apache$kafka$common$security$auth$SecurityProtocol[securityProtocol.ordinal()]) {
            case 1:
                securityProtocol2 = SecurityProtocol.PLAINTEXT;
                break;
            case 2:
                securityProtocol2 = SecurityProtocol.SSL;
                break;
            default:
                throw new IllegalArgumentException("Server protocol " + securityProtocol + " is not SASL");
        }
        createClientConnection(securityProtocol2, "1");
        NetworkTestUtils.waitForChannelReady(this.selector, "1");
        ApiVersionsResponse sendVersionRequestReceiveResponse = sendVersionRequestReceiveResponse("1");
        Assert.assertEquals(ApiKeys.SASL_HANDSHAKE.oldestVersion(), sendVersionRequestReceiveResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).minVersion());
        Assert.assertEquals(ApiKeys.SASL_HANDSHAKE.latestVersion(), sendVersionRequestReceiveResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).maxVersion());
        Assert.assertEquals(ApiKeys.SASL_AUTHENTICATE.oldestVersion(), sendVersionRequestReceiveResponse.apiVersion(ApiKeys.SASL_AUTHENTICATE.id).minVersion());
        Assert.assertEquals(ApiKeys.SASL_AUTHENTICATE.latestVersion(), sendVersionRequestReceiveResponse.apiVersion(ApiKeys.SASL_AUTHENTICATE.id).maxVersion());
        Assert.assertEquals(Collections.singletonList("PLAIN"), sendHandshakeRequestReceiveResponse("1", s).enabledMechanisms());
        authenticateUsingSaslPlainAndCheckConnection("1", s > 0);
    }

    private void authenticateUsingSaslPlainAndCheckConnection(String str, boolean z) throws Exception {
        ByteBuffer wrap = ByteBuffer.wrap("��myuser��mypassword".getBytes("UTF-8"));
        if (z) {
            sendKafkaRequestReceiveResponse(str, ApiKeys.SASL_AUTHENTICATE, new SaslAuthenticateRequest.Builder(new SaslAuthenticateRequestData().setAuthBytes(wrap.array())).build());
        } else {
            this.selector.send(new NetworkSend(str, wrap));
            waitForResponse();
        }
        NetworkTestUtils.checkClientConnection(this.selector, str, 100, 10);
    }

    private TestJaasConfig configureMechanisms(String str, List<String> list) {
        this.saslClientConfigs.put("sasl.mechanism", str);
        this.saslServerConfigs.put("sasl.enabled.mechanisms", list);
        this.saslServerConfigs.put("connections.max.reauth.ms", 100L);
        if (list.contains("DIGEST-MD5")) {
            this.saslServerConfigs.put("digest-md5.sasl.server.callback.handler.class", TestDigestLoginModule.DigestServerCallbackHandler.class.getName());
        }
        return TestJaasConfig.createConfiguration(str, list);
    }

    private void configureDigestMd5ServerCallback(SecurityProtocol securityProtocol) {
        this.saslServerConfigs.put(ListenerName.forSecurityProtocol(securityProtocol).saslMechanismConfigPrefix("DIGEST-MD5") + "sasl.server.callback.handler.class", TestDigestLoginModule.DigestServerCallbackHandler.class);
    }

    private void createSelector(SecurityProtocol securityProtocol, Map<String, Object> map) {
        if (this.selector != null) {
            this.selector.close();
            this.selector = null;
        }
        this.channelBuilder = ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT, new TestSecurityConfig(map), (ListenerName) null, (String) this.saslClientConfigs.get("sasl.mechanism"), time, true);
        this.selector = NetworkTestUtils.createSelector(this.channelBuilder, time);
    }

    private NioEchoServer createEchoServer(SecurityProtocol securityProtocol) throws Exception {
        return createEchoServer(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol);
    }

    private NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol) throws Exception {
        return NetworkTestUtils.createEchoServer(listenerName, securityProtocol, new TestSecurityConfig(this.saslServerConfigs), this.credentialCache, time);
    }

    private NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, DelegationTokenCache delegationTokenCache) throws Exception {
        return NetworkTestUtils.createEchoServer(listenerName, securityProtocol, new TestSecurityConfig(this.saslServerConfigs), this.credentialCache, 100, time, delegationTokenCache);
    }

    private void createClientConnection(SecurityProtocol securityProtocol, String str) throws Exception {
        createSelector(securityProtocol, this.saslClientConfigs);
        this.selector.connect(str, new InetSocketAddress("localhost", this.server.port()), BUFFER_SIZE, BUFFER_SIZE);
    }

    private void checkClientConnection(String str) throws Exception {
        NetworkTestUtils.checkClientConnection(this.selector, str, 100, 10);
    }

    private void closeClientConnectionIfNecessary() throws Exception {
        if (this.selector != null) {
            this.selector.close();
            this.selector = null;
        }
    }

    private void createAndCheckClientConnection(SecurityProtocol securityProtocol, String str) throws Exception {
        try {
            createClientConnection(securityProtocol, str);
            checkClientConnection(str);
        } finally {
            closeClientConnectionIfNecessary();
        }
    }

    private void createAndCheckClientAuthenticationFailure(SecurityProtocol securityProtocol, String str, String str2, String str3) throws Exception {
        AuthenticationException exception = createAndCheckClientConnectionFailure(securityProtocol, str).exception();
        Assert.assertTrue("Invalid exception class " + exception.getClass(), exception instanceof SaslAuthenticationException);
        Assert.assertEquals(str3 != null ? str3 : "Authentication failed during authentication due to invalid credentials with SASL mechanism " + str2, exception.getMessage());
    }

    private ChannelState createAndCheckClientConnectionFailure(SecurityProtocol securityProtocol, String str) throws Exception {
        try {
            createClientConnection(securityProtocol, str);
            ChannelState waitForChannelClose = NetworkTestUtils.waitForChannelClose(this.selector, str, ChannelState.State.AUTHENTICATION_FAILED);
            closeClientConnectionIfNecessary();
            return waitForChannelClose;
        } catch (Throwable th) {
            closeClientConnectionIfNecessary();
            throw th;
        }
    }

    private void checkAuthenticationAndReauthentication(SecurityProtocol securityProtocol, String str) throws Exception {
        try {
            createClientConnection(securityProtocol, str);
            checkClientConnection(str);
            this.server.verifyAuthenticationMetrics(1, 0);
            delay(110L);
            this.server.verifyReauthenticationMetrics(0, 0);
            checkClientConnection(str);
            this.server.verifyReauthenticationMetrics(1, 0);
        } finally {
            closeClientConnectionIfNecessary();
        }
    }

    private AbstractResponse sendKafkaRequestReceiveResponse(String str, ApiKeys apiKeys, AbstractRequest abstractRequest) throws IOException {
        short version = abstractRequest.version();
        int i = this.nextCorrelationId;
        this.nextCorrelationId = i + 1;
        RequestHeader requestHeader = new RequestHeader(apiKeys, version, "someclient", i);
        this.selector.send(abstractRequest.toSend(str, requestHeader));
        return NetworkClient.parseResponse(waitForResponse(), requestHeader);
    }

    private SaslHandshakeResponse sendHandshakeRequestReceiveResponse(String str, short s) throws Exception {
        SaslHandshakeResponse sendKafkaRequestReceiveResponse = sendKafkaRequestReceiveResponse(str, ApiKeys.SASL_HANDSHAKE, buildSaslHandshakeRequest("PLAIN", s));
        Assert.assertEquals(Errors.NONE, sendKafkaRequestReceiveResponse.error());
        return sendKafkaRequestReceiveResponse;
    }

    private ApiVersionsResponse sendVersionRequestReceiveResponse(String str) throws Exception {
        ApiVersionsResponse sendKafkaRequestReceiveResponse = sendKafkaRequestReceiveResponse(str, ApiKeys.API_VERSIONS, createApiVersionsRequestV0());
        Assert.assertEquals(Errors.NONE.code(), sendKafkaRequestReceiveResponse.data.errorCode());
        return sendKafkaRequestReceiveResponse;
    }

    private ByteBuffer waitForResponse() throws IOException {
        int i;
        int i2 = 10;
        do {
            this.selector.poll(1000L);
            if (!this.selector.completedReceives().isEmpty()) {
                break;
            }
            i = i2;
            i2--;
        } while (i > 0);
        Assert.assertEquals(1L, this.selector.completedReceives().size());
        return ((NetworkReceive) this.selector.completedReceives().get(0)).payload();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public SaslHandshakeRequest buildSaslHandshakeRequest(String str, short s) {
        return new SaslHandshakeRequest.Builder(new SaslHandshakeRequestData().setMechanism(str)).build(s);
    }

    private void updateScramCredentialCache(String str, String str2) throws NoSuchAlgorithmException {
        Iterator it = ((List) this.saslServerConfigs.get("sasl.enabled.mechanisms")).iterator();
        while (it.hasNext()) {
            ScramMechanism forMechanismName = ScramMechanism.forMechanismName((String) it.next());
            if (forMechanismName != null) {
                this.credentialCache.cache(forMechanismName.mechanismName(), ScramCredential.class).put(str, new ScramFormatter(forMechanismName).generateCredential(str2, BUFFER_SIZE));
            }
        }
    }

    private ApiVersionsRequest createApiVersionsRequestV0() {
        return new ApiVersionsRequest.Builder((short) 0).build();
    }

    private void updateTokenCredentialCache(String str, String str2) throws NoSuchAlgorithmException {
        Iterator it = ((List) this.saslServerConfigs.get("sasl.enabled.mechanisms")).iterator();
        while (it.hasNext()) {
            ScramMechanism forMechanismName = ScramMechanism.forMechanismName((String) it.next());
            if (forMechanismName != null) {
                this.server.tokenCache().credentialCache(forMechanismName.mechanismName()).put(str, new ScramFormatter(forMechanismName).generateCredential(str2, BUFFER_SIZE));
            }
        }
    }

    private static void delay(long j) throws InterruptedException {
        long currentTimeMillis = System.currentTimeMillis();
        while (System.currentTimeMillis() - currentTimeMillis < j) {
            Thread.sleep(20L);
        }
    }
}
