/*
 * Decompiled with CFR 0.152.
 */
package kafka.network;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Properties;
import java.util.Random;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import kafka.network.Acceptor;
import kafka.network.ConnectionQuotas;
import kafka.network.Processor;
import kafka.network.RequestChannel;
import kafka.network.RequestMetrics;
import kafka.network.RequestMetrics$;
import kafka.network.SocketServer;
import kafka.network.SocketServerTest$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.collection.Iterable$;
import scala.collection.Seq;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.mutable.ArrayBuffer;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;

@ScalaSignature(bytes="\u0006\u0001\u0005-f\u0001B\u0001\u0003\u0001\u001d\u0011\u0001cU8dW\u0016$8+\u001a:wKJ$Vm\u001d;\u000b\u0005\r!\u0011a\u00028fi^|'o\u001b\u0006\u0002\u000b\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\t!\tI\u0001#D\u0001\u000b\u0015\tYA\"A\u0003kk:LGO\u0003\u0002\u000e\u001d\u0005I1oY1mCR,7\u000f\u001e\u0006\u0002\u001f\u0005\u0019qN]4\n\u0005EQ!A\u0003&V]&$8+^5uK\")1\u0003\u0001C\u0001)\u00051A(\u001b8jiz\"\u0012!\u0006\t\u0003-\u0001i\u0011A\u0001\u0005\b1\u0001\u0011\r\u0011\"\u0001\u001a\u0003\u0015\u0001(o\u001c9t+\u0005Q\u0002CA\u000e!\u001b\u0005a\"BA\u000f\u001f\u0003\u0011)H/\u001b7\u000b\u0003}\tAA[1wC&\u0011\u0011\u0005\b\u0002\u000b!J|\u0007/\u001a:uS\u0016\u001c\bBB\u0012\u0001A\u0003%!$\u0001\u0004qe>\u00048\u000f\t\u0005\bK\u0001\u0011\r\u0011\"\u0001'\u0003\u0019\u0019wN\u001c4jOV\tq\u0005\u0005\u0002)W5\t\u0011F\u0003\u0002+\t\u000511/\u001a:wKJL!\u0001L\u0015\u0003\u0017-\u000bgm[1D_:4\u0017n\u001a\u0005\u0007]\u0001\u0001\u000b\u0011B\u0014\u0002\u000f\r|gNZ5hA!9\u0001\u0007\u0001b\u0001\n\u0003\t\u0014aB7fiJL7m]\u000b\u0002eA\u00111GO\u0007\u0002i)\u0011\u0001'\u000e\u0006\u0003m]\naaY8n[>t'BA\u00039\u0015\tId\"\u0001\u0004ba\u0006\u001c\u0007.Z\u0005\u0003wQ\u0012q!T3ue&\u001c7\u000f\u0003\u0004>\u0001\u0001\u0006IAM\u0001\t[\u0016$(/[2tA!9!\u0006\u0001b\u0001\n\u0003yT#\u0001!\u0011\u0005Y\t\u0015B\u0001\"\u0003\u00051\u0019vnY6fiN+'O^3s\u0011\u0019!\u0005\u0001)A\u0005\u0001\u000691/\u001a:wKJ\u0004\u0003b\u0002$\u0001\u0005\u0004%\taR\u0001\bg>\u001c7.\u001a;t+\u0005A\u0005cA%Q%6\t!J\u0003\u0002L\u0019\u00069Q.\u001e;bE2,'BA'O\u0003)\u0019w\u000e\u001c7fGRLwN\u001c\u0006\u0002\u001f\u0006)1oY1mC&\u0011\u0011K\u0013\u0002\f\u0003J\u0014\u0018-\u001f\"vM\u001a,'\u000f\u0005\u0002T-6\tAK\u0003\u0002V=\u0005\u0019a.\u001a;\n\u0005]#&AB*pG.,G\u000f\u0003\u0004Z\u0001\u0001\u0006I\u0001S\u0001\tg>\u001c7.\u001a;tA!)1\f\u0001C\u00019\u0006Y1/\u001a8e%\u0016\fX/Z:u)\u0011i\u0016mY6\u0011\u0005y{V\"\u0001(\n\u0005\u0001t%\u0001B+oSRDQA\u0019.A\u0002I\u000baa]8dW\u0016$\b\"\u00023[\u0001\u0004)\u0017a\u0002:fcV,7\u000f\u001e\t\u0004=\u001aD\u0017BA4O\u0005\u0015\t%O]1z!\tq\u0016.\u0003\u0002k\u001d\n!!)\u001f;f\u0011\u001da'\f%AA\u00025\f!!\u001b3\u0011\u0007ys\u0007/\u0003\u0002p\u001d\n1q\n\u001d;j_:\u0004\"AX9\n\u0005It%!B*i_J$\b\"\u0002;\u0001\t\u0003)\u0018a\u0004:fG\u0016Lg/\u001a*fgB|gn]3\u0015\u0005\u00154\b\"\u00022t\u0001\u0004\u0011\u0006\"\u0002=\u0001\t\u0003I\u0018A\u00049s_\u000e,7o\u001d*fcV,7\u000f\u001e\u000b\u0003;jDQa_<A\u0002q\fqa\u00195b]:,G\u000e\u0005\u0002\u0017{&\u0011aP\u0001\u0002\u000f%\u0016\fX/Z:u\u0007\"\fgN\\3m\u0011\u0019A\b\u0001\"\u0001\u0002\u0002Q)Q,a\u0001\u0002\u0006!)1p a\u0001y\"1Am a\u0001\u0003\u000f\u0001B!!\u0003\u0002\u00109\u0019a#a\u0003\n\u0007\u00055!!\u0001\bSKF,Xm\u001d;DQ\u0006tg.\u001a7\n\t\u0005E\u00111\u0003\u0002\b%\u0016\fX/Z:u\u0015\r\tiA\u0001\u0005\b\u0003/\u0001A\u0011AA\r\u0003\u001d\u0019wN\u001c8fGR$RAUA\u000e\u0003?A\u0011\"!\b\u0002\u0016A\u0005\t\u0019\u0001!\u0002\u0003MD!\"!\t\u0002\u0016A\u0005\t\u0019AA\u0012\u0003!\u0001(o\u001c;pG>d\u0007\u0003BA\u0013\u0003Si!!a\n\u000b\u0007\u0005\u0005R'\u0003\u0003\u0002,\u0005\u001d\"\u0001E*fGV\u0014\u0018\u000e^=Qe>$xnY8m\u0011\u001d\ty\u0003\u0001C\u0001\u0003c\t\u0001\u0002^3be\u0012{wO\u001c\u000b\u0002;\"\"\u0011QFA\u001b!\u0011\t9$a\u000f\u000e\u0005\u0005e\"BA\u0006\u000f\u0013\u0011\ti$!\u000f\u0003\u000b\u00053G/\u001a:\t\u000f\u0005\u0005\u0003\u0001\"\u0003\u0002D\u0005!\u0002O]8ek\u000e,'OU3rk\u0016\u001cHOQ=uKN,\u0012!\u001a\u0005\b\u0003\u000f\u0002A\u0011AA\u0019\u00035\u0019\u0018.\u001c9mKJ+\u0017/^3ti\"\"\u0011QIA&!\u0011\t9$!\u0014\n\t\u0005=\u0013\u0011\b\u0002\u0005)\u0016\u001cH\u000fC\u0004\u0002T\u0001!\t!!\r\u0002/Q|wNQ5h%\u0016\fX/Z:u\u0013N\u0014VM[3di\u0016$\u0007\u0006BA)\u0003\u0017Bq!!\u0017\u0001\t\u0003\t\t$\u0001\u000euKN$8k\\2lKR\u001c8\t\\8tK>s7\u000b[;uI><h\u000e\u000b\u0003\u0002X\u0005-\u0003bBA0\u0001\u0011\u0005\u0011\u0011G\u0001\u0018i\u0016\u001cH/T1y\u0007>tg.Z2uS>t7\u000fU3s\u0013BDC!!\u0018\u0002L!9\u0011Q\r\u0001\u0005\u0002\u0005E\u0012\u0001\t;fgRl\u0015\r_\"p]:,7\r^5p]N\u0004VM]%q\u001fZ,'O]5eKNDC!a\u0019\u0002L!9\u00111\u000e\u0001\u0005\u0002\u0005E\u0012a\u0005;fgR\u001c6\u000f\\*pG.,GoU3sm\u0016\u0014\b\u0006BA5\u0003\u0017Bq!!\u001d\u0001\t\u0003\t\t$\u0001\u000buKN$8+Z:tS>t\u0007K]5oG&\u0004\u0018\r\u001c\u0015\u0005\u0003_\nY\u0005C\u0004\u0002x\u0001!\t!!\r\u0002YQ,7\u000f^\"mS\u0016tG\u000fR5tG>tg.Z2uS>tW\u000b\u001d3bi\u0016\u001c(+Z9vKN$X*\u001a;sS\u000e\u001c\b\u0006BA;\u0003\u0017Bq!! \u0001\t\u0003\t\t$A\u001buKN$(I]8lKJ\u001cVM\u001c3BMR,'o\u00115b]:,Gn\u00117pg\u0016$W\u000b\u001d3bi\u0016\u001c(+Z9vKN$X*\u001a;sS\u000e\u001c\b\u0006BA>\u0003\u0017B\u0011\"a!\u0001#\u0003%\t!!\"\u0002+M,g\u000e\u001a*fcV,7\u000f\u001e\u0013eK\u001a\fW\u000f\u001c;%gU\u0011\u0011q\u0011\u0016\u0004[\u0006%5FAAF!\u0011\ti)a&\u000e\u0005\u0005=%\u0002BAI\u0003'\u000b\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\u0005Ue*\u0001\u0006b]:|G/\u0019;j_:LA!!'\u0002\u0010\n\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3\t\u0013\u0005u\u0005!%A\u0005\u0002\u0005}\u0015!E2p]:,7\r\u001e\u0013eK\u001a\fW\u000f\u001c;%cU\u0011\u0011\u0011\u0015\u0016\u0004\u0001\u0006%\u0005\"CAS\u0001E\u0005I\u0011AAT\u0003E\u0019wN\u001c8fGR$C-\u001a4bk2$HEM\u000b\u0003\u0003SSC!a\t\u0002\n\u0002")
public class SocketServerTest
extends JUnitSuite {
    private final Properties props;
    private final KafkaConfig config;
    private final Metrics metrics;
    private final SocketServer server;
    private final ArrayBuffer<Socket> sockets;

    public Properties props() {
        return this.props;
    }

    public KafkaConfig config() {
        return this.config;
    }

    public Metrics metrics() {
        return this.metrics;
    }

    public SocketServer server() {
        return this.server;
    }

    public ArrayBuffer<Socket> sockets() {
        return this.sockets;
    }

    public void sendRequest(Socket socket, byte[] request, Option<Object> id) {
        Option<Object> option;
        block4: {
            DataOutputStream outgoing;
            block3: {
                block2: {
                    outgoing = new DataOutputStream(socket.getOutputStream());
                    option = id;
                    if (!(option instanceof Some)) break block2;
                    Some some = (Some)option;
                    short id2 = BoxesRunTime.unboxToShort((Object)some.x());
                    outgoing.writeInt(request.length + 2);
                    outgoing.writeShort(id2);
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    break block3;
                }
                if (!None$.MODULE$.equals(option)) break block4;
                outgoing.writeInt(request.length);
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            }
            outgoing.write(request);
            outgoing.flush();
            return;
        }
        throw new MatchError(option);
    }

    public Option<Object> sendRequest$default$3() {
        return None$.MODULE$;
    }

    public byte[] receiveResponse(Socket socket) {
        DataInputStream incoming = new DataInputStream(socket.getInputStream());
        int len = incoming.readInt();
        byte[] response = new byte[len];
        incoming.readFully(response);
        return response;
    }

    public void processRequest(RequestChannel channel) {
        RequestChannel.Request request = channel.receiveRequest(2000L);
        Assert.assertNotNull((String)"receiveRequest timed out", (Object)request);
        this.processRequest(channel, request);
    }

    public void processRequest(RequestChannel channel, RequestChannel.Request request) {
        ByteBuffer byteBuffer = ByteBuffer.allocate(request.header().sizeOf() + request.body().sizeOf());
        request.header().writeTo(byteBuffer);
        request.body().writeTo(byteBuffer);
        byteBuffer.rewind();
        NetworkSend send = new NetworkSend(request.connectionId(), new ByteBuffer[]{byteBuffer});
        channel.sendResponse(new RequestChannel.Response(request.processor(), request, (Send)send));
    }

    /*
     * WARNING - void declaration
     */
    public Socket connect(SocketServer s, SecurityProtocol protocol) {
        void var3_3;
        Socket socket = new Socket("localhost", s.boundPort(protocol));
        this.sockets().$plus$eq((Object)socket);
        return var3_3;
    }

    public SocketServer connect$default$1() {
        return this.server();
    }

    public SecurityProtocol connect$default$2() {
        return SecurityProtocol.PLAINTEXT;
    }

    @After
    public void tearDown() {
        this.metrics().close();
        this.server().shutdown();
        this.sockets().foreach((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final void apply(Socket x$1) {
                x$1.close();
            }
        });
        this.sockets().clear();
    }

    private byte[] producerRequestBytes() {
        short apiKey = 0;
        int correlationId = -1;
        String clientId = "";
        int ackTimeoutMs = 10000;
        short ack = 0;
        RequestHeader emptyHeader = new RequestHeader(apiKey, clientId, correlationId);
        ProduceRequest emptyRequest = new ProduceRequest(ack, ackTimeoutMs, new HashMap());
        ByteBuffer byteBuffer = ByteBuffer.allocate(emptyHeader.sizeOf() + emptyRequest.sizeOf());
        emptyHeader.writeTo(byteBuffer);
        emptyRequest.writeTo(byteBuffer);
        byteBuffer.rewind();
        byte[] serializedBytes = new byte[byteBuffer.remaining()];
        byteBuffer.get(serializedBytes);
        return serializedBytes;
    }

    @Test
    public void simpleRequest() {
        SecurityProtocol x$20 = SecurityProtocol.PLAINTEXT;
        SocketServer x$21 = this.connect$default$1();
        Socket plainSocket = this.connect(x$21, x$20);
        SecurityProtocol x$22 = SecurityProtocol.TRACE;
        SocketServer x$23 = this.connect$default$1();
        Socket traceSocket = this.connect(x$23, x$22);
        byte[] serializedBytes = this.producerRequestBytes();
        this.sendRequest(plainSocket, serializedBytes, this.sendRequest$default$3());
        this.processRequest(this.server().requestChannel());
        Assert.assertEquals((Object)Predef$.MODULE$.byteArrayOps(serializedBytes).toSeq(), (Object)Predef$.MODULE$.byteArrayOps(this.receiveResponse(plainSocket)).toSeq());
        this.sendRequest(traceSocket, serializedBytes, this.sendRequest$default$3());
        this.processRequest(this.server().requestChannel());
        Assert.assertEquals((Object)Predef$.MODULE$.byteArrayOps(serializedBytes).toSeq(), (Object)Predef$.MODULE$.byteArrayOps(this.receiveResponse(traceSocket)).toSeq());
    }

    @Test
    public void tooBigRequestIsRejected() {
        byte[] tooManyBytes = new byte[Predef$.MODULE$.Integer2int(this.server().config().socketRequestMaxBytes()) + 1];
        new Random().nextBytes(tooManyBytes);
        Socket socket = this.connect(this.connect$default$1(), this.connect$default$2());
        DataOutputStream outgoing = new DataOutputStream(socket.getOutputStream());
        outgoing.writeInt(tooManyBytes.length);
        try {
            outgoing.write(tooManyBytes);
            outgoing.flush();
            this.receiveResponse(socket);
        }
        catch (IOException iOException) {
            // empty catch block
        }
    }

    @Test
    public void testSocketsCloseOnShutdown() {
        SecurityProtocol x$24 = SecurityProtocol.PLAINTEXT;
        SocketServer x$25 = this.connect$default$1();
        Socket plainSocket = this.connect(x$25, x$24);
        SecurityProtocol x$26 = SecurityProtocol.TRACE;
        SocketServer x$27 = this.connect$default$1();
        Socket traceSocket = this.connect(x$27, x$26);
        byte[] bytes = new byte[40];
        this.sendRequest(plainSocket, bytes, (Option<Object>)new Some((Object)BoxesRunTime.boxToShort((short)0)));
        this.sendRequest(traceSocket, bytes, (Option<Object>)new Some((Object)BoxesRunTime.boxToShort((short)0)));
        this.processRequest(this.server().requestChannel());
        this.server().acceptors().values().map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final void apply(Acceptor acceptor) {
                Assert.assertFalse((boolean)acceptor.serverChannel().socket().isClosed());
            }
        }, Iterable$.MODULE$.canBuildFrom());
        this.server().shutdown();
        byte[] largeChunkOfBytes = new byte[1000000];
        try {
            this.sendRequest(plainSocket, largeChunkOfBytes, (Option<Object>)new Some((Object)BoxesRunTime.boxToShort((short)0)));
            throw this.fail("expected exception when writing to closed plain socket");
        }
        catch (IOException iOException) {
            try {
                this.sendRequest(traceSocket, largeChunkOfBytes, (Option<Object>)new Some((Object)BoxesRunTime.boxToShort((short)0)));
                throw this.fail("expected exception when writing to closed trace socket");
            }
            catch (IOException iOException2) {
                return;
            }
        }
    }

    @Test
    public void testMaxConnectionsPerIp() {
        IndexedSeq conns = (IndexedSeq)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), Predef$.MODULE$.Integer2int(this.server().config().maxConnectionsPerIp())).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ SocketServerTest $outer;

            public final Socket apply(int x$2) {
                return this.$outer.connect(this.$outer.connect$default$1(), this.$outer.connect$default$2());
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        }, IndexedSeq$.MODULE$.canBuildFrom());
        Socket conn = this.connect(this.connect$default$1(), this.connect$default$2());
        conn.setSoTimeout(3000);
        Assert.assertEquals((long)-1L, (long)conn.getInputStream().read());
        conn.close();
        InetAddress address = ((Socket)conns.head()).getInetAddress();
        ((Socket)conns.head()).close();
        TestUtils$.MODULE$.waitUntilTrue((Function0<Object>)new Serializable(this, conns, address){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ SocketServerTest $outer;
            private final IndexedSeq conns$1;
            private final InetAddress address$1;

            public final boolean apply() {
                return this.apply$mcZ$sp();
            }

            public boolean apply$mcZ$sp() {
                return this.$outer.server().connectionCount(this.address$1) < this.conns$1.length();
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.conns$1 = conns$1;
                this.address$1 = address$1;
            }
        }, "Failed to decrement connection count after close", TestUtils$.MODULE$.waitUntilTrue$default$3());
        Socket conn2 = this.connect(this.connect$default$1(), this.connect$default$2());
        byte[] serializedBytes = this.producerRequestBytes();
        this.sendRequest(conn2, serializedBytes, this.sendRequest$default$3());
        RequestChannel.Request request = this.server().requestChannel().receiveRequest(2000L);
        Assert.assertNotNull((Object)request);
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testMaxConnectionsPerIpOverrides() {
        int overrideNum = Predef$.MODULE$.Integer2int(this.server().config().maxConnectionsPerIp()) + 1;
        int x$28 = 0;
        String x$29 = TestUtils$.MODULE$.MockZkConnect();
        int x$30 = 0;
        boolean x$31 = TestUtils$.MODULE$.createBrokerConfig$default$3();
        boolean x$32 = TestUtils$.MODULE$.createBrokerConfig$default$4();
        Option<SecurityProtocol> x$33 = TestUtils$.MODULE$.createBrokerConfig$default$6();
        Option<File> x$34 = TestUtils$.MODULE$.createBrokerConfig$default$7();
        Option<Properties> x$35 = TestUtils$.MODULE$.createBrokerConfig$default$8();
        boolean x$36 = TestUtils$.MODULE$.createBrokerConfig$default$9();
        boolean x$37 = TestUtils$.MODULE$.createBrokerConfig$default$10();
        int x$38 = TestUtils$.MODULE$.createBrokerConfig$default$11();
        boolean x$39 = TestUtils$.MODULE$.createBrokerConfig$default$12();
        int x$40 = TestUtils$.MODULE$.createBrokerConfig$default$13();
        boolean x$41 = TestUtils$.MODULE$.createBrokerConfig$default$14();
        int x$42 = TestUtils$.MODULE$.createBrokerConfig$default$15();
        Option<String> x$43 = TestUtils$.MODULE$.createBrokerConfig$default$16();
        Properties overrideProps = TestUtils$.MODULE$.createBrokerConfig(x$28, x$29, x$31, x$32, x$30, x$33, x$34, x$35, x$36, x$37, x$38, x$39, x$40, x$41, x$42, x$43);
        overrideProps.put(KafkaConfig$.MODULE$.MaxConnectionsPerIpOverridesProp(), new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"localhost:", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)overrideNum)})));
        Metrics serverMetrics = new Metrics();
        SocketServer overrideServer = new SocketServer(KafkaConfig$.MODULE$.fromProps(overrideProps), serverMetrics, (Time)new SystemTime());
        try {
            overrideServer.startup();
            IndexedSeq conns = (IndexedSeq)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), overrideNum).map((Function1)new Serializable(this, overrideServer){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ SocketServerTest $outer;
                private final SocketServer overrideServer$1;

                public final Socket apply(int x$3) {
                    return this.$outer.connect(this.overrideServer$1, this.$outer.connect$default$2());
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                    this.overrideServer$1 = overrideServer$1;
                }
            }, IndexedSeq$.MODULE$.canBuildFrom());
            byte[] serializedBytes = this.producerRequestBytes();
            this.sendRequest((Socket)conns.last(), serializedBytes, this.sendRequest$default$3());
            RequestChannel.Request request = overrideServer.requestChannel().receiveRequest(2000L);
            Assert.assertNotNull((Object)request);
            Socket conn = this.connect(overrideServer, this.connect$default$2());
            conn.setSoTimeout(3000);
            Assert.assertEquals((long)-1L, (long)conn.getInputStream().read());
        }
        catch (Throwable throwable) {
            void var19_19;
            void var20_20;
            var20_20.shutdown();
            var19_19.close();
            throw throwable;
        }
        overrideServer.shutdown();
        serverMetrics.close();
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testSslSocketServer() {
        File trustStoreFile = File.createTempFile("truststore", ".jks");
        int x$44 = 0;
        String x$45 = TestUtils$.MODULE$.MockZkConnect();
        Some x$46 = new Some((Object)SecurityProtocol.SSL);
        Some x$47 = new Some((Object)trustStoreFile);
        boolean x$48 = TestUtils$.MODULE$.createBrokerConfig$default$3();
        boolean x$49 = TestUtils$.MODULE$.createBrokerConfig$default$4();
        int x$50 = TestUtils$.MODULE$.createBrokerConfig$default$5();
        Option<Properties> x$51 = TestUtils$.MODULE$.createBrokerConfig$default$8();
        boolean x$52 = TestUtils$.MODULE$.createBrokerConfig$default$9();
        boolean x$53 = TestUtils$.MODULE$.createBrokerConfig$default$10();
        int x$54 = TestUtils$.MODULE$.createBrokerConfig$default$11();
        boolean x$55 = TestUtils$.MODULE$.createBrokerConfig$default$12();
        int x$56 = TestUtils$.MODULE$.createBrokerConfig$default$13();
        boolean x$57 = TestUtils$.MODULE$.createBrokerConfig$default$14();
        int x$58 = TestUtils$.MODULE$.createBrokerConfig$default$15();
        Option<String> x$59 = TestUtils$.MODULE$.createBrokerConfig$default$16();
        Properties overrideProps = TestUtils$.MODULE$.createBrokerConfig(x$44, x$45, x$48, x$49, x$50, (Option<SecurityProtocol>)x$46, (Option<File>)x$47, x$51, x$52, x$53, x$54, x$55, x$56, x$57, x$58, x$59);
        overrideProps.put(KafkaConfig$.MODULE$.ListenersProp(), "SSL://localhost:0");
        Metrics serverMetrics = new Metrics();
        SocketServer overrideServer = new SocketServer(KafkaConfig$.MODULE$.fromProps(overrideProps), serverMetrics, (Time)new SystemTime());
        try {
            overrideServer.startup();
            SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
            sslContext.init(null, (TrustManager[])((Object[])new TrustManager[]{TestUtils$.MODULE$.trustAllCerts()}), new SecureRandom());
            SSLSocketFactory socketFactory = sslContext.getSocketFactory();
            SSLSocket sslSocket = (SSLSocket)socketFactory.createSocket("localhost", overrideServer.boundPort(SecurityProtocol.SSL));
            sslSocket.setNeedClientAuth(false);
            short apiKey = ApiKeys.PRODUCE.id;
            int correlationId = -1;
            String clientId = "";
            int ackTimeoutMs = 10000;
            short ack = 0;
            RequestHeader emptyHeader = new RequestHeader(apiKey, clientId, correlationId);
            ProduceRequest emptyRequest = new ProduceRequest(ack, ackTimeoutMs, new HashMap());
            ByteBuffer byteBuffer = ByteBuffer.allocate(emptyHeader.sizeOf() + emptyRequest.sizeOf());
            emptyHeader.writeTo(byteBuffer);
            emptyRequest.writeTo(byteBuffer);
            byteBuffer.rewind();
            byte[] serializedBytes = new byte[byteBuffer.remaining()];
            byteBuffer.get(serializedBytes);
            this.sendRequest(sslSocket, serializedBytes, this.sendRequest$default$3());
            this.processRequest(overrideServer.requestChannel());
            Assert.assertEquals((Object)Predef$.MODULE$.byteArrayOps(serializedBytes).toSeq(), (Object)Predef$.MODULE$.byteArrayOps(this.receiveResponse(sslSocket)).toSeq());
            sslSocket.close();
        }
        catch (Throwable throwable) {
            void var19_19;
            void var20_20;
            var20_20.shutdown();
            var19_19.close();
            throw throwable;
        }
        overrideServer.shutdown();
        serverMetrics.close();
    }

    @Test
    public void testSessionPrincipal() {
        Socket socket = this.connect(this.connect$default$1(), this.connect$default$2());
        byte[] bytes = new byte[40];
        this.sendRequest(socket, bytes, (Option<Object>)new Some((Object)BoxesRunTime.boxToShort((short)0)));
        Assert.assertEquals((Object)KafkaPrincipal.ANONYMOUS, (Object)this.server().requestChannel().receiveRequest(2000L).session().principal());
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testClientDisconnectionUpdatesRequestMetrics() {
        int x$60 = 0;
        String x$61 = TestUtils$.MODULE$.MockZkConnect();
        int x$62 = 0;
        boolean x$63 = TestUtils$.MODULE$.createBrokerConfig$default$3();
        boolean x$64 = TestUtils$.MODULE$.createBrokerConfig$default$4();
        Option<SecurityProtocol> x$65 = TestUtils$.MODULE$.createBrokerConfig$default$6();
        Option<File> x$66 = TestUtils$.MODULE$.createBrokerConfig$default$7();
        Option<Properties> x$67 = TestUtils$.MODULE$.createBrokerConfig$default$8();
        boolean x$68 = TestUtils$.MODULE$.createBrokerConfig$default$9();
        boolean x$69 = TestUtils$.MODULE$.createBrokerConfig$default$10();
        int x$70 = TestUtils$.MODULE$.createBrokerConfig$default$11();
        boolean x$71 = TestUtils$.MODULE$.createBrokerConfig$default$12();
        int x$72 = TestUtils$.MODULE$.createBrokerConfig$default$13();
        boolean x$73 = TestUtils$.MODULE$.createBrokerConfig$default$14();
        int x$74 = TestUtils$.MODULE$.createBrokerConfig$default$15();
        Option<String> x$75 = TestUtils$.MODULE$.createBrokerConfig$default$16();
        Properties props = TestUtils$.MODULE$.createBrokerConfig(x$60, x$61, x$63, x$64, x$62, x$65, x$66, x$67, x$68, x$69, x$70, x$71, x$72, x$73, x$74, x$75);
        Metrics serverMetrics = new Metrics();
        ObjectRef conn = ObjectRef.create(null);
        SocketServer overrideServer = new SocketServer(this, props, serverMetrics, conn){
            public final ObjectRef conn$1;

            public Processor newProcessor(int id, ConnectionQuotas connectionQuotas, SecurityProtocol protocol) {
                return new Processor(this, id, connectionQuotas, protocol){
                    private final /* synthetic */ $anon$2 $outer;

                    public void sendResponse(RequestChannel.Response response) {
                        ((Socket)this.$outer.conn$1.elem).close();
                        super.sendResponse(response);
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        super(id$1, $outer.time(), Predef$.MODULE$.Integer2int($outer.config().socketRequestMaxBytes()), $outer.requestChannel(), connectionQuotas$1, Predef$.MODULE$.Long2long($outer.config().connectionsMaxIdleMs()), protocol$1, $outer.config().values(), $outer.metrics());
                    }
                };
            }
            {
                this.conn$1 = conn$1;
                super(KafkaConfig$.MODULE$.fromProps(props$1), serverMetrics$1, (Time)new SystemTime());
            }
        };
        try {
            overrideServer.startup();
            conn.elem = this.connect(overrideServer, this.connect$default$2());
            byte[] serializedBytes = this.producerRequestBytes();
            this.sendRequest((Socket)conn.elem, serializedBytes, this.sendRequest$default$3());
            RequestChannel channel = overrideServer.requestChannel();
            RequestChannel.Request request = channel.receiveRequest(2000L);
            RequestMetrics requestMetrics = (RequestMetrics)RequestMetrics$.MODULE$.metricsMap().apply((Object)ApiKeys.forId((int)request.requestId()).name);
            long expectedTotalTimeCount = this.kafka$network$SocketServerTest$$totalTimeHistCount$1(requestMetrics) + 1L;
            NetworkSend send = new NetworkSend(request.connectionId(), new ByteBuffer[]{ByteBuffer.allocate(550000)});
            channel.sendResponse(new RequestChannel.Response(request.processor(), request, (Send)send));
            TestUtils$.MODULE$.waitUntilTrue((Function0<Object>)new Serializable(this, requestMetrics, expectedTotalTimeCount){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ SocketServerTest $outer;
                private final RequestMetrics requestMetrics$1;
                private final long expectedTotalTimeCount$1;

                public final boolean apply() {
                    return this.apply$mcZ$sp();
                }

                public boolean apply$mcZ$sp() {
                    return this.$outer.kafka$network$SocketServerTest$$totalTimeHistCount$1(this.requestMetrics$1) == this.expectedTotalTimeCount$1;
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                    this.requestMetrics$1 = requestMetrics$1;
                    this.expectedTotalTimeCount$1 = expectedTotalTimeCount$1;
                }
            }, new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"request metrics not updated, expected: ", ", actual: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)expectedTotalTimeCount), BoxesRunTime.boxToLong((long)this.kafka$network$SocketServerTest$$totalTimeHistCount$1(requestMetrics))})), TestUtils$.MODULE$.waitUntilTrue$default$3());
        }
        catch (Throwable throwable) {
            void var18_18;
            void var20_20;
            var20_20.shutdown();
            var18_18.close();
            throw throwable;
        }
        overrideServer.shutdown();
        serverMetrics.close();
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testBrokerSendAfterChannelClosedUpdatesRequestMetrics() {
        int x$76 = 0;
        String x$77 = TestUtils$.MODULE$.MockZkConnect();
        int x$78 = 0;
        boolean x$79 = TestUtils$.MODULE$.createBrokerConfig$default$3();
        boolean x$80 = TestUtils$.MODULE$.createBrokerConfig$default$4();
        Option<SecurityProtocol> x$81 = TestUtils$.MODULE$.createBrokerConfig$default$6();
        Option<File> x$82 = TestUtils$.MODULE$.createBrokerConfig$default$7();
        Option<Properties> x$83 = TestUtils$.MODULE$.createBrokerConfig$default$8();
        boolean x$84 = TestUtils$.MODULE$.createBrokerConfig$default$9();
        boolean x$85 = TestUtils$.MODULE$.createBrokerConfig$default$10();
        int x$86 = TestUtils$.MODULE$.createBrokerConfig$default$11();
        boolean x$87 = TestUtils$.MODULE$.createBrokerConfig$default$12();
        int x$88 = TestUtils$.MODULE$.createBrokerConfig$default$13();
        boolean x$89 = TestUtils$.MODULE$.createBrokerConfig$default$14();
        int x$90 = TestUtils$.MODULE$.createBrokerConfig$default$15();
        Option<String> x$91 = TestUtils$.MODULE$.createBrokerConfig$default$16();
        Properties props = TestUtils$.MODULE$.createBrokerConfig(x$76, x$77, x$79, x$80, x$78, x$81, x$82, x$83, x$84, x$85, x$86, x$87, x$88, x$89, x$90, x$91);
        props.setProperty(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), "100");
        Metrics serverMetrics = new Metrics();
        Socket conn = null;
        SocketServer overrideServer = new SocketServer(KafkaConfig$.MODULE$.fromProps(props), serverMetrics, (Time)new SystemTime());
        try {
            overrideServer.startup();
            conn = this.connect(overrideServer, this.connect$default$2());
            byte[] serializedBytes = this.producerRequestBytes();
            this.sendRequest(conn, serializedBytes, this.sendRequest$default$3());
            RequestChannel channel = overrideServer.requestChannel();
            RequestChannel.Request request = channel.receiveRequest(2000L);
            TestUtils$.MODULE$.waitUntilTrue((Function0<Object>)new Serializable(this, overrideServer, request){
                public static final long serialVersionUID = 0L;
                private final SocketServer overrideServer$2;
                private final RequestChannel.Request request$1;

                public final boolean apply() {
                    return this.apply$mcZ$sp();
                }

                public boolean apply$mcZ$sp() {
                    return this.overrideServer$2.processor(this.request$1.processor()).channel(this.request$1.connectionId()).isEmpty();
                }
                {
                    this.overrideServer$2 = overrideServer$2;
                    this.request$1 = request$1;
                }
            }, new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Idle connection `", "` was not closed by selector"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{request.connectionId()})), TestUtils$.MODULE$.waitUntilTrue$default$3());
            RequestMetrics requestMetrics = (RequestMetrics)RequestMetrics$.MODULE$.metricsMap().apply((Object)ApiKeys.forId((int)request.requestId()).name);
            long expectedTotalTimeCount = this.kafka$network$SocketServerTest$$totalTimeHistCount$2(requestMetrics) + 1L;
            this.processRequest(channel, request);
            TestUtils$.MODULE$.waitUntilTrue((Function0<Object>)new Serializable(this, requestMetrics, expectedTotalTimeCount){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ SocketServerTest $outer;
                private final RequestMetrics requestMetrics$2;
                private final long expectedTotalTimeCount$2;

                public final boolean apply() {
                    return this.apply$mcZ$sp();
                }

                public boolean apply$mcZ$sp() {
                    return this.$outer.kafka$network$SocketServerTest$$totalTimeHistCount$2(this.requestMetrics$2) == this.expectedTotalTimeCount$2;
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                    this.requestMetrics$2 = requestMetrics$2;
                    this.expectedTotalTimeCount$2 = expectedTotalTimeCount$2;
                }
            }, new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"request metrics not updated, expected: ", ", actual: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)expectedTotalTimeCount), BoxesRunTime.boxToLong((long)this.kafka$network$SocketServerTest$$totalTimeHistCount$2(requestMetrics))})), TestUtils$.MODULE$.waitUntilTrue$default$3());
        }
        catch (Throwable throwable) {
            void var18_18;
            void var20_20;
            var20_20.shutdown();
            var18_18.close();
            throw throwable;
        }
        overrideServer.shutdown();
        serverMetrics.close();
    }

    public final long kafka$network$SocketServerTest$$totalTimeHistCount$1(RequestMetrics requestMetrics$1) {
        return requestMetrics$1.totalTimeHist().count();
    }

    public final long kafka$network$SocketServerTest$$totalTimeHistCount$2(RequestMetrics requestMetrics$2) {
        return requestMetrics$2.totalTimeHist().count();
    }

    public SocketServerTest() {
        int x$4 = 0;
        String x$5 = TestUtils$.MODULE$.MockZkConnect();
        int x$6 = 0;
        boolean x$7 = TestUtils$.MODULE$.createBrokerConfig$default$3();
        boolean x$8 = TestUtils$.MODULE$.createBrokerConfig$default$4();
        Option<SecurityProtocol> x$9 = TestUtils$.MODULE$.createBrokerConfig$default$6();
        Option<File> x$10 = TestUtils$.MODULE$.createBrokerConfig$default$7();
        Option<Properties> x$11 = TestUtils$.MODULE$.createBrokerConfig$default$8();
        boolean x$12 = TestUtils$.MODULE$.createBrokerConfig$default$9();
        boolean x$13 = TestUtils$.MODULE$.createBrokerConfig$default$10();
        int x$14 = TestUtils$.MODULE$.createBrokerConfig$default$11();
        boolean x$15 = TestUtils$.MODULE$.createBrokerConfig$default$12();
        int x$16 = TestUtils$.MODULE$.createBrokerConfig$default$13();
        boolean x$17 = TestUtils$.MODULE$.createBrokerConfig$default$14();
        int x$18 = TestUtils$.MODULE$.createBrokerConfig$default$15();
        Option<String> x$19 = TestUtils$.MODULE$.createBrokerConfig$default$16();
        this.props = TestUtils$.MODULE$.createBrokerConfig(x$4, x$5, x$7, x$8, x$6, x$9, x$10, x$11, x$12, x$13, x$14, x$15, x$16, x$17, x$18, x$19);
        this.props().put("listeners", "PLAINTEXT://localhost:0,TRACE://localhost:0");
        this.props().put("num.network.threads", "1");
        this.props().put("socket.send.buffer.bytes", "300000");
        this.props().put("socket.receive.buffer.bytes", "300000");
        this.props().put("queued.max.requests", "50");
        this.props().put("socket.request.max.bytes", "50");
        this.props().put("max.connections.per.ip", "5");
        this.props().put("connections.max.idle.ms", "60000");
        this.config = KafkaConfig$.MODULE$.fromProps(this.props());
        this.metrics = new Metrics();
        this.server = new SocketServer(this.config(), this.metrics(), (Time)new SystemTime());
        this.server().startup();
        this.sockets = new ArrayBuffer();
    }
}

