/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.DataInputStream;
import java.io.File;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import kafka.cluster.Broker;
import kafka.controller.ControlMetadataBatch;
import kafka.controller.ControllerChannelManager;
import kafka.controller.ControllerChannelManager$;
import kafka.controller.ControllerContext;
import kafka.controller.LeaderAndIsrBatch;
import kafka.controller.StateChangeLogger;
import kafka.log.LogManager$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.server.KafkaServer$;
import kafka.utils.CoreUtils$;
import kafka.utils.TestUtils$;
import kafka.zk.KafkaZkClient;
import kafka.zk.ZooKeeperTestHarness;
import kafka.zookeeper.ZooKeeperClientTimeoutException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.BrokerState;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import scala.Array$;
import scala.Function1;
import scala.Function2;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Iterable$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;

@Timeout(value=60L)
@ScalaSignature(bytes="\u0006\u0001\u0005Ec\u0001B\r\u001b\u0001}AQA\n\u0001\u0005\u0002\u001dBqA\u000b\u0001A\u0002\u0013\u00051\u0006C\u00040\u0001\u0001\u0007I\u0011\u0001\u0019\t\re\u0002\u0001\u0015)\u0003-\u0011\u001dQ\u0004A1A\u0005\u0002mBa\u0001\u0012\u0001!\u0002\u0013a\u0004bB#\u0001\u0005\u0004%\ta\u000f\u0005\u0007\r\u0002\u0001\u000b\u0011\u0002\u001f\t\u000f\u001d\u0003!\u0019!C\u0001\u0011\"1\u0011\u000b\u0001Q\u0001\n%CqA\u0015\u0001C\u0002\u0013\u0005\u0001\n\u0003\u0004T\u0001\u0001\u0006I!\u0013\u0005\u0006)\u0002!\t%\u0016\u0005\u0006G\u0002!\t!\u0016\u0005\u0006Q\u0002!\t!\u0016\u0005\u0006U\u0002!\t!\u0016\u0005\u0006Y\u0002!\t!\u0016\u0005\u0006]\u0002!Ia\u001c\u0005\t\u0003?\u0001\u0001\u0015\"\u0003\u0002\"!1\u00111\u0007\u0001\u0005\u0002UCa!!\u000e\u0001\t\u0003)\u0006BBA\u001d\u0001\u0011\u0005Q\u000b\u0003\u0004\u0002>\u0001!\t!\u0016\u0005\u0007\u0003\u0003\u0002A\u0011A+\u0003%M+'O^3s'\",H\u000fZ8x]R+7\u000f\u001e\u0006\u00037q\taa]3sm\u0016\u0014(\"A\u000f\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001\u0001\t\t\u0003C\u0011j\u0011A\t\u0006\u0003Gq\t!A_6\n\u0005\u0015\u0012#\u0001\u0006.p_.+W\r]3s)\u0016\u001cH\u000fS1s]\u0016\u001c8/\u0001\u0004=S:LGO\u0010\u000b\u0002QA\u0011\u0011\u0006A\u0007\u00025\u000511m\u001c8gS\u001e,\u0012\u0001\f\t\u0003S5J!A\f\u000e\u0003\u0017-\u000bgm[1D_:4\u0017nZ\u0001\u000bG>tg-[4`I\u0015\fHCA\u00198!\t\u0011T'D\u00014\u0015\u0005!\u0014!B:dC2\f\u0017B\u0001\u001c4\u0005\u0011)f.\u001b;\t\u000fa\u001a\u0011\u0011!a\u0001Y\u0005\u0019\u0001\u0010J\u0019\u0002\u000f\r|gNZ5hA\u0005!\u0001n\\:u+\u0005a\u0004CA\u001fC\u001b\u0005q$BA A\u0003\u0011a\u0017M\\4\u000b\u0003\u0005\u000bAA[1wC&\u00111I\u0010\u0002\u0007'R\u0014\u0018N\\4\u0002\u000b!|7\u000f\u001e\u0011\u0002\u000bQ|\u0007/[2\u0002\rQ|\u0007/[2!\u0003\u0015\u0019XM\u001c;2+\u0005I\u0005c\u0001&Py5\t1J\u0003\u0002M\u001b\u0006I\u0011.\\7vi\u0006\u0014G.\u001a\u0006\u0003\u001dN\n!bY8mY\u0016\u001cG/[8o\u0013\t\u00016J\u0001\u0003MSN$\u0018AB:f]R\f\u0004%A\u0003tK:$('\u0001\u0004tK:$(\u0007I\u0001\u0006g\u0016$X\u000b\u001d\u000b\u0002c!\u0012Qb\u0016\t\u00031\u0006l\u0011!\u0017\u0006\u00035n\u000b1!\u00199j\u0015\taV,A\u0004kkBLG/\u001a:\u000b\u0005y{\u0016!\u00026v]&$(\"\u00011\u0002\u0007=\u0014x-\u0003\u0002c3\nQ!)\u001a4pe\u0016,\u0015m\u00195\u0002#Q,7\u000f^\"mK\u0006t7\u000b[;uI><h\u000e\u000b\u0002\u000fKB\u0011\u0001LZ\u0005\u0003Of\u0013A\u0001V3ti\u0006\u0019C/Z:u\u00072,\u0017M\\*ikR$wn\u001e8BMR,'OR1jY\u0016$7\u000b^1siV\u0004\bFA\bf\u0003M\"Xm\u001d;DY\u0016\fgn\u00155vi\u0012|wO\\!gi\u0016\u0014h)Y5mK\u0012\u001cF/\u0019:ukB$U/\u001a+p\u0007>\u0014(/\u001e9u\u0019><7\u000f\u000b\u0002\u0011K\u0006\u0011C/Z:u\u00072,\u0017M\\*ikR$wn\u001e8XSRD'l[+oCZ\f\u0017\u000e\\1cY\u0016D#!E3\u0002KY,'/\u001b4z\u00072,\u0017M\\*ikR$wn\u001e8BMR,'OR1jY\u0016$7\u000b^1siV\u0004XC\u00019})\r\t\u0018Q\u0004\u000b\u0003cIDQa\u001d\nA\u0004Q\f\u0011#\u001a=dKB$\u0018n\u001c8DY\u0006\u001c8\u000fV1h!\r)\bP_\u0007\u0002m*\u0011qoM\u0001\be\u00164G.Z2u\u0013\tIhO\u0001\u0005DY\u0006\u001c8\u000fV1h!\tYH\u0010\u0004\u0001\u0005\u000bu\u0014\"\u0019\u0001@\u0003\u0003\u0015\u000b2a`A\u0003!\r\u0011\u0014\u0011A\u0005\u0004\u0003\u0007\u0019$a\u0002(pi\"Lgn\u001a\t\u0005\u0003\u000f\t9B\u0004\u0003\u0002\n\u0005Ma\u0002BA\u0006\u0003#i!!!\u0004\u000b\u0007\u0005=a$\u0001\u0004=e>|GOP\u0005\u0002i%\u0019\u0011QC\u001a\u0002\u000fA\f7m[1hK&!\u0011\u0011DA\u000e\u0005%)\u0005pY3qi&|gNC\u0002\u0002\u0016MBQA\u000b\nA\u00021\na#[:O_:$\u0015-Z7p].\u000bgm[1UQJ,\u0017\r\u001a\u000b\u0005\u0003G\tI\u0003E\u00023\u0003KI1!a\n4\u0005\u001d\u0011un\u001c7fC:Dq!a\u000b\u0014\u0001\u0004\ti#A\u0001u!\ri\u0014qF\u0005\u0004\u0003cq$A\u0002+ie\u0016\fG-\u0001\u000fwKJLg-\u001f(p]\u0012\u000bW-\\8o)\"\u0014X-\u00193t'R\fG/^:\u0002/Q,7\u000f^\"p]N,7-\u001e;jm\u0016\u001c\u0006.\u001e;e_^t\u0007FA\u000bf\u0003E!Xm\u001d;CK\u001eLgn\u00155vi\u0012|wO\u001c\u0015\u0003-\u0015\f1\u0004^3ti\n+w-\u001b8TQV$Hm\\<o/J|gnZ#q_\u000eD\u0007FA\ff\u0003\u0001\"Xm\u001d;D_:$(o\u001c7mKJ\u001c\u0006.\u001e;e_^tG)\u001e:j]\u001e\u001cVM\u001c3)\u0005a)\u0007f\u0002\u0001\u0002H\u00055\u0013q\n\t\u00041\u0006%\u0013bAA&3\n9A+[7f_V$\u0018!\u0002<bYV,g$\u0001\u001f")
public class ServerShutdownTest
extends ZooKeeperTestHarness {
    private KafkaConfig config = null;
    private final String host;
    private final String topic;
    private final List<String> sent1 = new .colon.colon((Object)"hello", (List)new .colon.colon((Object)"there", (List)Nil$.MODULE$));
    private final List<String> sent2 = new .colon.colon((Object)"more", (List)new .colon.colon((Object)"messages", (List)Nil$.MODULE$));

    public KafkaConfig config() {
        return this.config;
    }

    public void config_$eq(KafkaConfig x$1) {
        this.config = x$1;
    }

    public String host() {
        return this.host;
    }

    public String topic() {
        return this.topic;
    }

    public List<String> sent1() {
        return this.sent1;
    }

    public List<String> sent2() {
        return this.sent2;
    }

    @Override
    @BeforeEach
    public void setUp() {
        super.setUp();
        Properties props = TestUtils$.MODULE$.createBrokerConfig(0, this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        this.config_$eq(KafkaConfig$.MODULE$.fromProps(props));
    }

    @Test
    public void testCleanShutdown() {
        KafkaConfig x$12 = this.config();
        Option x$22 = Option$.MODULE$.apply((Object)this.getClass().getName());
        Time x$32 = KafkaServer$.MODULE$.$lessinit$greater$default$2();
        boolean x$42 = KafkaServer$.MODULE$.$lessinit$greater$default$4();
        KafkaServer server = new KafkaServer(x$12, x$32, x$22, x$42);
        server.startup();
        ObjectRef producer = ObjectRef.create((Object)ServerShutdownTest.createProducer$1(server));
        KafkaZkClient x$5 = this.zkClient();
        String x$6 = this.topic();
        .colon.colon x$7 = new .colon.colon((Object)server, (List)Nil$.MODULE$);
        int x$8 = TestUtils$.MODULE$.createTopic$default$3();
        int x$9 = TestUtils$.MODULE$.createTopic$default$4();
        Properties x$10 = TestUtils$.MODULE$.createTopic$default$6();
        TestUtils$.MODULE$.createTopic(x$5, x$6, x$8, x$9, (Seq<KafkaServer>)x$7, x$10);
        ((List)this.sent1().map((Function1 & Serializable & scala.Serializable)value -> ((KafkaProducer)producer$1.elem).send(new ProducerRecord(this.topic(), (Object)Predef$.MODULE$.int2Integer(0), value)), List$.MODULE$.canBuildFrom())).foreach((Function1 & Serializable & scala.Serializable)x$1 -> (RecordMetadata)x$1.get());
        server.shutdown();
        this.config().logDirs().foreach((Function1 & Serializable & scala.Serializable)logDir -> {
            ServerShutdownTest.$anonfun$testCleanShutdown$3(logDir);
            return BoxedUnit.UNIT;
        });
        ((KafkaProducer)producer.elem).close();
        server = new KafkaServer(this.config(), KafkaServer$.MODULE$.$lessinit$greater$default$2(), KafkaServer$.MODULE$.$lessinit$greater$default$3(), KafkaServer$.MODULE$.$lessinit$greater$default$4());
        server.startup();
        TestUtils$.MODULE$.waitForPartitionMetadata((Seq<KafkaServer>)new .colon.colon((Object)server, (List)Nil$.MODULE$), this.topic(), 0, TestUtils$.MODULE$.waitForPartitionMetadata$default$4());
        producer.elem = ServerShutdownTest.createProducer$1(server);
        KafkaConsumer consumer = ServerShutdownTest.createConsumer$1(server);
        consumer.subscribe((Collection)CollectionConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)this.topic(), (List)Nil$.MODULE$)).asJava());
        Seq consumerRecords = TestUtils$.MODULE$.consumeRecords(consumer, this.sent1().size(), TestUtils$.MODULE$.consumeRecords$default$3());
        Assertions.assertEquals(this.sent1(), (Object)consumerRecords.map((Function1 & Serializable & scala.Serializable)x$2 -> (String)x$2.value(), Seq$.MODULE$.canBuildFrom()));
        ((List)this.sent2().map((Function1 & Serializable & scala.Serializable)value -> ((KafkaProducer)producer$1.elem).send(new ProducerRecord(this.topic(), (Object)Predef$.MODULE$.int2Integer(0), value)), List$.MODULE$.canBuildFrom())).foreach((Function1 & Serializable & scala.Serializable)x$3 -> (RecordMetadata)x$3.get());
        Seq consumerRecords2 = TestUtils$.MODULE$.consumeRecords(consumer, this.sent2().size(), TestUtils$.MODULE$.consumeRecords$default$3());
        Assertions.assertEquals(this.sent2(), (Object)consumerRecords2.map((Function1 & Serializable & scala.Serializable)x$4 -> (String)x$4.value(), Seq$.MODULE$.canBuildFrom()));
        consumer.close();
        ((KafkaProducer)producer.elem).close();
        server.shutdown();
        CoreUtils$.MODULE$.delete(server.config().logDirs());
        this.verifyNonDaemonThreadsStatus();
    }

    @Test
    public void testCleanShutdownAfterFailedStartup() {
        Properties newProps = TestUtils$.MODULE$.createBrokerConfig(0, this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        newProps.setProperty(KafkaConfig$.MODULE$.ZkConnectionTimeoutMsProp(), "50");
        newProps.setProperty(KafkaConfig$.MODULE$.ZkConnectProp(), "some.invalid.hostname.foo.bar.local:65535");
        KafkaConfig newConfig = KafkaConfig$.MODULE$.fromProps(newProps);
        this.verifyCleanShutdownAfterFailedStartup(newConfig, ClassTag$.MODULE$.apply(ZooKeeperClientTimeoutException.class));
    }

    @Test
    public void testCleanShutdownAfterFailedStartupDueToCorruptLogs() {
        KafkaConfig x$1 = this.config();
        Option x$2 = Option$.MODULE$.apply((Object)this.getClass().getName());
        Time x$3 = KafkaServer$.MODULE$.$lessinit$greater$default$2();
        boolean x$4 = KafkaServer$.MODULE$.$lessinit$greater$default$4();
        KafkaServer server = new KafkaServer(x$1, x$3, x$2, x$4);
        server.startup();
        KafkaZkClient x$5 = this.zkClient();
        String x$6 = this.topic();
        .colon.colon x$7 = new .colon.colon((Object)server, (List)Nil$.MODULE$);
        int x$8 = TestUtils$.MODULE$.createTopic$default$3();
        int x$9 = TestUtils$.MODULE$.createTopic$default$4();
        Properties x$10 = TestUtils$.MODULE$.createTopic$default$6();
        TestUtils$.MODULE$.createTopic(x$5, x$6, x$8, x$9, (Seq<KafkaServer>)x$7, x$10);
        server.shutdown();
        server.awaitShutdown();
        this.config().logDirs().foreach((Function1 & Serializable & scala.Serializable)dirName -> {
            ServerShutdownTest.$anonfun$testCleanShutdownAfterFailedStartupDueToCorruptLogs$1(this, dirName);
            return BoxedUnit.UNIT;
        });
        this.verifyCleanShutdownAfterFailedStartup(this.config(), ClassTag$.MODULE$.apply(KafkaStorageException.class));
    }

    @Test
    public void testCleanShutdownWithZkUnavailable() {
        KafkaConfig x$1 = this.config();
        Option x$2 = Option$.MODULE$.apply((Object)this.getClass().getName());
        Time x$3 = KafkaServer$.MODULE$.$lessinit$greater$default$2();
        boolean x$4 = KafkaServer$.MODULE$.$lessinit$greater$default$4();
        KafkaServer server = new KafkaServer(x$1, x$3, x$2, x$4);
        server.startup();
        this.shutdownZooKeeper();
        server.shutdown();
        server.awaitShutdown();
        CoreUtils$.MODULE$.delete(server.config().logDirs());
        this.verifyNonDaemonThreadsStatus();
    }

    private <E extends Exception> void verifyCleanShutdownAfterFailedStartup(KafkaConfig config, ClassTag<E> exceptionClassTag) {
        Option x$2 = Option$.MODULE$.apply((Object)this.getClass().getName());
        Time x$3 = KafkaServer$.MODULE$.$lessinit$greater$default$2();
        boolean x$4 = KafkaServer$.MODULE$.$lessinit$greater$default$4();
        KafkaServer server = new KafkaServer(config, x$3, x$2, x$4);
        try {
            try {
                server.startup();
                Assertions.fail((String)"Expected KafkaServer setup to fail and throw exception");
            }
            catch (Exception e) {
                Assertions.assertTrue((boolean)exceptionClassTag.runtimeClass().isInstance(e), (String)new StringBuilder(21).append("Unexpected exception ").append(e).toString());
                Assertions.assertEquals((Object)BrokerState.NOT_RUNNING, (Object)server.brokerState());
            }
        }
        finally {
            BrokerState brokerState = server.brokerState();
            BrokerState brokerState2 = BrokerState.NOT_RUNNING;
            if (brokerState == null ? brokerState2 != null : !brokerState.equals(brokerState2)) {
                server.shutdown();
            }
            server.awaitShutdown();
        }
        CoreUtils$.MODULE$.delete(server.config().logDirs());
        this.verifyNonDaemonThreadsStatus();
    }

    private boolean isNonDaemonKafkaThread(Thread t) {
        return !t.isDaemon() && t.isAlive() && t.getName().startsWith(this.getClass().getName());
    }

    public void verifyNonDaemonThreadsStatus() {
        Assertions.assertEquals((int)0, (int)new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(Thread.getAllStackTraces().keySet().toArray())).map((Function1 & Serializable & scala.Serializable)x$5 -> (Thread)x$5, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Thread.class))))).count((Function1 & Serializable & scala.Serializable)t -> BoxesRunTime.boxToBoolean((boolean)this.isNonDaemonKafkaThread(t))));
    }

    @Test
    public void testConsecutiveShutdown() {
        KafkaServer server = new KafkaServer(this.config(), KafkaServer$.MODULE$.$lessinit$greater$default$2(), KafkaServer$.MODULE$.$lessinit$greater$default$3(), KafkaServer$.MODULE$.$lessinit$greater$default$4());
        server.startup();
        server.shutdown();
        server.awaitShutdown();
        server.shutdown();
    }

    @Test
    public void testBeginShutdown() {
        KafkaServer server = new KafkaServer(this.config(), KafkaServer$.MODULE$.$lessinit$greater$default$2(), KafkaServer$.MODULE$.$lessinit$greater$default$3(), KafkaServer$.MODULE$.$lessinit$greater$default$4());
        server.startup();
        long brokerEpoch = server.kafkaController().brokerEpoch();
        server.beginShutdown(brokerEpoch);
        Assertions.assertEquals((Object)server.brokerState(), (Object)BrokerState.SHUTTING_DOWN, (String)"broker should only be in shutting down state");
        server.shutdown();
        server.awaitShutdown();
        Assertions.assertEquals((Object)server.brokerState(), (Object)BrokerState.NOT_RUNNING, (String)"expected broker to be fully shut down");
    }

    @Test
    public void testBeginShutdownWrongEpoch() {
        KafkaServer server = new KafkaServer(this.config(), KafkaServer$.MODULE$.$lessinit$greater$default$2(), KafkaServer$.MODULE$.$lessinit$greater$default$3(), KafkaServer$.MODULE$.$lessinit$greater$default$4());
        server.startup();
        long wrongBrokerEpoch = server.kafkaController().brokerEpoch() + 1L;
        Assertions.assertThrows(StaleBrokerEpochException.class, () -> server.beginShutdown(wrongBrokerEpoch), () -> "expected a begin shutdown requests at a different epoch to result in an exception");
        Assertions.assertEquals((Object)server.brokerState(), (Object)BrokerState.RUNNING, (String)"broker shutdown should not have started");
        server.shutdown();
        server.awaitShutdown();
    }

    @Test
    public void testControllerShutdownDuringSend() {
        SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
        ListenerName listenerName = ListenerName.forSecurityProtocol((SecurityProtocol)securityProtocol);
        int controllerId = 2;
        Metrics metrics = new Metrics();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        ObjectRef serverSocket = ObjectRef.create(null);
        ObjectRef controllerChannelManager = ObjectRef.create(null);
        try {
            serverSocket.elem = new ServerSocket(0);
            Future<?> receiveFuture = executor.submit(new Runnable(null, serverSocket){
                private final ObjectRef serverSocket$1;

                public void run() {
                    Socket socket = ((ServerSocket)this.serverSocket$1.elem).accept();
                    new DataInputStream(socket.getInputStream()).readByte();
                }
                {
                    this.serverSocket$1 = serverSocket$1;
                }
            });
            scala.collection.immutable.Map brokerAndEpochs = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2((Object)new Broker(1, "localhost", ((ServerSocket)serverSocket.elem).getLocalPort(), listenerName, securityProtocol), (Object)BoxesRunTime.boxToLong((long)0L))}));
            KafkaConfig controllerConfig = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(controllerId, this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
            ControllerContext controllerContext = new ControllerContext();
            controllerContext.setLiveBrokers((Map)brokerAndEpochs);
            controllerChannelManager.elem = new ControllerChannelManager(controllerContext, controllerConfig, Time.SYSTEM, metrics, new StateChangeLogger(controllerId, true, (Option)None$.MODULE$), ControllerChannelManager$.MODULE$.$lessinit$greater$default$6());
            ((ControllerChannelManager)controllerChannelManager.elem).startup();
            LeaderAndIsrBatch batch = new LeaderAndIsrBatch(1).setControllerId(controllerId).setControllerEpoch(1).setBrokerEpoch(0L).addPartitionState(new TopicPartition("topic", 0), new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()).addLiveLeaders(((TraversableOnce)brokerAndEpochs.keys().map((Function1 & Serializable & scala.Serializable)x$6 -> x$6.node(listenerName), Iterable$.MODULE$.canBuildFrom())).toSet()).addTopicId(this.topic(), Uuid.randomUuid());
            ControllerChannelManager qual$1 = (ControllerChannelManager)controllerChannelManager.elem;
            int x$1 = 1;
            Function2 x$3 = qual$1.sendControlMetadataBatch$default$3();
            qual$1.sendControlMetadataBatch(x$1, (ControlMetadataBatch)batch, x$3);
            receiveFuture.get(10L, TimeUnit.SECONDS);
            executor.submit(new Runnable(null, controllerChannelManager){
                private final ObjectRef controllerChannelManager$1;

                public void run() {
                    ((ControllerChannelManager)this.controllerChannelManager$1.elem).shutdown();
                }
                {
                    this.controllerChannelManager$1 = controllerChannelManager$1;
                }
            }).get(10L, TimeUnit.SECONDS);
        }
        finally {
            if ((ServerSocket)serverSocket.elem != null) {
                ((ServerSocket)serverSocket.elem).close();
            }
            if ((ControllerChannelManager)controllerChannelManager.elem != null) {
                ((ControllerChannelManager)controllerChannelManager.elem).shutdown();
            }
            executor.shutdownNow();
            metrics.close();
        }
    }

    private static final KafkaProducer createProducer$1(KafkaServer server) {
        String x$1 = TestUtils$.MODULE$.getBrokerListStrFromServers((Seq<KafkaServer>)new .colon.colon((Object)server, (List)Nil$.MODULE$), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2());
        IntegerSerializer x$2 = new IntegerSerializer();
        StringSerializer x$3 = new StringSerializer();
        int x$4 = TestUtils$.MODULE$.createProducer$default$2();
        long x$5 = TestUtils$.MODULE$.createProducer$default$3();
        long x$6 = TestUtils$.MODULE$.createProducer$default$4();
        int x$7 = TestUtils$.MODULE$.createProducer$default$5();
        int x$8 = TestUtils$.MODULE$.createProducer$default$6();
        int x$9 = TestUtils$.MODULE$.createProducer$default$7();
        int x$10 = TestUtils$.MODULE$.createProducer$default$8();
        String x$11 = TestUtils$.MODULE$.createProducer$default$9();
        int x$12 = TestUtils$.MODULE$.createProducer$default$10();
        SecurityProtocol x$13 = TestUtils$.MODULE$.createProducer$default$11();
        Option<File> x$14 = TestUtils$.MODULE$.createProducer$default$12();
        Option<Properties> x$15 = TestUtils$.MODULE$.createProducer$default$13();
        boolean x$16 = TestUtils$.MODULE$.createProducer$default$16();
        return TestUtils$.MODULE$.createProducer(x$1, x$4, x$5, x$6, x$7, x$8, x$9, x$10, x$11, x$12, x$13, x$14, x$15, x$2, x$3, x$16);
    }

    private static final KafkaConsumer createConsumer$1(KafkaServer server) {
        String x$1 = TestUtils$.MODULE$.getBrokerListStrFromServers((Seq<KafkaServer>)new .colon.colon((Object)server, (List)Nil$.MODULE$), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2());
        SecurityProtocol x$2 = SecurityProtocol.PLAINTEXT;
        IntegerDeserializer x$3 = new IntegerDeserializer();
        StringDeserializer x$4 = new StringDeserializer();
        String x$5 = TestUtils$.MODULE$.createConsumer$default$2();
        String x$6 = TestUtils$.MODULE$.createConsumer$default$3();
        boolean x$7 = TestUtils$.MODULE$.createConsumer$default$4();
        boolean x$8 = TestUtils$.MODULE$.createConsumer$default$5();
        int x$9 = TestUtils$.MODULE$.createConsumer$default$6();
        Option<File> x$10 = TestUtils$.MODULE$.createConsumer$default$8();
        Option<Properties> x$11 = TestUtils$.MODULE$.createConsumer$default$9();
        return TestUtils$.MODULE$.createConsumer(x$1, x$5, x$6, x$7, x$8, x$9, x$2, x$10, x$11, x$3, x$4);
    }

    public static final /* synthetic */ void $anonfun$testCleanShutdown$3(String logDir) {
        File OffsetCheckpointFile2 = new File(logDir, LogManager$.MODULE$.RecoveryPointCheckpointFile());
        Assertions.assertTrue((boolean)OffsetCheckpointFile2.exists());
        Assertions.assertTrue((OffsetCheckpointFile2.length() > 0L ? 1 : 0) != 0);
    }

    public static final /* synthetic */ void $anonfun$testCleanShutdownAfterFailedStartupDueToCorruptLogs$1(ServerShutdownTest $this, String dirName) {
        File partitionDir = new File(dirName, new StringBuilder(2).append($this.topic()).append("-0").toString());
        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])partitionDir.listFiles())).foreach((Function1 & Serializable & scala.Serializable)f -> {
            TestUtils$.MODULE$.appendNonsenseToFile(f, TestUtils$.MODULE$.random().nextInt(1024) + 1);
            return BoxedUnit.UNIT;
        });
    }

    public ServerShutdownTest() {
        this.host = "localhost";
        this.topic = "test";
    }
}

