/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.network;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ByteBufferSend;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.ChannelMetadataRegistry;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.DefaultChannelMetadataRegistry;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.ListenerReconfigurable;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.NetworkTestUtils;
import org.apache.kafka.common.network.NioEchoServer;
import org.apache.kafka.common.network.PlaintextChannelBuilder;
import org.apache.kafka.common.network.ProxyProtocol;
import org.apache.kafka.common.network.ProxyProtocolEngine;
import org.apache.kafka.common.network.ProxyProtocolEngineFactory;
import org.apache.kafka.common.network.RequestCallback;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.network.SslChannelBuilder;
import org.apache.kafka.common.network.SslTransportLayer;
import org.apache.kafka.common.network.TestProxyProtocolEngine;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.security.DefaultRequestCallbackManager;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
import org.apache.kafka.common.security.ssl.NettySslEngineFactory;
import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.utils.Java;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.audit.AuditEvent;
import org.apache.kafka.server.audit.AuditEventStatus;
import org.apache.kafka.server.audit.AuditLogProvider;
import org.apache.kafka.server.audit.AuthenticationEvent;
import org.apache.kafka.test.TestSslUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;

public class SslTransportLayerTest {
    // Buffer size in bytes. NOTE(review): the literal 4096 passed to selector.connect(...) throughout
    // this class is presumably this constant inlined by the compiler — confirm.
    private static final int BUFFER_SIZE = 4096;
    // Clock passed to the NioEchoServer that testClientEndpointNotValidated constructs directly.
    private static Time time = Time.SYSTEM;
    // Echo server used by the current test; closed in teardown() when non-null.
    private NioEchoServer server;
    // Client-side selector used by the current test; closed in teardown() when non-null.
    private Selector selector;

    /**
     * Releases the client selector and the echo server after each test.
     *
     * <p>The server is closed in a {@code finally} block so that a failure while closing the
     * selector cannot leave the server (and whatever it holds open) behind.
     *
     * @throws Exception if closing either resource fails
     */
    @AfterEach
    public void teardown() throws Exception {
        try {
            if (this.selector != null) {
                this.selector.close();
            }
        } finally {
            if (this.server != null) {
                this.server.close();
            }
        }
    }

    /**
     * Connects to the echo server via "localhost" with the client's
     * "ssl.endpoint.identification.algorithm" set to "HTTPS" and expects a working connection.
     *
     * <p>After the connection check the test calls {@code verifyAuthenticationMetrics(1, 0)} and
     * {@code verifyAuditLogEvent(args, true)}; presumably these assert one successful
     * authentication, no failures and a success audit event — confirm against the helpers.
     *
     * @param args parameterized test arguments supplied by {@code SslTransportLayerArgumentsProvider}
     * @throws Exception if server or selector setup fails
     */
    @ParameterizedTest
    @ArgumentsSource(value=SslTransportLayerArgumentsProvider.class)
    public void testValidEndpointIdentificationSanDns(Args args) throws Exception {
        this.createSelector(args);
        String node = "0";
        this.server = this.createEchoServer(args, SecurityProtocol.SSL);
        args.sslClientConfigs.put("ssl.endpoint.identification.algorithm", "HTTPS");
        // The selector created at the top of the test is about to be replaced; close it first so it
        // is not leaked (teardown only closes the selector that is current when the test ends).
        if (this.selector != null) {
            this.selector.close();
        }
        this.createSelector(args.sslClientConfigs);
        InetSocketAddress addr = new InetSocketAddress("localhost", this.server.port());
        this.selector.connect(node, addr, 4096, 4096);
        NetworkTestUtils.checkClientConnection(this.selector, node, 100, 10);
        this.server.verifyAuthenticationMetrics(1, 0);
        this.verifyAuditLogEvent(args, true);
    }

    /**
     * Builds server and client cert stores whose host address is 127.0.0.1, enables "HTTPS"
     * endpoint identification on the client, and expects a connection to 127.0.0.1 to work.
     *
     * @param args parameterized test arguments; its cert stores and SSL configs are replaced here
     * @throws Exception if certificate generation, server setup or the connection check fails
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testValidEndpointIdentificationSanIp(Args args) throws Exception {
        final String nodeId = "0";

        // Certificates for both sides carry the loopback IP as host address.
        args.serverCertStores = SslTransportLayerTest.certBuilder(true, "server", args.useInlinePem)
            .hostAddress(InetAddress.getByName("127.0.0.1"))
            .build();
        args.clientCertStores = SslTransportLayerTest.certBuilder(false, "client", args.useInlinePem)
            .hostAddress(InetAddress.getByName("127.0.0.1"))
            .build();
        args.sslServerConfigs = args.getTrustingConfig(args.serverCertStores, args.clientCertStores);
        args.sslClientConfigs = args.getTrustingConfig(args.clientCertStores, args.serverCertStores);

        server = createEchoServer(args, SecurityProtocol.SSL);
        args.sslClientConfigs.put("ssl.endpoint.identification.algorithm", "HTTPS");
        createSelector(args.sslClientConfigs);

        final InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", server.port());
        selector.connect(nodeId, serverAddress, 4096, 4096);
        NetworkTestUtils.checkClientConnection(selector, nodeId, 100, 10);
        verifyAuditLogEvent(args, true);
    }

    /**
     * Builds cert stores named "localhost" for both sides, enables "HTTPS" endpoint identification
     * on the client, installs a fresh {@code TestAuditLogProvider}, and runs the shared
     * {@code verifySslConfigs} check followed by {@code verifyAuditLogEvent(args, true)}.
     *
     * @param args parameterized test arguments; its cert stores, SSL configs and audit provider
     *             are replaced here
     * @throws Exception if certificate generation or the shared verification fails
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testValidEndpointIdentificationCN(Args args) throws Exception {
        final boolean inlinePem = args.useInlinePem;
        args.serverCertStores = SslTransportLayerTest.certBuilder(true, "localhost", inlinePem).build();
        args.clientCertStores = SslTransportLayerTest.certBuilder(false, "localhost", inlinePem).build();

        args.sslServerConfigs = args.getTrustingConfig(args.serverCertStores, args.clientCertStores);
        args.sslClientConfigs = args.getTrustingConfig(args.clientCertStores, args.serverCertStores);
        args.sslClientConfigs.put("ssl.endpoint.identification.algorithm", "HTTPS");

        args.auditLogProvider = new TestAuditLogProvider();
        verifySslConfigs(args);
        verifyAuditLogEvent(args, true);
    }

    /**
     * Connects to the echo server by the IP literal 127.0.0.1 with "HTTPS" endpoint
     * identification enabled and expects the channel to close in
     * {@code AUTHENTICATION_FAILED}, followed by {@code verifyAuditLogEvent(args, false)}.
     *
     * @param args parameterized test arguments; the client config is modified in place
     * @throws Exception if server or selector setup fails
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testEndpointIdentificationNoReverseLookup(Args args) throws Exception {
        final String nodeId = "0";
        server = createEchoServer(args, SecurityProtocol.SSL);

        args.sslClientConfigs.put("ssl.endpoint.identification.algorithm", "HTTPS");
        createSelector(args.sslClientConfigs);

        // Connect by IP address rather than by host name.
        final InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", server.port());
        selector.connect(nodeId, serverAddress, 4096, 4096);

        NetworkTestUtils.waitForChannelClose(selector, nodeId, ChannelState.State.AUTHENTICATION_FAILED);
        verifyAuditLogEvent(args, false);
    }

    /**
     * Runs a server whose SSL engines have endpoint identification forced to "HTTPS" and checks
     * that a client whose cert store is named "non-existent.com" can still connect.
     *
     * <p>The server is built directly (not through {@code createEchoServer}) so that a
     * {@code TestSslChannelBuilder} subclass can alter the engine's {@code SSLParameters} before
     * the transport layer is created.
     *
     * @param args parameterized test arguments; its cert stores and SSL configs are replaced here
     * @throws Exception if certificate generation, server start-up or the connection check fails
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testClientEndpointNotValidated(Args args) throws Exception {
        final String nodeId = "0";

        args.clientCertStores = SslTransportLayerTest.certBuilder(false, "non-existent.com", args.useInlinePem).build();
        args.serverCertStores = SslTransportLayerTest.certBuilder(true, "localhost", args.useInlinePem).build();
        args.sslServerConfigs = args.getTrustingConfig(args.serverCertStores, args.clientCertStores);
        args.sslClientConfigs = args.getTrustingConfig(args.clientCertStores, args.serverCertStores);

        // Server-side builder that switches on HTTPS endpoint identification for every engine.
        TestSslChannelBuilder httpsValidatingBuilder = new TestSslChannelBuilder(Mode.SERVER) {
            @Override
            protected TestSslChannelBuilder.TestSslTransportLayer newTransportLayer(String id, SelectionKey key, SSLEngine sslEngine, Mode mode, ProxyProtocolEngine proxyProtocolEngine) {
                SSLParameters parameters = sslEngine.getSSLParameters();
                parameters.setEndpointIdentificationAlgorithm("HTTPS");
                sslEngine.setSSLParameters(parameters);
                return super.newTransportLayer(id, key, sslEngine, mode, proxyProtocolEngine);
            }
        };
        httpsValidatingBuilder.configure(args.sslServerConfigs);

        final ListenerName listener = ListenerName.forSecurityProtocol(SecurityProtocol.SSL);
        final TestSecurityConfig securityConfig = new TestSecurityConfig(args.sslServerConfigs);
        server = new NioEchoServer(listener, SecurityProtocol.SSL, securityConfig, "localhost", (ChannelBuilder)httpsValidatingBuilder, null, time);
        server.start();

        createSelector(args.sslClientConfigs);
        final InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.port());
        selector.connect(nodeId, serverAddress, 4096, 4096);
        NetworkTestUtils.checkClientConnection(selector, nodeId, 100, 10);
    }

    /**
     * Gives the server certificate the host name "notahost", enables "HTTPS" endpoint
     * identification on the client, and runs the shared handshake-failure check followed by
     * {@code verifyAuditLogEvent(args, false)}.
     *
     * @param args parameterized test arguments; its cert stores and SSL configs are replaced here
     * @throws Exception if certificate generation or the shared verification fails
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testInvalidEndpointIdentification(Args args) throws Exception {
        args.serverCertStores = SslTransportLayerTest.certBuilder(true, "server", args.useInlinePem)
            .addHostName("notahost")
            .build();
        args.clientCertStores = SslTransportLayerTest.certBuilder(false, "client", args.useInlinePem)
            .addHostName("localhost")
            .build();

        args.sslServerConfigs = args.getTrustingConfig(args.serverCertStores, args.clientCertStores);
        args.sslClientConfigs = args.getTrustingConfig(args.clientCertStores, args.serverCertStores);
        args.sslClientConfigs.put("ssl.endpoint.identification.algorithm", "HTTPS");

        verifySslConfigsWithHandshakeFailure(args);
        verifyAuditLogEvent(args, false);
    }

    /**
     * Checks three client settings of "ssl.endpoint.identification.algorithm" against a server
     * whose certificate carries the host name "notahost":
     * <ol>
     *   <li>empty string — the connection is expected to work;</li>
     *   <li>{@code null} — the connection is expected to work;</li>
     *   <li>"HTTPS" — the channel is expected to close in {@code AUTHENTICATION_FAILED}.</li>
     * </ol>
     *
     * <p>Each phase uses a fresh selector; the previous one is closed before the next is created
     * so that no selector is left open (the same pattern {@code testListenerConfigOverride} uses).
     *
     * @param args parameterized test arguments; its cert stores and SSL configs are replaced here
     * @throws Exception if certificate generation, server setup or a connection check fails
     */
    @ParameterizedTest
    @ArgumentsSource(value=SslTransportLayerArgumentsProvider.class)
    public void testEndpointIdentificationDisabled(Args args) throws Exception {
        args.serverCertStores = SslTransportLayerTest.certBuilder(true, "server", args.useInlinePem).addHostName("notahost").build();
        args.clientCertStores = SslTransportLayerTest.certBuilder(false, "client", args.useInlinePem).addHostName("localhost").build();
        args.sslServerConfigs = args.getTrustingConfig(args.serverCertStores, args.clientCertStores);
        args.sslClientConfigs = args.getTrustingConfig(args.clientCertStores, args.serverCertStores);
        this.server = this.createEchoServer(args, SecurityProtocol.SSL);
        InetSocketAddress addr = new InetSocketAddress("localhost", this.server.port());

        // Phase 1: algorithm set to the empty string.
        String node = "1";
        args.sslClientConfigs.put("ssl.endpoint.identification.algorithm", "");
        this.createSelector(args.sslClientConfigs);
        this.selector.connect(node, addr, 4096, 4096);
        NetworkTestUtils.checkClientConnection(this.selector, node, 100, 10);
        this.selector.close();

        // Phase 2: algorithm set to null.
        String node2 = "2";
        args.sslClientConfigs.put("ssl.endpoint.identification.algorithm", null);
        this.createSelector(args.sslClientConfigs);
        this.selector.connect(node2, addr, 4096, 4096);
        NetworkTestUtils.checkClientConnection(this.selector, node2, 100, 10);
        this.selector.close();

        // Phase 3: algorithm set to HTTPS.
        String node3 = "3";
        args.sslClientConfigs.put("ssl.endpoint.identification.algorithm", "HTTPS");
        this.createSelector(args.sslClientConfigs);
        this.selector.connect(node3, addr, 4096, 4096);
        NetworkTestUtils.waitForChannelClose(this.selector, node3, ChannelState.State.AUTHENTICATION_FAILED);
        this.selector.close();
    }

    /**
     * Sets the server's "ssl.client.auth" to "required", leaves the client config untouched, and
     * runs the shared {@code verifySslConfigs} check.
     *
     * @param args parameterized test arguments; the server config is modified in place
     * @throws Exception if the shared verification fails
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testClientAuthenticationRequiredValidProvided(Args args) throws Exception {
        final String clientAuthMode = "required";
        args.sslServerConfigs.put("ssl.client.auth", clientAuthMode);
        verifySslConfigs(args);
    }

    /**
     * Exercises a listener-prefixed override of "ssl.client.auth".
     *
     * <p>The server config sets "ssl.client.auth" to "required" globally and to "none" under the
     * config prefix of a listener named "client". The test then runs three phases:
     * <ol>
     *   <li>server without an explicit listener, client with keystore — connection works;</li>
     *   <li>same server, keystore properties removed from the client — channel is expected to
     *       close in {@code AUTHENTICATION_FAILED};</li>
     *   <li>server created for the "client" listener, keystore-less client — connection works.</li>
     * </ol>
     *
     * @param args parameterized test arguments; server and client configs are modified in place
     * @throws Exception if server setup or a connection check fails
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testListenerConfigOverride(Args args) throws Exception {
        final String nodeId = "0";
        final ListenerName clientListener = new ListenerName("client");
        args.sslServerConfigs.put("ssl.client.auth", "required");
        final String listenerOverrideKey = clientListener.configPrefix() + "ssl.client.auth";
        args.sslServerConfigs.put(listenerOverrideKey, "none");

        // Phase 1: default listener, client presents a certificate.
        server = createEchoServer(args, SecurityProtocol.SSL);
        InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.port());
        createSelector(args.sslClientConfigs);
        selector.connect(nodeId, serverAddress, 4096, 4096);
        NetworkTestUtils.checkClientConnection(selector, nodeId, 100, 10);
        selector.close();

        // Phase 2: same server, client no longer has keystore properties.
        CertStores.KEYSTORE_PROPS.forEach(args.sslClientConfigs::remove);
        createSelector(args.sslClientConfigs);
        selector.connect(nodeId, serverAddress, 4096, 4096);
        NetworkTestUtils.waitForChannelClose(selector, nodeId, ChannelState.State.AUTHENTICATION_FAILED);
        selector.close();
        server.close();

        // Phase 3: server bound to the "client" listener, whose override is "none".
        server = createEchoServer(args, clientListener, SecurityProtocol.SSL);
        serverAddress = new InetSocketAddress("localhost", server.port());
        createSelector(args.sslClientConfigs);
        selector.connect(nodeId, serverAddress, 4096, 4096);
        NetworkTestUtils.checkClientConnection(selector, nodeId, 100, 10);
    }

    /**
     * Replaces the server config with the server cert store's "untrusting" config (plus the
     * argument-supplied overrides), sets "ssl.client.auth" to "required", and runs the shared
     * handshake-failure check.
     *
     * @param args parameterized test arguments; the server config is replaced here
     * @throws Exception if the shared verification fails
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testClientAuthenticationRequiredUntrustedProvided(Args args) throws Exception {
        // Start from a server config built by getUntrustingConfig(), then layer the overrides on top.
        args.sslServerConfigs = args.serverCertStores.getUntrustingConfig();
        args.sslServerConfigs.putAll(args.sslConfigOverrides);

        args.sslServerConfigs.put("ssl.client.auth", "required");
        verifySslConfigsWithHandshakeFailure(args);
    }

    /**
     * Sets the server's "ssl.client.auth" to "required", strips every
     * {@code CertStores.KEYSTORE_PROPS} entry from the client config, and runs the shared
     * handshake-failure check.
     *
     * @param args parameterized test arguments; both configs are modified in place
     * @throws Exception if the shared verification fails
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testClientAuthenticationRequiredNotProvided(Args args) throws Exception {
        args.sslServerConfigs.put("ssl.client.auth", "required");

        // Client side: drop all keystore-related properties.
        CertStores.KEYSTORE_PROPS.forEach(args.sslClientConfigs::remove);

        verifySslConfigsWithHandshakeFailure(args);
    }

    /**
     * Replaces the server config with the server cert store's "untrusting" config (plus the
     * argument-supplied overrides), sets "ssl.client.auth" to "none", and runs the shared
     * {@code verifySslConfigs} check.
     *
     * @param args parameterized test arguments; the server config is replaced here
     * @throws Exception if the shared verification fails
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testClientAuthenticationDisabledUntrustedProvided(Args args) throws Exception {
        // Start from a server config built by getUntrustingConfig(), then layer the overrides on top.
        args.sslServerConfigs = args.serverCertStores.getUntrustingConfig();
        args.sslServerConfigs.putAll(args.sslConfigOverrides);

        args.sslServerConfigs.put("ssl.client.auth", "none");
        verifySslConfigs(args);
    }

    /**
     * Sets the server's "ssl.client.auth" to "none", strips every
     * {@code CertStores.KEYSTORE_PROPS} entry from the client config, and runs the shared
     * {@code verifySslConfigs} check.
     *
     * @param args parameterized test arguments; both configs are modified in place
     * @throws Exception if the shared verification fails
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testClientAuthenticationDisabledNotProvided(Args args) throws Exception {
        args.sslServerConfigs.put("ssl.client.auth", "none");

        // Client side: drop all keystore-related properties.
        CertStores.KEYSTORE_PROPS.forEach(args.sslClientConfigs::remove);

        verifySslConfigs(args);
    }

    /**
     * Sets the server's "ssl.client.auth" to "requested", leaves the client config untouched, and
     * runs the shared {@code verifySslConfigs} check.
     *
     * @param args parameterized test arguments; the server config is modified in place
     * @throws Exception if the shared verification fails
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testClientAuthenticationRequestedValidProvided(Args args) throws Exception {
        final String clientAuthMode = "requested";
        args.sslServerConfigs.put("ssl.client.auth", clientAuthMode);
        verifySslConfigs(args);
    }

    /**
     * Sets the server's "ssl.client.auth" to "requested", strips every
     * {@code CertStores.KEYSTORE_PROPS} entry from the client config, and runs the shared
     * {@code verifySslConfigs} check.
     *
     * @param args parameterized test arguments; both configs are modified in place
     * @throws Exception if the shared verification fails
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testClientAuthenticationRequestedNotProvided(Args args) throws Exception {
        args.sslServerConfigs.put("ssl.client.auth", "requested");

        // Client side: drop all keystore-related properties.
        CertStores.KEYSTORE_PROPS.forEach(args.sslClientConfigs::remove);

        verifySslConfigs(args);
    }

    /**
     * Runs the shared {@code verifySslConfigs} check with cert stores generated using the "DSA"
     * key algorithm and "ssl.client.auth" set to "required".
     *
     * <p>The test is skipped (JUnit assumption) unless {@code args.tlsProtocol} equals "TLSv1.2".
     *
     * @param args parameterized test arguments; its cert stores and SSL configs are replaced here
     * @throws Exception if certificate generation or the shared verification fails
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testDsaKeyPair(Args args) throws Exception {
        final boolean isTls12 = args.tlsProtocol.equals("TLSv1.2");
        Assumptions.assumeTrue(isTls12);

        args.serverCertStores = SslTransportLayerTest.certBuilder(true, "server", args.useInlinePem)
            .keyAlgorithm("DSA")
            .build();
        args.clientCertStores = SslTransportLayerTest.certBuilder(false, "client", args.useInlinePem)
            .keyAlgorithm("DSA")
            .build();

        args.sslServerConfigs = args.getTrustingConfig(args.serverCertStores, args.clientCertStores);
        args.sslClientConfigs = args.getTrustingConfig(args.clientCertStores, args.serverCertStores);
        args.sslServerConfigs.put("ssl.client.auth", "required");

        verifySslConfigs(args);
    }

    /**
     * Runs the shared {@code verifySslConfigs} check with cert stores generated using the "EC"
     * key algorithm and "ssl.client.auth" set to "required".
     *
     * @param args parameterized test arguments; its cert stores and SSL configs are replaced here
     * @throws Exception if certificate generation or the shared verification fails
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testECKeyPair(Args args) throws Exception {
        args.serverCertStores = SslTransportLayerTest.certBuilder(true, "server", args.useInlinePem)
            .keyAlgorithm("EC")
            .build();
        args.clientCertStores = SslTransportLayerTest.certBuilder(false, "client", args.useInlinePem)
            .keyAlgorithm("EC")
            .build();

        args.sslServerConfigs = args.getTrustingConfig(args.serverCertStores, args.clientCertStores);
        args.sslClientConfigs = args.getTrustingConfig(args.clientCertStores, args.serverCertStores);
        args.sslServerConfigs.put("ssl.client.auth", "required");

        verifySslConfigs(args);
    }

    /**
     * Converts both the server and the client config with
     * {@code TestSslUtils.convertToPem(config, true, true)}, requires client authentication, and
     * runs the shared {@code verifySslConfigs} check.
     *
     * @param args parameterized test arguments; both configs are modified in place
     * @throws Exception if the PEM conversion or the shared verification fails
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testPemFiles(Args args) throws Exception {
        // NOTE(review): the two boolean flags presumably mean "write to files" and
        // "keep the key password" — confirm against TestSslUtils.convertToPem.
        TestSslUtils.convertToPem(args.sslServerConfigs, true, true);
        TestSslUtils.convertToPem(args.sslClientConfigs, true, true);

        args.sslServerConfigs.put("ssl.client.auth", "required");
        verifySslConfigs(args);
    }

    /**
     * Converts both configs to PEM — the client one with the third {@code convertToPem} flag set
     * to {@code false} — requires client authentication, and runs the shared
     * {@code verifySslConfigs} check.
     *
     * <p>No echo server is created here: every other caller of {@code verifySslConfigs} in this
     * class relies on that helper to set the server up, so creating one beforehand would only
     * leave an extra, never-closed server behind.
     *
     * @param args parameterized test arguments; both configs are modified in place
     * @throws Exception if the PEM conversion or the shared verification fails
     */
    @ParameterizedTest
    @ArgumentsSource(value=SslTransportLayerArgumentsProvider.class)
    public void testPemFilesWithoutClientKeyPassword(Args args) throws Exception {
        boolean useInlinePem = args.useInlinePem;
        // NOTE(review): the flags presumably mean "write to files" and "keep the key password" —
        // confirm against TestSslUtils.convertToPem.
        TestSslUtils.convertToPem(args.sslServerConfigs, !useInlinePem, true);
        TestSslUtils.convertToPem(args.sslClientConfigs, !useInlinePem, false);
        args.sslServerConfigs.put("ssl.client.auth", "required");
        this.verifySslConfigs(args);
    }

    /**
     * Converts both configs to PEM — the server one with the third {@code convertToPem} flag set
     * to {@code false} — and runs the shared {@code verifySslConfigs} check.
     *
     * @param args parameterized test arguments; both configs are modified in place
     * @throws Exception if the PEM conversion or the shared verification fails
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testPemFilesWithoutServerKeyPassword(Args args) throws Exception {
        // The second flag is the negation of the "inline PEM" test parameter.
        final boolean secondFlag = !args.useInlinePem;
        TestSslUtils.convertToPem(args.sslServerConfigs, secondFlag, false);
        TestSslUtils.convertToPem(args.sslClientConfigs, secondFlag, true);
        verifySslConfigs(args);
    }

    /**
     * Expects {@code configure} on a client channel builder to throw {@link KafkaException} when
     * "ssl.secure.random.implementation" is set to "invalid".
     *
     * @param args parameterized test arguments; the client config is modified in place
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testInvalidSecureRandomImplementation(Args args) {
        try (SslChannelBuilder builder = newClientChannelBuilder()) {
            args.sslClientConfigs.put("ssl.secure.random.implementation", "invalid");
            Assertions.assertThrows(
                KafkaException.class,
                () -> builder.configure(args.sslClientConfigs));
        }
    }

    /**
     * Expects {@code configure} on a client channel builder to throw {@link KafkaException} when
     * "ssl.truststore.password" is set to "invalid".
     *
     * @param args parameterized test arguments; the client config is modified in place
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testInvalidTruststorePassword(Args args) {
        try (SslChannelBuilder builder = newClientChannelBuilder()) {
            args.sslClientConfigs.put("ssl.truststore.password", "invalid");
            Assertions.assertThrows(
                KafkaException.class,
                () -> builder.configure(args.sslClientConfigs));
        }
    }

    /**
     * Expects {@code configure} on a client channel builder to throw {@link KafkaException} when
     * "ssl.keystore.password" is set to "invalid".
     *
     * @param args parameterized test arguments; the client config is modified in place
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testInvalidKeystorePassword(Args args) {
        try (SslChannelBuilder builder = newClientChannelBuilder()) {
            args.sslClientConfigs.put("ssl.keystore.password", "invalid");
            Assertions.assertThrows(
                KafkaException.class,
                () -> builder.configure(args.sslClientConfigs));
        }
    }

    /**
     * Removes "ssl.truststore.password" from both the client and the server config and runs the
     * shared {@code verifySslConfigs} check.
     *
     * @param args parameterized test arguments; both configs are modified in place
     * @throws Exception if the shared verification fails
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testNullTruststorePassword(Args args) throws Exception {
        final String truststorePasswordKey = "ssl.truststore.password";
        args.sslClientConfigs.remove(truststorePasswordKey);
        args.sslServerConfigs.remove(truststorePasswordKey);
        verifySslConfigs(args);
    }

    /**
     * Sets the server's "ssl.key.password" to an invalid {@link Password} and checks the outcome.
     *
     * <p>With inline PEM, or when the engine factory class is {@code NettySslEngineFactory},
     * creating the echo server is expected to throw {@link InvalidConfigurationException};
     * otherwise the shared handshake-failure check is run.
     *
     * @param args parameterized test arguments; the server config is modified in place
     * @throws Exception if the shared verification fails
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testInvalidKeyPassword(Args args) throws Exception {
        args.sslServerConfigs.put("ssl.key.password", new Password("invalid"));

        final boolean failsAtServerCreation =
            args.useInlinePem || args.engineFactoryClass == NettySslEngineFactory.class;
        if (failsAtServerCreation) {
            Assertions.assertThrows(
                InvalidConfigurationException.class,
                () -> createEchoServer(args, SecurityProtocol.SSL));
        } else {
            verifySslConfigsWithHandshakeFailure(args);
        }
    }

    /**
     * Builds both configs straight from the cert stores (without the argument-level helper),
     * asserts that "ssl.protocol" equals {@code SslConfigs.DEFAULT_SSL_PROTOCOL} on both sides,
     * and then checks that a client can connect and exchange data with the echo server.
     *
     * @param args parameterized test arguments; both SSL configs are replaced here
     * @throws Exception if server setup or the connection check fails
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testTlsDefaults(Args args) throws Exception {
        args.sslServerConfigs = args.serverCertStores.getTrustingConfig(args.clientCertStores);
        args.sslClientConfigs = args.clientCertStores.getTrustingConfig(args.serverCertStores);

        final Object serverProtocol = args.sslServerConfigs.get("ssl.protocol");
        Assertions.assertEquals((Object)SslConfigs.DEFAULT_SSL_PROTOCOL, serverProtocol);
        final Object clientProtocol = args.sslClientConfigs.get("ssl.protocol");
        Assertions.assertEquals((Object)SslConfigs.DEFAULT_SSL_PROTOCOL, clientProtocol);

        server = createEchoServer(args, SecurityProtocol.SSL);
        createSelector(args.sslClientConfigs);

        final String nodeId = "0";
        final InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.port());
        selector.connect(nodeId, serverAddress, 4096, 4096);
        NetworkTestUtils.checkClientConnection(selector, nodeId, 10, 100);
        server.verifyAuthenticationMetrics(1, 0);
        selector.close();
    }

    /**
     * Restricts the client to a single enabled TLS protocol, connects to the current echo server
     * with a freshly created selector, waits for the channel to close in
     * {@code AUTHENTICATION_FAILED}, and closes the selector.
     *
     * @param args       test arguments; "ssl.enabled.protocols" in the client config is overwritten
     * @param node       id to use for the connection
     * @param tlsVersion the only protocol placed in "ssl.enabled.protocols"
     * @throws IOException if connecting fails
     */
    private void checkAuthenticationFailed(Args args, String node, String tlsVersion) throws IOException {
        args.sslClientConfigs.put("ssl.enabled.protocols", Arrays.asList(tlsVersion));
        createSelector(args.sslClientConfigs);

        final InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.port());
        selector.connect(node, serverAddress, 4096, 4096);

        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
        selector.close();
    }

    /**
     * Configures the server with the first supported cipher suite and the client with the second,
     * then expects the connection to close in {@code AUTHENTICATION_FAILED} and the server to
     * report {@code verifyAuthenticationMetrics(0, 1)}.
     *
     * <p>The test is skipped (JUnit assumption) when {@code args.supportsCipherConfiguration()}
     * is false. No selector is created here: {@code checkAuthenticationFailed} builds its own
     * from {@code args.sslClientConfigs}, so creating one beforehand would only leave an extra,
     * never-closed selector behind.
     *
     * @param args parameterized test arguments; both configs are modified in place
     * @throws Exception if server setup or the failure check fails
     */
    @ParameterizedTest
    @ArgumentsSource(value=SslTransportLayerArgumentsProvider.class)
    public void testUnsupportedCiphers(Args args) throws Exception {
        Assumptions.assumeTrue(args.supportsCipherConfiguration());
        String[] cipherSuites = args.supportedCipherSuites();
        args.sslServerConfigs.put("ssl.cipher.suites", Arrays.asList(cipherSuites[0]));
        this.server = this.createEchoServer(args, SecurityProtocol.SSL);
        args.sslClientConfigs.put("ssl.cipher.suites", Arrays.asList(cipherSuites[1]));
        this.checkAuthenticationFailed(args, "1", args.tlsProtocol);
        this.server.verifyAuthenticationMetrics(0, 1);
    }

    /**
     * Sends one size-prefixed message of 1 MiB worth of random characters through the echo server
     * and waits for the server's "incoming-byte", "outgoing-byte", "request" and "response"
     * metrics to reach the expected values.
     *
     * <p>The wait for the echoed message is bounded so that a lost echo fails the test instead of
     * hanging it, and the expected byte count is derived from the encoded payload rather than
     * from the character count.
     *
     * @param args parameterized test arguments
     * @throws Exception if server setup, the send, or a metric wait fails
     */
    @ParameterizedTest
    @ArgumentsSource(value=SslTransportLayerArgumentsProvider.class)
    public void testServerRequestMetrics(Args args) throws Exception {
        String node = "0";
        this.server = this.createEchoServer(args, SecurityProtocol.SSL);
        this.createSelector(args.sslClientConfigs, 16384, 16384, 16384);
        InetSocketAddress addr = new InetSocketAddress("localhost", this.server.port());
        this.selector.connect(node, addr, 102400, 102400);
        NetworkTestUtils.waitForChannelReady(this.selector, node);
        int messageSize = 0x100000;
        String message = TestUtils.randomString(messageSize);
        byte[] payload = message.getBytes();
        this.selector.send(new NetworkSend(node, (Send)ByteBufferSend.sizePrefixed(ByteBuffer.wrap(payload))));
        // Poll until the echo arrives, but give up after 30 seconds instead of spinning forever.
        final long maxWaitNanos = 30_000_000_000L;
        final long startNanos = System.nanoTime();
        while (this.selector.completedReceives().isEmpty()) {
            Assertions.assertTrue(System.nanoTime() - startNanos < maxWaitNanos, "Timed out waiting for the echoed message");
            this.selector.poll(100L);
        }
        // Payload bytes plus the 4 bytes accounted for the size prefix.
        int totalBytes = payload.length + 4;
        this.server.waitForMetric("incoming-byte", totalBytes);
        this.server.waitForMetric("outgoing-byte", totalBytes);
        this.server.waitForMetric("request", 1.0);
        this.server.waitForMetric("response", 1.0);
    }

    /**
     * Sends an 81920-character message through the echo server and checks that, once the server
     * has sent it back, a single {@code poll} yields exactly one completed receive whose payload
     * equals the message.
     *
     * @param args parameterized test arguments
     * @throws Exception if server setup, the send, or a wait times out
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testSelectorPollReadSize(Args args) throws Exception {
        final String nodeId = "0";
        server = createEchoServer(args, SecurityProtocol.SSL);
        createSelector(args.sslClientConfigs, 16384, 16384, 16384);
        final InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.port());
        selector.connect(nodeId, serverAddress, 102400, 102400);
        NetworkTestUtils.checkClientConnection(selector, nodeId, 81920, 1);

        final String message = TestUtils.randomString(81920);
        final ByteBuffer outgoing = ByteBuffer.wrap(message.getBytes());
        selector.send(new NetworkSend(nodeId, (Send)ByteBufferSend.sizePrefixed(outgoing)));

        // Keep polling until the client reports the send as complete.
        TestUtils.waitForCondition(() -> {
            try {
                selector.poll(100L);
            }
            catch (IOException e) {
                return false;
            }
            return !selector.completedSends().isEmpty();
        }, "Timed out waiting for message to be sent");

        // Wait for the server's sent counter to reach 2 before doing the single read poll below.
        TestUtils.waitForCondition(() -> server.numSent() >= 2, "Timed out waiting for echo server to send message");

        selector.poll(1000L);
        final Collection<NetworkReceive> receives = selector.completedReceives();
        Assertions.assertEquals(1, receives.size());
        final NetworkReceive received = receives.iterator().next();
        final String echoed = new String(Utils.toArray(received.payload()));
        Assertions.assertEquals((Object)message, (Object)echoed);
    }

    /**
     * Creates the client selector with the first of the three buffer-size arguments set to 10
     * (the other two {@code null}) and checks that a 64000-byte exchange, repeated 10 times,
     * still works.
     *
     * @param args parameterized test arguments
     * @throws Exception if server setup or the connection check fails
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testNetReadBufferResize(Args args) throws Exception {
        final String nodeId = "0";
        server = createEchoServer(args, SecurityProtocol.SSL);

        // NOTE(review): argument order is presumably (netRead, netWrite, application) — confirm.
        createSelector(args.sslClientConfigs, 10, null, null);

        final InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.port());
        selector.connect(nodeId, serverAddress, 4096, 4096);
        NetworkTestUtils.checkClientConnection(selector, nodeId, 64000, 10);
    }

    /**
     * Creates the client selector with the second of the three buffer-size arguments set to 10
     * (the other two {@code null}) and checks that a 64000-byte exchange, repeated 10 times,
     * still works.
     *
     * @param args parameterized test arguments
     * @throws Exception if server setup or the connection check fails
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testNetWriteBufferResize(Args args) throws Exception {
        final String nodeId = "0";
        server = createEchoServer(args, SecurityProtocol.SSL);

        // NOTE(review): argument order is presumably (netRead, netWrite, application) — confirm.
        createSelector(args.sslClientConfigs, null, 10, null);

        final InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.port());
        selector.connect(nodeId, serverAddress, 4096, 4096);
        NetworkTestUtils.checkClientConnection(selector, nodeId, 64000, 10);
    }

    /**
     * Creates the client selector with the third of the three buffer-size arguments set to 10
     * (the other two {@code null}) and checks that a 64000-byte exchange, repeated 10 times,
     * still works.
     *
     * @param args parameterized test arguments
     * @throws Exception if server setup or the connection check fails
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testApplicationBufferResize(Args args) throws Exception {
        final String nodeId = "0";
        server = createEchoServer(args, SecurityProtocol.SSL);

        // NOTE(review): argument order is presumably (netRead, netWrite, application) — confirm.
        createSelector(args.sslClientConfigs, null, null, 10);

        final InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.port());
        selector.connect(nodeId, serverAddress, 4096, 4096);
        NetworkTestUtils.checkClientConnection(selector, nodeId, 64000, 10);
    }

    /**
     * Checks the channel's network I/O time metrics across handshake, send and receive.
     *
     * <p>Sequence, using a selector local to this test:
     * <ol>
     *   <li>after the channel is ready, {@code networkIoTimeNanos} must be positive and
     *       {@code writeIoTimeNanos} zero; resetting must zero the network time;</li>
     *   <li>with the channel muted, a 1 MiB message is sent; afterwards the network time must be
     *       positive and equal to the write time, resetting must zero both, and nothing may be
     *       buffered or received;</li>
     *   <li>after unmuting and receiving the echo, the network time must be positive and the
     *       write time zero.</li>
     * </ol>
     *
     * <p>The wait for send completion is bounded so that a stalled send fails the test instead of
     * hanging it.
     *
     * @param args parameterized test arguments
     * @throws Exception if server setup, the send, or a wait fails
     */
    @ParameterizedTest
    @ArgumentsSource(value=SslTransportLayerArgumentsProvider.class)
    public void testNetworkThreadTimeRecorded(Args args) throws Exception {
        LogContext logContext = new LogContext();
        SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false, logContext, new ProxyProtocolEngineFactory(ProxyProtocol.NONE));
        channelBuilder.configure(args.sslClientConfigs);
        try (Selector selector = new Selector(-1, -1L, new Metrics(), Time.SYSTEM, "MetricGroup", new HashMap<>(), false, true, (ChannelBuilder)channelBuilder, MemoryPool.NONE, logContext);){
            String node = "0";
            this.server = this.createEchoServer(args, SecurityProtocol.SSL);
            InetSocketAddress addr = new InetSocketAddress("localhost", this.server.port());
            selector.connect(node, addr, 4096, 4096);
            String message = TestUtils.randomString(0x100000);
            NetworkTestUtils.waitForChannelReady(selector, node);
            KafkaChannel channel = selector.channel(node);

            // Handshake phase.
            Assertions.assertTrue(channel.metrics().networkIoTimeNanos() > 0L, "SSL handshake time not recorded");
            Assertions.assertEquals(0L, channel.metrics().writeIoTimeNanos());
            channel.resetNetworkIoTimes();
            Assertions.assertEquals(0L, channel.metrics().networkIoTimeNanos(), "Time not reset");

            // Send phase: the channel is muted first, and no receive may complete while it is.
            selector.mute(node);
            selector.send(new NetworkSend(node, (Send)ByteBufferSend.sizePrefixed(ByteBuffer.wrap(message.getBytes()))));
            // Poll until the send completes, but give up after 30 seconds instead of spinning forever.
            final long maxWaitNanos = 30_000_000_000L;
            final long startNanos = System.nanoTime();
            while (selector.completedSends().isEmpty()) {
                Assertions.assertTrue(System.nanoTime() - startNanos < maxWaitNanos, "Timed out waiting for the message to be sent");
                selector.poll(100L);
            }
            long sendTimeNanos = channel.metrics().networkIoTimeNanos();
            Assertions.assertTrue(sendTimeNanos > 0L, "Send time not recorded: " + sendTimeNanos);
            Assertions.assertEquals(sendTimeNanos, channel.metrics().writeIoTimeNanos());
            channel.resetNetworkIoTimes();
            Assertions.assertEquals(0L, channel.metrics().networkIoTimeNanos(), "Time not reset");
            Assertions.assertEquals(0L, channel.metrics().writeIoTimeNanos(), "Write time not reset");
            Assertions.assertFalse(channel.hasBytesBuffered(), "Unexpected bytes buffered");
            Assertions.assertEquals(0, selector.completedReceives().size());

            // Receive phase.
            selector.unmute(node);
            TestUtils.waitForCondition(() -> {
                try {
                    selector.poll(100L);
                }
                catch (IOException e) {
                    return false;
                }
                return !selector.completedReceives().isEmpty();
            }, "Timed out waiting for a message to receive from echo server");
            long receiveTimeNanos = channel.metrics().networkIoTimeNanos();
            Assertions.assertTrue(receiveTimeNanos > 0L, "Receive time not recorded: " + receiveTimeNanos);
            Assertions.assertEquals(0L, channel.metrics().writeIoTimeNanos());
        }
    }

    /**
     * Runs the shared handshake-failure scenario with {@code THROW_IO_EXCEPTION} as the read
     * failure action and {@code NO_OP} as the flush failure action.
     *
     * @param args parameterized test arguments
     * @throws Exception if server setup or the shared scenario fails
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testIOExceptionsDuringHandshakeRead(Args args) throws Exception {
        server = createEchoServer(args, SecurityProtocol.SSL);
        final FailureAction onRead = FailureAction.THROW_IO_EXCEPTION;
        final FailureAction onFlush = FailureAction.NO_OP;
        testIOExceptionsDuringHandshake(args, onRead, onFlush);
    }

    /**
     * Runs the shared handshake-failure scenario against an SSL echo server, passing
     * {@code FailureAction.NO_OP} as the read action and {@code FailureAction.THROW_IO_EXCEPTION}
     * as the flush action.
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testIOExceptionsDuringHandshakeWrite(Args args) throws Exception {
        server = createEchoServer(args, SecurityProtocol.SSL);
        FailureAction onRead = FailureAction.NO_OP;
        FailureAction onFlush = FailureAction.THROW_IO_EXCEPTION;
        testIOExceptionsDuringHandshake(args, onRead, onFlush);
    }

    /**
     * Runs the shared handshake-failure scenario with the server's {@code closeSocketChannels}
     * as the read action and no flush action.
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testUngracefulRemoteCloseDuringHandshakeRead(Args args) throws Exception {
        server = createEchoServer(args, SecurityProtocol.SSL);
        // Bound after the server is created so the action targets this test's server instance.
        FailureAction closeSockets = server::closeSocketChannels;
        testIOExceptionsDuringHandshake(args, closeSockets, FailureAction.NO_OP);
    }

    /**
     * Runs the shared handshake-failure scenario with no read action and the server's
     * {@code closeSocketChannels} as the flush action.
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testUngracefulRemoteCloseDuringHandshakeWrite(Args args) throws Exception {
        server = createEchoServer(args, SecurityProtocol.SSL);
        // Bound after the server is created so the action targets this test's server instance.
        FailureAction closeSockets = server::closeSocketChannels;
        testIOExceptionsDuringHandshake(args, FailureAction.NO_OP, closeSockets);
    }

    /**
     * Runs the shared handshake-failure scenario with no read action and the server's
     * {@code closeKafkaChannels} as the flush action.
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testGracefulRemoteCloseDuringHandshakeRead(Args args) throws Exception {
        server = createEchoServer(args, SecurityProtocol.SSL);
        FailureAction closeChannels = server::closeKafkaChannels;
        // NOTE(review): the close is wired to the flush slot although this test is named "...Read"
        // (the ungraceful Read test wires it to the read slot) - confirm this is intended.
        testIOExceptionsDuringHandshake(args, FailureAction.NO_OP, closeChannels);
    }

    /**
     * Runs the shared handshake-failure scenario with the server's {@code closeKafkaChannels}
     * as the read action and no flush action.
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testGracefulRemoteCloseDuringHandshakeWrite(Args args) throws Exception {
        server = createEchoServer(args, SecurityProtocol.SSL);
        FailureAction closeChannels = server::closeKafkaChannels;
        // NOTE(review): the close is wired to the read slot although this test is named "...Write"
        // (the ungraceful Write test wires it to the flush slot) - confirm this is intended.
        testIOExceptionsDuringHandshake(args, closeChannels, FailureAction.NO_OP);
    }

    /**
     * Connects to the echo server up to 100 times, each time configuring the client channel
     * builder with the given read/flush actions and an increasing {@code failureIndex}.
     * Each attempt polls up to 30 times until the channel is ready or reported disconnected;
     * a disconnect must be in state AUTHENTICATE or READY. The test passes if at least one
     * attempt reached a ready channel.
     *
     * @param args               parameterized SSL client/server configuration
     * @param readFailureAction  action installed as the builder's {@code readFailureAction}
     * @param flushFailureAction action installed as the builder's {@code flushFailureAction}
     */
    private void testIOExceptionsDuringHandshake(Args args, FailureAction readFailureAction, FailureAction flushFailureAction) throws Exception {
        TestSslChannelBuilder clientBuilder = new TestSslChannelBuilder(Mode.CLIENT);
        boolean handshakeCompleted = false;
        for (int failureIndex = 1; failureIndex <= 100; failureIndex++) {
            String nodeId = String.valueOf(failureIndex);
            clientBuilder.readFailureAction = readFailureAction;
            clientBuilder.flushFailureAction = flushFailureAction;
            clientBuilder.failureIndex = failureIndex;
            clientBuilder.configure(args.sslClientConfigs);
            this.selector = new Selector(5000L, new Metrics(), time, "MetricGroup", (ChannelBuilder)clientBuilder, new LogContext());
            InetSocketAddress serverAddress = new InetSocketAddress("localhost", this.server.port());
            this.selector.connect(nodeId, serverAddress, 4096, 4096);
            for (int attempt = 0; attempt < 30; attempt++) {
                this.selector.poll(1000L);
                KafkaChannel polledChannel = this.selector.channel(nodeId);
                if (polledChannel != null && polledChannel.ready()) {
                    handshakeCompleted = true;
                    break;
                }
                if (this.selector.disconnected().containsKey(nodeId)) {
                    ChannelState.State state = ((ChannelState)this.selector.disconnected().get(nodeId)).state();
                    boolean expectedState = state == ChannelState.State.AUTHENTICATE || state == ChannelState.State.READY;
                    Assertions.assertTrue(expectedState, "Unexpected channel state " + state);
                    break;
                }
            }
            // A channel that is still registered after polling must have completed the handshake.
            KafkaChannel remaining = this.selector.channel(nodeId);
            if (remaining != null) {
                Assertions.assertTrue(remaining.ready(), "Channel not ready or disconnected:" + remaining.state().state());
            }
            this.selector.close();
        }
        Assertions.assertTrue(handshakeCompleted, "Too many invocations of read/write during SslTransportLayer.handshake()");
    }

    /**
     * Starts a server with an untrusting config and required client auth, then verifies the
     * client channel closes in state AUTHENTICATION_FAILED. The scenario is repeated with the
     * server builder's {@code flushDelayCount} set to 0, 1 and 2.
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testPeerNotifiedOfHandshakeFailure(Args args) throws Exception {
        args.sslServerConfigs = args.serverCertStores.getUntrustingConfig();
        args.sslServerConfigs.putAll(args.sslConfigOverrides);
        args.sslServerConfigs.put("ssl.client.auth", "required");
        for (int delayCount = 0; delayCount < 3; delayCount++) {
            String nodeId = String.valueOf(delayCount);
            TestSslChannelBuilder serverBuilder = new TestSslChannelBuilder(Mode.SERVER);
            serverBuilder.configure(args.sslServerConfigs);
            serverBuilder.flushDelayCount = delayCount;
            this.server = new NioEchoServer(ListenerName.forSecurityProtocol(SecurityProtocol.SSL), SecurityProtocol.SSL, new TestSecurityConfig(args.sslServerConfigs), "localhost", (ChannelBuilder)serverBuilder, null, time);
            this.server.start();
            // createSelector() stores the new selector in this.selector.
            this.createSelector(args.sslClientConfigs);
            InetSocketAddress serverAddress = new InetSocketAddress("localhost", this.server.port());
            this.selector.connect(nodeId, serverAddress, 4096, 4096);
            NetworkTestUtils.waitForChannelClose(this.selector, nodeId, ChannelState.State.AUTHENTICATION_FAILED);
            this.server.close();
            this.selector.close();
            serverBuilder.close();
        }
    }

    /**
     * Requires client auth on the server, removes every keystore property from the client
     * configs, and delegates to {@code verifySslConfigsWithHandshakeFailure} with argument 1.
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testPeerNotifiedOfHandshakeFailureWithClientSideDelay(Args args) throws Exception {
        args.sslServerConfigs.put("ssl.client.auth", "required");
        for (String keystoreProp : CertStores.KEYSTORE_PROPS) {
            args.sslClientConfigs.remove(keystoreProp);
        }
        verifySslConfigsWithHandshakeFailure(args, 1);
    }

    /** Runs the shared close scenario over SSL with a fresh client {@code SslChannelBuilder}. */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testCloseSsl(Args args) throws Exception {
        ChannelBuilder sslClientBuilder = newClientChannelBuilder();
        testClose(args, SecurityProtocol.SSL, sslClientBuilder);
    }

    /** Runs the shared close scenario over PLAINTEXT with a client {@code PlaintextChannelBuilder}. */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testClosePlaintext(Args args) throws Exception {
        ChannelBuilder plaintextClientBuilder = new PlaintextChannelBuilder(Mode.CLIENT, null, new ProxyProtocolEngineFactory(ProxyProtocol.NONE));
        testClose(args, SecurityProtocol.PLAINTEXT, plaintextClientBuilder);
    }

    /**
     * Creates an unconfigured client-mode {@code SslChannelBuilder} with a
     * {@code ProxyProtocol.NONE} engine factory; callers must call {@code configure} on it.
     */
    private SslChannelBuilder newClientChannelBuilder() {
        LogContext logContext = new LogContext();
        ProxyProtocolEngineFactory noProxy = new ProxyProtocolEngineFactory(ProxyProtocol.NONE);
        return new SslChannelBuilder(Mode.CLIENT, null, false, logContext, noProxy);
    }

    /**
     * Sends 20 size-prefixed messages while the server selector is muted, then unmutes the
     * server, closes the client connection, and waits until the server's output channel has
     * received every byte that was sent.
     *
     * @param args                 parameterized SSL client/server configuration
     * @param securityProtocol     protocol used for the echo server
     * @param clientChannelBuilder unconfigured client channel builder; configured here
     */
    private void testClose(Args args, SecurityProtocol securityProtocol, ChannelBuilder clientChannelBuilder) throws Exception {
        String nodeId = "0";
        this.server = this.createEchoServer(args, securityProtocol);
        clientChannelBuilder.configure(args.sslClientConfigs);
        this.selector = new Selector(5000L, new Metrics(), time, "MetricGroup", clientChannelBuilder, new LogContext());
        this.selector.connect(nodeId, new InetSocketAddress("localhost", this.server.port()), 4096, 4096);
        NetworkTestUtils.waitForChannelReady(this.selector, nodeId);
        TestUtils.waitForCondition(() -> this.server.selector().channels().stream().allMatch(KafkaChannel::ready), "Channel not ready");

        // Capture everything the server writes so the total can be compared afterwards.
        ByteArrayOutputStream received = new ByteArrayOutputStream();
        this.server.outputChannel(Channels.newChannel(received));
        this.server.selector().muteAll();

        byte[] payload = TestUtils.randomString(100).getBytes();
        int messageCount = 20;
        // Each send carries a 4-byte size prefix in addition to the payload.
        int expectedBytes = messageCount * (payload.length + 4);
        for (int sent = 0; sent < messageCount; sent++) {
            this.selector.send(new NetworkSend(nodeId, (Send)ByteBufferSend.sizePrefixed(ByteBuffer.wrap(payload))));
            this.selector.poll(0L);
            while (this.selector.completedSends().isEmpty()) {
                this.selector.poll(0L);
            }
        }
        this.server.selector().unmuteAll();
        this.selector.close(nodeId);
        TestUtils.waitForCondition(() -> received.toByteArray().length == expectedBytes, 5000L, "All requests sent were not processed");
    }

    /**
     * Builds an inter-broker ({@code isInterBrokerListener = true}) server channel builder with
     * required client auth, HTTPS endpoint identification, and the server's key/trust stores on
     * both sides, then checks that a client can connect and exchange data.
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testInterBrokerSslConfigValidation(Args args) throws Exception {
        SecurityProtocol protocol = SecurityProtocol.SSL;
        args.sslServerConfigs.put("ssl.client.auth", "required");
        args.sslServerConfigs.put("ssl.endpoint.identification.algorithm", "HTTPS");
        args.sslServerConfigs.putAll(args.serverCertStores.keyStoreProps());
        args.sslServerConfigs.putAll(args.serverCertStores.trustStoreProps());
        args.sslClientConfigs.putAll(args.serverCertStores.keyStoreProps());
        args.sslClientConfigs.putAll(args.serverCertStores.trustStoreProps());

        TestSecurityConfig serverConfig = new TestSecurityConfig(args.sslServerConfigs);
        ListenerName listener = ListenerName.forSecurityProtocol(protocol);
        ChannelBuilder serverBuilder = ChannelBuilders.serverChannelBuilder(listener, true, protocol, (AbstractConfig)serverConfig, null, null, (Time)time, new LogContext(), this.defaultApiVersionsSupplier(), (RequestCallback)new DefaultRequestCallbackManager());
        this.server = new NioEchoServer(listener, protocol, serverConfig, "localhost", serverBuilder, null, time);
        this.server.start();

        this.selector = this.createSelector(args.sslClientConfigs, null, null, null);
        InetSocketAddress serverAddress = new InetSocketAddress("localhost", this.server.port());
        this.selector.connect("0", serverAddress, 4096, 4096);
        NetworkTestUtils.checkClientConnection(this.selector, "0", 100, 10);
    }

    /**
     * Expects creation of an inter-broker server channel builder to throw
     * {@code KafkaException} when only {@code ssl.client.auth=required} is added to the
     * parameterized server configs.
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testInterBrokerSslConfigValidationFailure(Args args) {
        SecurityProtocol protocol = SecurityProtocol.SSL;
        args.sslServerConfigs.put("ssl.client.auth", "required");
        TestSecurityConfig serverConfig = new TestSecurityConfig(args.sslServerConfigs);
        ListenerName listener = ListenerName.forSecurityProtocol(protocol);
        Assertions.assertThrows(KafkaException.class, () -> ChannelBuilders.serverChannelBuilder(listener, true, protocol, (AbstractConfig)serverConfig, null, null, (Time)time, new LogContext(), this.defaultApiVersionsSupplier(), (RequestCallback)new DefaultRequestCallbackManager()));
    }

    /**
     * Verifies dynamic keystore reconfiguration of the server channel builder:
     * <ol>
     *   <li>a client trusting the original server certificate connects;</li>
     *   <li>after reconfiguring with a new keystore, a new connection from that client fails
     *       authentication while a client trusting the new certificate connects, and the
     *       pre-existing connection keeps working;</li>
     *   <li>reconfiguration with a keystore for a different host name, or with a missing
     *       keystore file, is rejected and new connections still succeed afterwards.</li>
     * </ol>
     * The first client selector is closed explicitly because {@code createSelector} replaces
     * {@code this.selector} when the second client is created.
     */
    @ParameterizedTest
    @ArgumentsSource(value=SslTransportLayerArgumentsProvider.class)
    public void testServerKeystoreDynamicUpdate(Args args) throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SSL;
        TestSecurityConfig config = new TestSecurityConfig(args.sslServerConfigs);
        ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
        ChannelBuilder serverChannelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, false, securityProtocol, (AbstractConfig)config, null, null, (Time)time, new LogContext(), this.defaultApiVersionsSupplier(), (RequestCallback)new DefaultRequestCallbackManager());
        this.server = new NioEchoServer(listenerName, securityProtocol, config, "localhost", serverChannelBuilder, null, time);
        this.server.start();
        InetSocketAddress addr = new InetSocketAddress("localhost", this.server.port());
        String oldNode = "0";
        Selector oldClientSelector = this.createSelector(args.sslClientConfigs);
        try {
            oldClientSelector.connect(oldNode, addr, 4096, 4096);
            NetworkTestUtils.checkClientConnection(oldClientSelector, oldNode, 100, 10);

            // Swap the server keystore for a freshly generated one.
            CertStores newServerCertStores = SslTransportLayerTest.certBuilder(true, "server", args.useInlinePem).addHostName("localhost").build();
            Map<String, Object> newKeystoreConfigs = newServerCertStores.keyStoreProps();
            Assertions.assertTrue(serverChannelBuilder instanceof ListenerReconfigurable, "SslChannelBuilder not reconfigurable");
            ListenerReconfigurable reconfigurableBuilder = (ListenerReconfigurable)serverChannelBuilder;
            Assertions.assertEquals(listenerName, reconfigurableBuilder.listenerName());
            reconfigurableBuilder.validateReconfiguration(newKeystoreConfigs);
            reconfigurableBuilder.reconfigure(newKeystoreConfigs);

            // A new connection from the client with the old trust config must now fail.
            oldClientSelector.connect("1", addr, 4096, 4096);
            NetworkTestUtils.waitForChannelClose(oldClientSelector, "1", ChannelState.State.AUTHENTICATION_FAILED);

            // A client that trusts the new server certificate connects; the old connection still works.
            args.sslClientConfigs = args.getTrustingConfig(args.clientCertStores, newServerCertStores);
            Selector newClientSelector = this.createSelector(args.sslClientConfigs);
            newClientSelector.connect("2", addr, 4096, 4096);
            NetworkTestUtils.checkClientConnection(newClientSelector, "2", 100, 10);
            NetworkTestUtils.checkClientConnection(oldClientSelector, oldNode, 100, 10);

            // Invalid reconfigurations must be rejected.
            CertStores invalidCertStores = SslTransportLayerTest.certBuilder(true, "server", args.useInlinePem).addHostName("127.0.0.1").build();
            Map<String, Object> invalidConfigs = args.getTrustingConfig(invalidCertStores, args.clientCertStores);
            this.verifyInvalidReconfigure(reconfigurableBuilder, invalidConfigs, "keystore with different SubjectAltName");
            Map<String, Object> missingStoreConfigs = new HashMap<>();
            missingStoreConfigs.put("ssl.keystore.type", "PKCS12");
            missingStoreConfigs.put("ssl.keystore.location", "some.keystore.path");
            missingStoreConfigs.put("ssl.keystore.password", new Password("some.keystore.password"));
            missingStoreConfigs.put("ssl.key.password", new Password("some.key.password"));
            this.verifyInvalidReconfigure(reconfigurableBuilder, missingStoreConfigs, "keystore not found");

            // New connections still succeed after the rejected reconfigurations.
            newClientSelector.connect("3", addr, 4096, 4096);
            NetworkTestUtils.checkClientConnection(newClientSelector, "3", 100, 10);
        } finally {
            // createSelector() re-points this.selector at the newest client, so the first client
            // is no longer reachable through the field and would otherwise never be closed.
            // Skip it while it is still this.selector to avoid closing the same selector twice
            // (presumably test teardown closes this.selector - TODO confirm).
            if (oldClientSelector != this.selector) {
                oldClientSelector.close();
            }
        }
    }

    /**
     * Reconfigures the server keystore with a certificate whose SAN DNS names are
     * {@code localhost} and {@code *.example.com}, checks that a client using the matching
     * truststore connects, then checks that a follow-up keystore with only {@code localhost}
     * is rejected and that connections still succeed afterwards.
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testServerKeystoreDynamicUpdateWithNewSubjectAltName(Args args) throws Exception {
        SecurityProtocol protocol = SecurityProtocol.SSL;
        TestSecurityConfig serverConfig = new TestSecurityConfig(args.sslServerConfigs);
        ListenerName listener = ListenerName.forSecurityProtocol(protocol);
        ChannelBuilder serverChannelBuilder = ChannelBuilders.serverChannelBuilder(listener, false, protocol, (AbstractConfig)serverConfig, null, null, (Time)time, new LogContext(), this.defaultApiVersionsSupplier(), (RequestCallback)new DefaultRequestCallbackManager());
        this.server = new NioEchoServer(listener, protocol, serverConfig, "localhost", serverChannelBuilder, null, time);
        this.server.start();
        InetSocketAddress serverAddress = new InetSocketAddress("localhost", this.server.port());

        // Baseline: the original configuration works, then that client is closed.
        Selector client = this.createSelector(args.sslClientConfigs);
        String firstNode = "1";
        client.connect(firstNode, serverAddress, 4096, 4096);
        NetworkTestUtils.checkClientConnection(client, firstNode, 100, 10);
        client.close();

        TestSslUtils.CertificateBuilder widerSanCert = new TestSslUtils.CertificateBuilder().sanDnsNames("localhost", "*.example.com");
        String truststorePath = (String)args.sslClientConfigs.get("ssl.truststore.location");
        File truststoreFile = null;
        if (truststorePath != null) {
            truststoreFile = new File(truststorePath);
        }
        TestSslUtils.SslConfigsBuilder configsBuilder = new TestSslUtils.SslConfigsBuilder(Mode.SERVER).useClientCert(false).certAlias("server").cn("server").certBuilder(widerSanCert).createNewTrustStore(truststoreFile).usePem(args.useInlinePem);
        Map<String, Object> updatedConfigs = configsBuilder.build();
        Map<String, Object> newKeystoreConfigs = new HashMap<>();
        for (String keystoreProp : CertStores.KEYSTORE_PROPS) {
            newKeystoreConfigs.put(keystoreProp, updatedConfigs.get(keystoreProp));
        }
        ListenerReconfigurable reconfigurableBuilder = (ListenerReconfigurable)serverChannelBuilder;
        reconfigurableBuilder.validateReconfiguration(newKeystoreConfigs);
        reconfigurableBuilder.reconfigure(newKeystoreConfigs);

        // Point the client at the truststore generated alongside the new keystore.
        for (String truststoreProp : CertStores.TRUSTSTORE_PROPS) {
            args.sslClientConfigs.put(truststoreProp, updatedConfigs.get(truststoreProp));
        }
        client = this.createSelector(args.sslClientConfigs);
        String secondNode = "2";
        client.connect(secondNode, serverAddress, 4096, 4096);
        NetworkTestUtils.checkClientConnection(client, secondNode, 100, 10);

        // A keystore whose certificate only names "localhost" must be rejected.
        TestSslUtils.CertificateBuilder narrowerSanCert = new TestSslUtils.CertificateBuilder().sanDnsNames("localhost");
        if (!args.useInlinePem) {
            configsBuilder.useExistingTrustStore(truststoreFile);
        }
        Map<String, Object> rejectedConfigs = configsBuilder.certBuilder(narrowerSanCert).build();
        Map<String, Object> invalidKeystoreConfigs = new HashMap<>();
        for (String keystoreProp : CertStores.KEYSTORE_PROPS) {
            invalidKeystoreConfigs.put(keystoreProp, rejectedConfigs.get(keystoreProp));
        }
        this.verifyInvalidReconfigure(reconfigurableBuilder, invalidKeystoreConfigs, "keystore without existing SubjectAltName");

        // Connections still succeed after the rejected reconfiguration.
        String thirdNode = "3";
        client.connect(thirdNode, serverAddress, 4096, 4096);
        NetworkTestUtils.checkClientConnection(client, thirdNode, 100, 10);
    }

    /**
     * Verifies dynamic truststore reconfiguration of the server channel builder (client auth
     * required):
     * <ol>
     *   <li>a client with the original certificate connects;</li>
     *   <li>after reconfiguring the server with a truststore for a new client certificate, a
     *       new connection from the old client fails authentication while a client using the
     *       new certificate connects, and the pre-existing connection keeps working;</li>
     *   <li>reconfiguration with an invalid truststore type, or with a missing truststore file,
     *       is rejected and new connections still succeed afterwards.</li>
     * </ol>
     * The first client selector is closed explicitly because {@code createSelector} replaces
     * {@code this.selector} when the second client is created.
     */
    @ParameterizedTest
    @ArgumentsSource(value=SslTransportLayerArgumentsProvider.class)
    public void testServerTruststoreDynamicUpdate(Args args) throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SSL;
        args.sslServerConfigs.put("ssl.client.auth", "required");
        TestSecurityConfig config = new TestSecurityConfig(args.sslServerConfigs);
        ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
        ChannelBuilder serverChannelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, false, securityProtocol, (AbstractConfig)config, null, null, (Time)time, new LogContext(), this.defaultApiVersionsSupplier(), (RequestCallback)new DefaultRequestCallbackManager());
        this.server = new NioEchoServer(listenerName, securityProtocol, config, "localhost", serverChannelBuilder, null, time);
        this.server.start();
        InetSocketAddress addr = new InetSocketAddress("localhost", this.server.port());
        String oldNode = "0";
        Selector oldClientSelector = this.createSelector(args.sslClientConfigs);
        try {
            oldClientSelector.connect(oldNode, addr, 4096, 4096);
            NetworkTestUtils.checkClientConnection(oldClientSelector, oldNode, 100, 10);

            // Generate a new client certificate and make the server trust it.
            CertStores newClientCertStores = SslTransportLayerTest.certBuilder(true, "client", args.useInlinePem).addHostName("localhost").build();
            args.sslClientConfigs = args.getTrustingConfig(newClientCertStores, args.serverCertStores);
            Map<String, Object> newTruststoreConfigs = newClientCertStores.trustStoreProps();
            Assertions.assertTrue(serverChannelBuilder instanceof ListenerReconfigurable, "SslChannelBuilder not reconfigurable");
            ListenerReconfigurable reconfigurableBuilder = (ListenerReconfigurable)serverChannelBuilder;
            Assertions.assertEquals(listenerName, reconfigurableBuilder.listenerName());
            reconfigurableBuilder.validateReconfiguration(newTruststoreConfigs);
            reconfigurableBuilder.reconfigure(newTruststoreConfigs);

            // A new connection from the client with the old certificate must now fail.
            oldClientSelector.connect("1", addr, 4096, 4096);
            NetworkTestUtils.waitForChannelClose(oldClientSelector, "1", ChannelState.State.AUTHENTICATION_FAILED);

            // A client using the new certificate connects; the old connection still works.
            Selector newClientSelector = this.createSelector(args.sslClientConfigs);
            newClientSelector.connect("2", addr, 4096, 4096);
            NetworkTestUtils.checkClientConnection(newClientSelector, "2", 100, 10);
            NetworkTestUtils.checkClientConnection(oldClientSelector, oldNode, 100, 10);

            // Invalid reconfigurations must be rejected.
            Map<String, Object> invalidConfigs = new HashMap<>(newTruststoreConfigs);
            invalidConfigs.put("ssl.truststore.type", "INVALID_TYPE");
            this.verifyInvalidReconfigure(reconfigurableBuilder, invalidConfigs, "invalid truststore type");
            Map<String, Object> missingStoreConfigs = new HashMap<>();
            missingStoreConfigs.put("ssl.truststore.type", "PKCS12");
            missingStoreConfigs.put("ssl.truststore.location", "some.truststore.path");
            missingStoreConfigs.put("ssl.truststore.password", new Password("some.truststore.password"));
            this.verifyInvalidReconfigure(reconfigurableBuilder, missingStoreConfigs, "truststore not found");

            // New connections still succeed after the rejected reconfigurations.
            newClientSelector.connect("3", addr, 4096, 4096);
            NetworkTestUtils.checkClientConnection(newClientSelector, "3", 100, 10);
        } finally {
            // createSelector() re-points this.selector at the newest client, so the first client
            // is no longer reachable through the field and would otherwise never be closed.
            // Skip it while it is still this.selector to avoid closing the same selector twice
            // (presumably test teardown closes this.selector - TODO confirm).
            if (oldClientSelector != this.selector) {
                oldClientSelector.close();
            }
        }
    }

    /**
     * Verifies dynamic cipher-suite reconfiguration of the server channel builder. The server
     * starts with only the first supported cipher suite; a client restricted to the second
     * suite fails authentication until the server is reconfigured to that suite, after which
     * the same client connects successfully.
     * <p>
     * Skipped (via assumption) when the parameterization does not support cipher configuration.
     * The initial client selector is closed explicitly because {@code createSelector} replaces
     * {@code this.selector} when the second client is created.
     */
    @ParameterizedTest
    @ArgumentsSource(value=SslTransportLayerArgumentsProvider.class)
    public void testServerCipherDynamicUpdate(Args args) throws Exception {
        Assumptions.assumeTrue(args.supportsCipherConfiguration());
        String[] cipherSuites = args.supportedCipherSuites();
        String serverCipher = cipherSuites[0];
        SecurityProtocol securityProtocol = SecurityProtocol.SSL;
        args.sslServerConfigs.put("ssl.client.auth", "required");
        args.sslServerConfigs.put("ssl.cipher.suites", Collections.singletonList(serverCipher));
        TestSecurityConfig config = new TestSecurityConfig(args.sslServerConfigs);
        ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
        ChannelBuilder serverChannelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, false, securityProtocol, (AbstractConfig)config, null, null, (Time)time, new LogContext(), this.defaultApiVersionsSupplier(), (RequestCallback)new DefaultRequestCallbackManager());
        this.server = new NioEchoServer(listenerName, securityProtocol, config, "localhost", serverChannelBuilder, null, time);
        this.server.start();
        InetSocketAddress addr = new InetSocketAddress("localhost", this.server.port());
        String idForOldServer = "0";
        Selector initialClient = this.createSelector(args);
        try {
            initialClient.connect(idForOldServer, addr, 4096, 4096);
            NetworkTestUtils.checkClientConnection(initialClient, idForOldServer, 100, 10);

            // Restrict the client to a suite the server does not offer yet: the handshake must fail.
            // The configs are mutated in place on args (no raw-typed alias needed).
            String newCipherName = cipherSuites[1];
            args.sslClientConfigs.put("ssl.cipher.suites", Collections.singletonList(newCipherName));
            Selector newClient = this.createSelector(args.sslClientConfigs);
            newClient.connect(idForOldServer, addr, 4096, 4096);
            NetworkTestUtils.waitForChannelClose(newClient, idForOldServer, ChannelState.State.AUTHENTICATION_FAILED);

            // Reconfigure the server to the client's suite: the same client now connects.
            args.sslServerConfigs.put("ssl.cipher.suites", Collections.singletonList(newCipherName));
            ListenerReconfigurable reconfigurableBuilder = (ListenerReconfigurable)serverChannelBuilder;
            Assertions.assertEquals(listenerName, reconfigurableBuilder.listenerName());
            reconfigurableBuilder.validateReconfiguration(args.sslServerConfigs);
            reconfigurableBuilder.reconfigure(args.sslServerConfigs);
            String idForNewServer = "1";
            newClient.connect(idForNewServer, addr, 4096, 4096);
            NetworkTestUtils.checkClientConnection(newClient, idForNewServer, 100, 10);
        } finally {
            // createSelector() re-points this.selector at the newest client, so the initial client
            // is no longer reachable through the field and would otherwise never be closed.
            // Skip it while it is still this.selector to avoid closing the same selector twice
            // (presumably test teardown closes this.selector - TODO confirm).
            if (initialClient != this.selector) {
                initialClient.close();
            }
        }
    }

    /**
     * Starts the server with {@code NettySslEngineFactory}, then reconfigures it through the
     * listener-prefixed {@code ssl.engine.factory.class} key to {@code DefaultSslEngineFactory}
     * and checks both the builder's engine factory type and client connectivity before and after.
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testEngineFactoryDynamicUpdate(Args args) throws Exception {
        SecurityProtocol protocol = SecurityProtocol.SSL;
        args.sslServerConfigs.put("ssl.client.auth", "required");
        args.sslServerConfigs.put("ssl.engine.factory.class", NettySslEngineFactory.class);
        TestSecurityConfig serverConfig = new TestSecurityConfig(args.sslServerConfigs);
        ListenerName listener = ListenerName.forSecurityProtocol(protocol);
        ChannelBuilder serverChannelBuilder = ChannelBuilders.serverChannelBuilder(listener, false, protocol, (AbstractConfig)serverConfig, null, null, (Time)time, new LogContext(), this.defaultApiVersionsSupplier(), (RequestCallback)new DefaultRequestCallbackManager());
        Assertions.assertTrue(((SslChannelBuilder)serverChannelBuilder).sslEngineFactory() instanceof NettySslEngineFactory);
        this.server = new NioEchoServer(listener, protocol, serverConfig, "localhost", serverChannelBuilder, null, time);
        this.server.start();
        InetSocketAddress serverAddress = new InetSocketAddress("localhost", this.server.port());

        String nodeBeforeUpdate = "0";
        Selector client = this.createSelector(args);
        client.connect(nodeBeforeUpdate, serverAddress, 4096, 4096);
        NetworkTestUtils.checkClientConnection(client, nodeBeforeUpdate, 100, 10);

        // The new factory is supplied under the listener-prefixed key, added to the same server config map.
        String prefixedFactoryKey = "listener.name." + listener + "." + "ssl.engine.factory.class";
        args.sslServerConfigs.put(prefixedFactoryKey, DefaultSslEngineFactory.class);
        ListenerReconfigurable reconfigurableBuilder = (ListenerReconfigurable)serverChannelBuilder;
        Assertions.assertEquals(listener, reconfigurableBuilder.listenerName());
        reconfigurableBuilder.validateReconfiguration(args.sslServerConfigs);
        reconfigurableBuilder.reconfigure(args.sslServerConfigs);
        Assertions.assertTrue(((SslChannelBuilder)serverChannelBuilder).sslEngineFactory() instanceof DefaultSslEngineFactory);

        String nodeAfterUpdate = "1";
        client.connect(nodeAfterUpdate, serverAddress, 4096, 4096);
        NetworkTestUtils.checkClientConnection(client, nodeAfterUpdate, 100, 10);
    }

    /** Runs {@code verifySslConfigs} with {@code TestSslEngineFactory} set on the client only. */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testCustomClientSslEngineFactory(Args args) throws Exception {
        Class<?> factoryClass = TestSslUtils.TestSslEngineFactory.class;
        args.sslClientConfigs.put("ssl.engine.factory.class", factoryClass);
        verifySslConfigs(args);
    }

    /** Runs {@code verifySslConfigs} with {@code TestSslEngineFactory} set on the server only. */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testCustomServerSslEngineFactory(Args args) throws Exception {
        Class<?> factoryClass = TestSslUtils.TestSslEngineFactory.class;
        args.sslServerConfigs.put("ssl.engine.factory.class", factoryClass);
        verifySslConfigs(args);
    }

    /** Runs {@code verifySslConfigs} with {@code TestSslEngineFactory} set on both client and server. */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testCustomClientAndServerSslEngineFactory(Args args) throws Exception {
        Class<?> factoryClass = TestSslUtils.TestSslEngineFactory.class;
        args.sslClientConfigs.put("ssl.engine.factory.class", factoryClass);
        args.sslServerConfigs.put("ssl.engine.factory.class", factoryClass);
        verifySslConfigs(args);
    }

    /**
     * Expects selector creation to throw {@code KafkaException} when
     * {@code ssl.engine.factory.class} is set to {@code String.class}.
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testInvalidSslEngineFactory(Args args) {
        Class<?> notAnEngineFactory = String.class;
        args.sslClientConfigs.put("ssl.engine.factory.class", notAnEngineFactory);
        Assertions.assertThrows(KafkaException.class, () -> this.createSelector(args.sslClientConfigs));
    }

    /**
     * Connects to the server via {@code 127.0.0.1} with HTTPS endpoint identification enabled
     * on the client and waits, through {@code waitTillServerConnectionWithAuthFailure}, for the
     * "SSL handshake failed" error and its corresponding log message.
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testLogMessageForSslHandshakeException(Args args) throws Exception {
        this.server = this.createEchoServer(args, SecurityProtocol.SSL);
        args.sslClientConfigs.put("ssl.endpoint.identification.algorithm", "HTTPS");
        // createSelector(Args) stores the new selector in this.selector.
        this.createSelector(args);
        String nodeId = "0";
        InetSocketAddress ipAddress = new InetSocketAddress("127.0.0.1", this.server.port());
        this.selector.connect(nodeId, ipAddress, 4096, 4096);
        NetworkTestUtils.waitTillServerConnectionWithAuthFailure(this.server.selector(), this.selector, "SSL handshake failed", "errorMessage=SSL handshake failed", true);
    }

    /**
     * Builds the server channel builder with a {@code ProxyProtocolEngineFactory} that supplies
     * {@code TestProxyProtocolEngine} instances (local host address, port 31313) and checks that
     * a client can connect and exchange data with client auth required.
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testWithProxyProtocolEngine(Args args) throws Exception {
        SecurityProtocol protocol = SecurityProtocol.SSL;
        args.sslServerConfigs.put("ssl.client.auth", "required");
        TestSecurityConfig serverConfig = new TestSecurityConfig(args.sslServerConfigs);
        ListenerName listener = ListenerName.forSecurityProtocol(protocol);
        InetAddress localAddress = InetAddress.getLocalHost();
        ChannelBuilder serverBuilder = ChannelBuilders.serverChannelBuilder(listener, false, protocol, (AbstractConfig)serverConfig, null, null, (Time)time, new LogContext(), this.defaultApiVersionsSupplier(), (RequestCallback)new DefaultRequestCallbackManager(), (ProxyProtocolEngineFactory)new ProxyProtocolEngineFactory(() -> new TestProxyProtocolEngine(localAddress, 31313)));
        this.server = new NioEchoServer(listener, protocol, serverConfig, "localhost", serverBuilder, null, time);
        this.server.start();

        String nodeId = "0";
        InetSocketAddress serverAddress = new InetSocketAddress("localhost", this.server.port());
        Selector clientSelector = this.createSelector(args);
        clientSelector.connect(nodeId, serverAddress, 4096, 4096);
        NetworkTestUtils.checkClientConnection(clientSelector, nodeId, 100, 10);
    }

    /**
     * Uses a server transport layer whose {@code maybeReadAndProcessProxyHeaders} hands the
     * net read buffer to the proxy protocol engine and always returns 60, then checks that a
     * client can still connect and exchange data.
     */
    @ParameterizedTest
    @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
    public void testProxyEngineConsumesFullSocketRead(Args args) throws Exception {
        args.sslServerConfigs.put("ssl.client.auth", "required");
        TestSslChannelBuilder serverBuilder = new TestSslChannelBuilder(Mode.SERVER){

            @Override
            protected TestSslChannelBuilder.TestSslTransportLayer newTransportLayer(String id, SelectionKey key, SSLEngine sslEngine, Mode mode, ProxyProtocolEngine proxyProtocolEngine) {
                return new TestSslChannelBuilder.TestSslTransportLayer(id, key, sslEngine, mode, proxyProtocolEngine){

                    protected int maybeReadAndProcessProxyHeaders() throws IOException {
                        this.proxyProtocolEngine.processHeaders(this.netReadBuffer);
                        // Fixed return value regardless of what processHeaders consumed.
                        return 60;
                    }
                };
            }
        };
        serverBuilder.configure(new TestSecurityConfig(args.sslServerConfigs).values());
        this.server = new NioEchoServer(ListenerName.forSecurityProtocol(SecurityProtocol.SSL), SecurityProtocol.SSL, (AbstractConfig)new TestSecurityConfig(args.sslServerConfigs), "localhost", (ChannelBuilder)serverBuilder, null, 100, time);
        this.server.start();

        String nodeId = "0";
        InetSocketAddress serverAddress = new InetSocketAddress("localhost", this.server.port());
        // createSelector() returns the same instance it stores in this.selector.
        Selector clientSelector = this.createSelector(args.sslClientConfigs);
        clientSelector.connect(nodeId, serverAddress, 4096, 4096);
        NetworkTestUtils.checkClientConnection(clientSelector, nodeId, 100, 10);
    }

    /**
     * Asserts that both validating and applying {@code invalidConfigs} on {@code reconfigurable}
     * throw {@code KafkaException}.
     *
     * @param reconfigurable the reconfigurable channel builder under test
     * @param invalidConfigs configs that are expected to be rejected
     * @param errorMessage   short description of the invalid scenario; reported in the assertion
     *                       failure message when the expected exception is not thrown
     */
    private void verifyInvalidReconfigure(ListenerReconfigurable reconfigurable, Map<String, Object> invalidConfigs, String errorMessage) {
        Assertions.assertThrows(KafkaException.class, () -> reconfigurable.validateReconfiguration(invalidConfigs), "Expected validateReconfiguration to fail: " + errorMessage);
        Assertions.assertThrows(KafkaException.class, () -> reconfigurable.reconfigure(invalidConfigs), "Expected reconfigure to fail: " + errorMessage);
    }

    /**
     * Creates a client selector from the given SSL configs without overriding any buffer size.
     *
     * @param sslClientConfigs client-side SSL configuration
     * @return the newly created selector
     */
    private Selector createSelector(Map<String, Object> sslClientConfigs) {
        final Integer noOverride = null;
        return this.createSelector(sslClientConfigs, noOverride, noOverride, noOverride);
    }

    /**
     * Creates a client selector backed by a {@link TestSslChannelBuilder} and stores it in
     * {@code this.selector}.
     *
     * @param sslClientConfigs client-side SSL configuration
     * @param netReadBufSize   network read buffer size override, or {@code null} for none
     * @param netWriteBufSize  network write buffer size override, or {@code null} for none
     * @param appBufSize       application buffer size override, or {@code null} for none
     * @return the newly created selector (the same instance that is stored in {@code this.selector})
     */
    private Selector createSelector(Map<String, Object> sslClientConfigs, Integer netReadBufSize, Integer netWriteBufSize, Integer appBufSize) {
        TestSslChannelBuilder clientBuilder = new TestSslChannelBuilder(Mode.CLIENT);
        clientBuilder.configureBufferSizes(netReadBufSize, netWriteBufSize, appBufSize);
        clientBuilder.configure(sslClientConfigs);

        Selector created = new Selector(500000L, new Metrics(), time, "MetricGroup", (ChannelBuilder)clientBuilder, new LogContext());
        this.selector = created;
        return created;
    }

    /**
     * Creates an echo server for the given listener and security protocol using the server
     * SSL configs and the audit log provider held by {@code args}.
     *
     * @param args             test arguments supplying the server configs and audit log provider
     * @param listenerName     listener the server is created for
     * @param securityProtocol security protocol of the listener
     * @return the echo server returned by {@code NetworkTestUtils.createEchoServer}
     * @throws Exception if the server cannot be created
     */
    private NioEchoServer createEchoServer(Args args, ListenerName listenerName, SecurityProtocol securityProtocol) throws Exception {
        AbstractConfig serverConfig = new TestSecurityConfig(args.sslServerConfigs);
        return NetworkTestUtils.createEchoServer(listenerName, securityProtocol, serverConfig, null, time, Optional.of(args.auditLogProvider));
    }

    /**
     * Creates an echo server on the listener derived from {@code securityProtocol}.
     *
     * @param args             test arguments supplying the server configs
     * @param securityProtocol security protocol that also determines the listener name
     * @return the created echo server
     * @throws Exception if the server cannot be created
     */
    private NioEchoServer createEchoServer(Args args, SecurityProtocol securityProtocol) throws Exception {
        ListenerName protocolListener = ListenerName.forSecurityProtocol(securityProtocol);
        return this.createEchoServer(args, protocolListener, securityProtocol);
    }

    /**
     * Creates a client selector backed by a plain {@link SslChannelBuilder} (with
     * {@code ProxyProtocol.NONE}) configured from {@code args.sslClientConfigs}, and stores it
     * in {@code this.selector}.
     *
     * @param args test arguments supplying the client SSL configs
     * @return the newly created selector (the same instance that is stored in {@code this.selector})
     */
    private Selector createSelector(Args args) {
        LogContext clientLogContext = new LogContext();
        ProxyProtocolEngineFactory noProxy = new ProxyProtocolEngineFactory(ProxyProtocol.NONE);
        SslChannelBuilder plainBuilder = new SslChannelBuilder(Mode.CLIENT, null, false, clientLogContext, noProxy);
        plainBuilder.configure(args.sslClientConfigs);

        Selector created = new Selector(5000L, new Metrics(), time, "MetricGroup", (ChannelBuilder)plainBuilder, clientLogContext);
        this.selector = created;
        return created;
    }

    /**
     * Starts an SSL echo server and a client selector from {@code args}, connects the client
     * and runs {@code NetworkTestUtils.checkClientConnection} on the connection.
     *
     * @param args test arguments supplying the server and client SSL configs
     * @throws Exception if the server or the connection cannot be set up
     */
    private void verifySslConfigs(Args args) throws Exception {
        final String nodeId = "0";

        this.server = this.createEchoServer(args, SecurityProtocol.SSL);
        // Populates this.selector as a side effect.
        this.createSelector(args.sslClientConfigs);

        InetSocketAddress serverAddress = new InetSocketAddress("localhost", this.server.port());
        this.selector.connect(nodeId, serverAddress, 4096, 4096);
        NetworkTestUtils.checkClientConnection(this.selector, nodeId, 100, 10);
    }

    /**
     * Runs the handshake-failure verification with a poll delay of zero milliseconds.
     *
     * @param args test arguments supplying the server and client SSL configs
     * @throws Exception if the server or the connection cannot be set up
     */
    private void verifySslConfigsWithHandshakeFailure(Args args) throws Exception {
        final int noPollDelayMs = 0;
        this.verifySslConfigsWithHandshakeFailure(args, noPollDelayMs);
    }

    /**
     * Starts an SSL echo server and a client selector from {@code args}, connects the client,
     * waits for the channel to close in state {@code AUTHENTICATION_FAILED} and then verifies
     * the server's authentication metrics with the arguments {@code (0, 1)}.
     *
     * @param args        test arguments supplying the server and client SSL configs
     * @param pollDelayMs delay passed to {@code NetworkTestUtils.waitForChannelClose}
     * @throws Exception if the server or the connection cannot be set up
     */
    private void verifySslConfigsWithHandshakeFailure(Args args, int pollDelayMs) throws Exception {
        final String nodeId = "0";

        this.server = this.createEchoServer(args, SecurityProtocol.SSL);
        // Populates this.selector as a side effect.
        this.createSelector(args.sslClientConfigs);

        InetSocketAddress serverAddress = new InetSocketAddress("localhost", this.server.port());
        this.selector.connect(nodeId, serverAddress, 4096, 4096);

        NetworkTestUtils.waitForChannelClose(this.selector, nodeId, ChannelState.State.AUTHENTICATION_FAILED, pollDelayMs);
        this.server.verifyAuthenticationMetrics(0, 1);
    }

    /**
     * Returns a certificate store builder preconfigured with the given common name and PEM mode.
     *
     * @param isServer     value passed to the {@code CertStores.Builder} constructor
     * @param cn           common name set on the builder
     * @param useInlinePem value passed to {@code usePem}
     * @return the configured builder
     */
    private static CertStores.Builder certBuilder(boolean isServer, String cn, boolean useInlinePem) {
        CertStores.Builder builder = new CertStores.Builder(isServer);
        return builder.cn(cn).usePem(useInlinePem);
    }

    /**
     * Returns a supplier that produces the default {@code ApiVersionsResponse} for the
     * {@code ZK_BROKER} listener type each time it is invoked.
     *
     * @return supplier of default API versions responses
     */
    private Supplier<ApiVersionsResponse> defaultApiVersionsSupplier() {
        ApiMessageType.ListenerType listenerType = ApiMessageType.ListenerType.ZK_BROKER;
        return () -> ApiVersionsResponse.defaultApiVersionsResponse(listenerType);
    }

    /**
     * Waits until exactly one authentication event has been recorded by the audit log provider
     * in {@code args} and checks its content.
     *
     * <p>The event must carry an authentication context. Its status must be {@code SUCCESS} when
     * {@code authenticationSuccess} is true; otherwise it must be {@code UNKNOWN_USER_DENIED}
     * and an authentication exception must be present.
     *
     * @param args                  test arguments holding the audit log provider
     * @param authenticationSuccess whether a successful authentication is expected
     * @throws InterruptedException if interrupted while waiting for the event
     */
    private void verifyAuditLogEvent(Args args, boolean authenticationSuccess) throws InterruptedException {
        TestUtils.waitForCondition(() -> args.auditLogProvider.authenticationEvents.size() == 1, "audit event not generated");

        AuthenticationEvent recorded = args.auditLogProvider.authenticationEvents.get(0);
        Assertions.assertNotNull(recorded.authenticationContext());

        AuditEventStatus expectedStatus = authenticationSuccess ? AuditEventStatus.SUCCESS : AuditEventStatus.UNKNOWN_USER_DENIED;
        Assertions.assertEquals(expectedStatus, recorded.status());
        if (!authenticationSuccess) {
            Assertions.assertTrue(recorded.authenticationException().isPresent());
        }
    }

    /**
     * In-memory {@link AuditLogProvider} for tests: every logged event is appended to
     * {@link #authenticationEvents}; all configuration and lifecycle callbacks are no-ops.
     */
    private static class TestAuditLogProvider
    implements AuditLogProvider {
        /**
         * Events recorded by {@link #logEvent}, in arrival order.
         *
         * <p>Wrapped in a synchronized list because the test polls this list while waiting for
         * events to arrive, so writes presumably come from another thread (NOTE(review): confirm
         * the logging thread). Callers that iterate must synchronize on the list themselves.
         */
        public final List<AuthenticationEvent> authenticationEvents = Collections.synchronizedList(new ArrayList<AuthenticationEvent>());

        private TestAuditLogProvider() {
        }

        /** Always reports {@code false}. */
        public boolean providerConfigured(Map<String, ?> configs) {
            return false;
        }

        /**
         * Records the event. The event is cast to {@link AuthenticationEvent}, so any other event
         * type fails with a {@link ClassCastException}.
         */
        public void logEvent(AuditEvent auditEvent) {
            this.authenticationEvents.add((AuthenticationEvent)auditEvent);
        }

        /** Ignores the sanitizer; events are stored as received. */
        public void setSanitizer(UnaryOperator<AuditEvent> sanitizer) {
        }

        /** Always reports {@code false}. */
        public boolean usesMetadataFromThisKafkaCluster() {
            return false;
        }

        /** Delegates to {@link #close()}; the session id is not used. */
        public void close(String brokerSessionUuid) throws Exception {
            this.close();
        }

        /** Nothing to release. */
        public void close() throws Exception {
        }

        /**
         * Returns an empty set rather than {@code null}, so callers can iterate the result
         * without a null check.
         */
        public Set<String> reconfigurableConfigs() {
            return Collections.emptySet();
        }

        /** Accepts every configuration. */
        public void validateReconfiguration(Map<String, ?> configs) throws ConfigException {
        }

        /** No-op. */
        public void reconfigure(Map<String, ?> configs) {
        }

        /** No-op. */
        public void configure(Map<String, ?> configs) {
        }
    }

    /**
     * {@link SslChannelBuilder} for tests. It creates {@link TestSslTransportLayer} instances whose
     * buffer sizes can be overridden and whose socket reads and flushes can be made to run a
     * {@link FailureAction} or be reported as incomplete.
     */
    static class TestSslChannelBuilder
    extends SslChannelBuilder {
        // Buffer size overrides; null means the size computed by the parent transport layer is used as is.
        private Integer netReadBufSizeOverride;
        private Integer netWriteBufSizeOverride;
        private Integer appBufSizeOverride;
        // Start value of each layer's read and flush countdowns; the matching failure action may run when one reaches zero.
        private long failureIndex = Long.MAX_VALUE;
        FailureAction readFailureAction = FailureAction.NO_OP;
        FailureAction flushFailureAction = FailureAction.NO_OP;
        // Number of non-empty flush attempts reported as incomplete before one is passed to the parent.
        int flushDelayCount = 0;
        private final Mode mode;

        /**
         * Creates a builder for the SSL listener with {@code ProxyProtocol.NONE}.
         *
         * @param mode mode handed to the parent builder and to every transport layer created here
         */
        public TestSslChannelBuilder(Mode mode) {
            super(mode, ListenerName.forSecurityProtocol(SecurityProtocol.SSL), false, new LogContext(), new ProxyProtocolEngineFactory(ProxyProtocol.NONE));
            this.mode = mode;
        }

        /**
         * Sets the buffer size overrides picked up by transport layers created afterwards.
         * A {@code null} value disables the corresponding override.
         */
        public void configureBufferSizes(Integer netReadBufSize, Integer netWriteBufSize, Integer appBufSize) {
            this.appBufSizeOverride = appBufSize;
            this.netWriteBufSizeOverride = netWriteBufSize;
            this.netReadBufSizeOverride = netReadBufSize;
        }

        /**
         * Creates the SSL engine for the key's socket and wraps it in a test transport layer.
         * The {@code metadataRegistry} argument is not used here.
         */
        protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key, ChannelMetadataRegistry metadataRegistry, ProxyProtocolEngine proxyProtocolEngine) {
            SocketChannel channel = (SocketChannel)key.channel();
            SSLEngine engine = sslFactory.createSslEngine(channel.socket());
            return this.newTransportLayer(id, key, engine, this.mode, proxyProtocolEngine);
        }

        /** Factory hook for the transport layer; subclasses may return a customized layer. */
        protected TestSslTransportLayer newTransportLayer(String id, SelectionKey key, SSLEngine sslEngine, Mode mode, ProxyProtocolEngine proxyProtocolEngine) {
            return new TestSslTransportLayer(id, key, sslEngine, mode, proxyProtocolEngine);
        }

        /**
         * Buffer size that starts at an optional override and doubles on request until it
         * reaches the actual size.
         */
        static class ResizeableBufferSize {
            private Integer bufSizeOverride;

            ResizeableBufferSize(Integer bufSizeOverride) {
                this.bufSizeOverride = bufSizeOverride;
            }

            /**
             * Returns the size to use.
             *
             * @param actualSize size computed by the parent transport layer
             * @param update     whether to double the override (capped at {@code actualSize}) first
             * @return {@code actualSize} when no override is set, otherwise the current override
             */
            int updateAndGet(int actualSize, boolean update) {
                if (this.bufSizeOverride == null) {
                    return actualSize;
                }
                if (update) {
                    this.bufSizeOverride = Math.min(this.bufSizeOverride * 2, actualSize);
                }
                return this.bufSizeOverride;
            }
        }

        /**
         * Transport layer that applies the builder's buffer size overrides and its read/flush
         * failure and delay settings. All settings are captured from the enclosing builder at
         * construction time, except the failure actions and the flush delay reset value, which
         * are read from the builder when used.
         */
        class TestSslTransportLayer
        extends SslTransportLayer {
            private final ResizeableBufferSize netReadBufSize;
            private final ResizeableBufferSize netWriteBufSize;
            private final ResizeableBufferSize appBufSize;
            private final AtomicLong numReadsRemaining;
            private final AtomicLong numFlushesRemaining;
            private final AtomicInteger numDelayedFlushesRemaining;

            public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, Mode mode, ProxyProtocolEngine proxyProtocolEngine) {
                super(channelId, key, sslEngine, (ChannelMetadataRegistry)new DefaultChannelMetadataRegistry(), mode, proxyProtocolEngine);
                this.netReadBufSize = new ResizeableBufferSize(TestSslChannelBuilder.this.netReadBufSizeOverride);
                this.netWriteBufSize = new ResizeableBufferSize(TestSslChannelBuilder.this.netWriteBufSizeOverride);
                this.appBufSize = new ResizeableBufferSize(TestSslChannelBuilder.this.appBufSizeOverride);
                this.numReadsRemaining = new AtomicLong(TestSslChannelBuilder.this.failureIndex);
                this.numFlushesRemaining = new AtomicLong(TestSslChannelBuilder.this.failureIndex);
                this.numDelayedFlushesRemaining = new AtomicInteger(TestSslChannelBuilder.this.flushDelayCount);
            }

            /** Grows the read buffer override only once the current read buffer exists and is full. */
            protected int netReadBufferSize() {
                ByteBuffer current = this.netReadBuffer();
                boolean bufferFull = current != null && !current.hasRemaining();
                int parentSize = super.netReadBufferSize();
                return this.netReadBufSize.updateAndGet(parentSize, bufferFull);
            }

            /** Grows the write buffer override on every call. */
            protected int netWriteBufferSize() {
                int parentSize = super.netWriteBufferSize();
                return this.netWriteBufSize.updateAndGet(parentSize, true);
            }

            /** Grows the application buffer override on every call. */
            protected int applicationBufferSize() {
                int parentSize = super.applicationBufferSize();
                return this.appBufSize.updateAndGet(parentSize, true);
            }

            /** Runs the read failure action when the read countdown hits zero before the layer is ready. */
            protected int readFromSocketChannel() throws IOException {
                boolean failNow = this.numReadsRemaining.decrementAndGet() == 0L && !this.ready();
                if (failNow) {
                    TestSslChannelBuilder.this.readFailureAction.run();
                }
                return super.readFromSocketChannel();
            }

            /**
             * Flushes {@code buf}. Empty buffers go straight to the parent. For non-empty buffers the
             * flush failure action runs when the flush countdown hits zero before the layer is ready;
             * otherwise the flush is reported as incomplete while delayed flushes remain.
             */
            protected boolean flush(ByteBuffer buf) throws IOException {
                if (buf.hasRemaining()) {
                    boolean failNow = this.numFlushesRemaining.decrementAndGet() == 0L && !this.ready();
                    if (failNow) {
                        TestSslChannelBuilder.this.flushFailureAction.run();
                    } else if (this.numDelayedFlushesRemaining.getAndDecrement() != 0) {
                        // Pretend the flush did not complete; the caller is expected to retry.
                        return false;
                    }
                    this.resetDelayedFlush();
                }
                return super.flush(buf);
            }

            /** Fails the test if the handshake starts before the socket channel is connected. */
            protected void startHandshake() throws IOException {
                boolean connected = this.socketChannel().isConnected();
                Assertions.assertTrue(connected, "SSL handshake initialized too early");
                super.startHandshake();
            }

            /** Restores the delayed-flush counter to the builder's current delay count. */
            private void resetDelayedFlush() {
                this.numDelayedFlushesRemaining.set(TestSslChannelBuilder.this.flushDelayCount);
            }
        }
    }

    @FunctionalInterface
    private static interface FailureAction {
        public static final FailureAction NO_OP = () -> {};
        public static final FailureAction THROW_IO_EXCEPTION = () -> {
            throw new IOException("Test IO exception");
        };

        public void run() throws IOException;
    }

    /**
     * Supplies the {@link Args} combinations for the parameterized tests: three TLSv1.2 variants
     * always, plus two TLSv1.3 variants when {@code Java.IS_JAVA11_COMPATIBLE} is true.
     */
    private static class SslTransportLayerArgumentsProvider
    implements ArgumentsProvider {
        private SslTransportLayerArgumentsProvider() {
        }

        /**
         * Builds the argument stream in a fixed order.
         *
         * @param context extension context (unused)
         * @return one {@link Arguments} per {@link Args} combination
         * @throws Exception if an {@link Args} instance cannot be created
         */
        public Stream<? extends Arguments> provideArguments(ExtensionContext context) throws Exception {
            Stream.Builder<Arguments> combinations = Stream.builder();

            combinations.add(Arguments.of(new Args("TLSv1.2", false, DefaultSslEngineFactory.class)));
            combinations.add(Arguments.of(new Args("TLSv1.2", true, DefaultSslEngineFactory.class)));
            combinations.add(Arguments.of(new Args("TLSv1.2", true, NettySslEngineFactory.class)));

            if (Java.IS_JAVA11_COMPATIBLE) {
                combinations.add(Arguments.of(new Args("TLSv1.3", false, DefaultSslEngineFactory.class)));
                combinations.add(Arguments.of(new Args("TLSv1.3", false, NettySslEngineFactory.class)));
            }
            return combinations.build();
        }
    }

    /**
     * Parameters of one parameterized test run: the TLS protocol, whether key material is
     * supplied as inline PEM, and the SSL engine factory class. It also holds the certificate
     * stores, SSL configs and audit log provider derived from those parameters.
     */
    private static class Args {
        private final String tlsProtocol;
        private final boolean useInlinePem;
        private final Class<?> engineFactoryClass;
        private CertStores serverCertStores;
        private CertStores clientCertStores;
        private Map<String, Object> sslClientConfigs;
        private Map<String, Object> sslServerConfigs;
        private Map<String, Object> sslConfigOverrides;
        private TestAuditLogProvider auditLogProvider;

        /**
         * Creates the arguments and immediately builds the certificate stores and configs.
         *
         * @param tlsProtocol        value used for {@code ssl.protocol} and {@code ssl.enabled.protocols}
         * @param useInlinePem       whether the certificate stores use PEM
         * @param engineFactoryClass value used for {@code ssl.engine.factory.class} on both sides
         * @throws Exception if the certificate stores cannot be built
         */
        public Args(String tlsProtocol, boolean useInlinePem, Class<?> engineFactoryClass) throws Exception {
            this.tlsProtocol = tlsProtocol;
            this.useInlinePem = useInlinePem;
            this.engineFactoryClass = engineFactoryClass;
            this.sslConfigOverrides = new HashMap<String, Object>();
            this.sslConfigOverrides.put("ssl.protocol", tlsProtocol);
            this.sslConfigOverrides.put("ssl.enabled.protocols", Collections.singletonList(tlsProtocol));
            this.init();
        }

        /**
         * Returns the config with which {@code certStores} trusts {@code peerCertStores}, with
         * this run's protocol overrides applied on top.
         */
        Map<String, Object> getTrustingConfig(CertStores certStores, CertStores peerCertStores) {
            Map<String, Object> configs = certStores.getTrustingConfig(peerCertStores);
            configs.putAll(this.sslConfigOverrides);
            return configs;
        }

        /**
         * Builds server and client certificate stores (CNs "server" and "client", host name
         * "localhost"), derives mutually trusting configs, sets the engine factory class on
         * both, and creates a fresh audit log provider.
         */
        private void init() throws Exception {
            this.serverCertStores = SslTransportLayerTest.certBuilder(true, "server", this.useInlinePem).addHostName("localhost").build();
            this.clientCertStores = SslTransportLayerTest.certBuilder(false, "client", this.useInlinePem).addHostName("localhost").build();
            this.sslServerConfigs = this.getTrustingConfig(this.serverCertStores, this.clientCertStores);
            this.sslClientConfigs = this.getTrustingConfig(this.clientCertStores, this.serverCertStores);
            this.sslServerConfigs.put("ssl.engine.factory.class", this.engineFactoryClass);
            this.sslClientConfigs.put("ssl.engine.factory.class", this.engineFactoryClass);
            this.auditLogProvider = new TestAuditLogProvider();
        }

        /**
         * Returns {@code false} only when the protocol is TLSv1.3 and the server config currently
         * names {@code NettySslEngineFactory} as its engine factory class.
         */
        private boolean supportsCipherConfiguration() {
            return !this.tlsProtocol.equals("TLSv1.3") || this.sslServerConfigs.get("ssl.engine.factory.class") != NettySslEngineFactory.class;
        }

        /** Returns the two cipher suites used by the tests for this run's TLS protocol. */
        private String[] supportedCipherSuites() {
            if (this.tlsProtocol.equals("TLSv1.3")) {
                return new String[]{"TLS_AES_256_GCM_SHA384", "TLS_AES_128_GCM_SHA256"};
            }
            return new String[]{"TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"};
        }

        /**
         * Describes all three parameters. The engine factory is included because otherwise two
         * runs that differ only in the engine factory would render identically.
         */
        @Override
        public String toString() {
            String engineFactory = this.engineFactoryClass == null ? "null" : this.engineFactoryClass.getSimpleName();
            return "tlsProtocol=" + this.tlsProtocol + ", useInlinePem=" + this.useInlinePem + ", engineFactoryClass=" + engineFactory;
        }
    }
}

