/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.File;
import java.util.Properties;
import java.util.concurrent.Future;
import kafka.api.FetchRequestBuilder;
import kafka.api.FetchResponse;
import kafka.consumer.SimpleConsumer;
import kafka.log.LogManager$;
import kafka.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.server.KafkaServer$;
import kafka.server.NotRunning$;
import kafka.utils.CoreUtils$;
import kafka.utils.TestUtils$;
import kafka.zk.ZooKeeperTestHarness;
import org.I0Itec.zkclient.exception.ZkException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.scalactic.source.Position;
import scala.Array$;
import scala.Function1;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Serializable;
import scala.collection.Iterable$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.ObjectRef;

@ScalaSignature(bytes="\u0006\u0001A4A!\u0001\u0002\u0001\u000f\t\u00112+\u001a:wKJ\u001c\u0006.\u001e;e_^tG+Z:u\u0015\t\u0019A!\u0001\u0004tKJ4XM\u001d\u0006\u0002\u000b\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\t!\tIA\"D\u0001\u000b\u0015\tYA!\u0001\u0002{W&\u0011QB\u0003\u0002\u00155>|7*Z3qKJ$Vm\u001d;ICJtWm]:\t\u000b=\u0001A\u0011\u0001\t\u0002\rqJg.\u001b;?)\u0005\t\u0002C\u0001\n\u0001\u001b\u0005\u0011\u0001b\u0002\u000b\u0001\u0001\u0004%\t!F\u0001\u0007G>tg-[4\u0016\u0003Y\u0001\"AE\f\n\u0005a\u0011!aC&bM.\f7i\u001c8gS\u001eDqA\u0007\u0001A\u0002\u0013\u00051$\u0001\u0006d_:4\u0017nZ0%KF$\"\u0001\b\u0012\u0011\u0005u\u0001S\"\u0001\u0010\u000b\u0003}\tQa]2bY\u0006L!!\t\u0010\u0003\tUs\u0017\u000e\u001e\u0005\bGe\t\t\u00111\u0001\u0017\u0003\rAH%\r\u0005\u0007K\u0001\u0001\u000b\u0015\u0002\f\u0002\u000f\r|gNZ5hA!9q\u0005\u0001b\u0001\n\u0003A\u0013\u0001\u00025pgR,\u0012!\u000b\t\u0003U=j\u0011a\u000b\u0006\u0003Y5\nA\u0001\\1oO*\ta&\u0001\u0003kCZ\f\u0017B\u0001\u0019,\u0005\u0019\u0019FO]5oO\"1!\u0007\u0001Q\u0001\n%\nQ\u0001[8ti\u0002Bq\u0001\u000e\u0001C\u0002\u0013\u0005\u0001&A\u0003u_BL7\r\u0003\u00047\u0001\u0001\u0006I!K\u0001\u0007i>\u0004\u0018n\u0019\u0011\t\u000fa\u0002!\u0019!C\u0001s\u0005)1/\u001a8ucU\t!\bE\u0002<\u0001&j\u0011\u0001\u0010\u0006\u0003{y\n\u0011\"[7nkR\f'\r\\3\u000b\u0005}r\u0012AC2pY2,7\r^5p]&\u0011\u0011\t\u0010\u0002\u0005\u0019&\u001cH\u000f\u0003\u0004D\u0001\u0001\u0006IAO\u0001\u0007g\u0016tG/\r\u0011\t\u000f\u0015\u0003!\u0019!C\u0001s\u0005)1/\u001a8ue!1q\t\u0001Q\u0001\ni\naa]3oiJ\u0002\u0003\"B%\u0001\t\u0003R\u0015!B:fiV\u0003H#\u0001\u000f)\u0005!c\u0005CA'S\u001b\u0005q%BA(Q\u0003\u0015QWO\\5u\u0015\u0005\t\u0016aA8sO&\u00111K\u0014\u0002\u0007\u0005\u00164wN]3\t\u000bU\u0003A\u0011\u0001&\u0002#Q,7\u000f^\"mK\u0006t7\u000b[;uI><h\u000e\u000b\u0002U/B\u0011Q\nW\u0005\u00033:\u0013A\u0001V3ti\")1\f\u0001C\u0001\u0015\u00069C/Z:u\u00072,\u0017M\\*ikR$wn\u001e8XSRDG)\u001a7fi\u0016$v\u000e]5d\u000b:\f'\r\\3eQ\tQv\u000bC\u0003_\u0001\u0011\u0005!*A\u0012uKN$8\t\\3b]NCW\u000f\u001e3po:\fe\r^3s\r\u0006LG.\u001a3Ti\u0006\u0014H/\u001e9)\u0005u;\u0006BB1\u0001A\u0013%!-\u0001\fjg:{g\u000eR1f[>t7*\u00194lCRC'/Z1e)\t\u0019g\r\u0005\u0002\u001eI&\u0011QM\b\u0002\b\u0005>|G.Z1o\u0011\u00159\u0007\r1\u0001i\u0003\u0005!\bC\u0001\u0016j\u0013\tQ7F\u0001\u0004UQJ,\u0017\r\u001a\u0005\u0006Y\u0002!\tAS\u0001\u001dm\u0016\u0014\u0018NZ=O_:$\u0015-Z7p]RC'/Z1egN#\u0018\r^;t\u0011\u0015q\u0007\u0001\"\u0001K\u0003]!Xm\u001d;D_:\u001cXmY;uSZ,7\u000b[;uI><h\u000e\u000b\u0002n/\u0002")
public class ServerShutdownTest
extends ZooKeeperTestHarness {
    private KafkaConfig config = null;
    private final String host;
    private final String topic;
    private final List<String> sent1 = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"hello", "there"}));
    private final List<String> sent2 = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"more", "messages"}));

    public KafkaConfig config() {
        return this.config;
    }

    public void config_$eq(KafkaConfig x$1) {
        this.config = x$1;
    }

    public String host() {
        return this.host;
    }

    public String topic() {
        return this.topic;
    }

    public List<String> sent1() {
        return this.sent1;
    }

    public List<String> sent2() {
        return this.sent2;
    }

    @Override
    @Before
    public void setUp() {
        super.setUp();
        Properties props = TestUtils$.MODULE$.createBrokerConfig(0, this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17());
        this.config_$eq(KafkaConfig$.MODULE$.fromProps(props));
    }

    @Test
    public void testCleanShutdown() {
        KafkaConfig x$17 = this.config();
        Option x$18 = Option$.MODULE$.apply((Object)((Object)((Object)this)).getClass().getName());
        Time x$19 = KafkaServer$.MODULE$.$lessinit$greater$default$2();
        Seq x$20 = KafkaServer$.MODULE$.$lessinit$greater$default$4();
        KafkaServer server = new KafkaServer(x$17, x$19, x$18, x$20);
        server.startup();
        ObjectRef producer = ObjectRef.create((Object)this.createProducer$1(server));
        TestUtils$.MODULE$.createTopic(this.zkUtils(), this.topic(), 1, 1, (Seq<KafkaServer>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{server}))), TestUtils$.MODULE$.createTopic$default$6());
        ((List)this.sent1().map((Function1)new Serializable(this, producer){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ ServerShutdownTest $outer;
            private final ObjectRef producer$1;

            public final Future<RecordMetadata> apply(String value) {
                return ((KafkaProducer)this.producer$1.elem).send(new ProducerRecord(this.$outer.topic(), (Object)Predef$.MODULE$.int2Integer(0), (Object)value));
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.producer$1 = producer$1;
            }
        }, List$.MODULE$.canBuildFrom())).foreach((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final RecordMetadata apply(Future<RecordMetadata> x$1) {
                return x$1.get();
            }
        });
        server.shutdown();
        this.config().logDirs().foreach((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final void apply(String logDir) {
                File OffsetCheckpointFile2 = new File(logDir, LogManager$.MODULE$.RecoveryPointCheckpointFile());
                Assert.assertTrue((boolean)OffsetCheckpointFile2.exists());
                Assert.assertTrue((OffsetCheckpointFile2.length() > 0L ? 1 : 0) != 0);
            }
        });
        ((KafkaProducer)producer.elem).close();
        server = new KafkaServer(this.config(), KafkaServer$.MODULE$.$lessinit$greater$default$2(), KafkaServer$.MODULE$.$lessinit$greater$default$3(), KafkaServer$.MODULE$.$lessinit$greater$default$4());
        server.startup();
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated((Seq<KafkaServer>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{server}))), this.topic(), 0, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
        producer.elem = this.createProducer$1(server);
        SimpleConsumer consumer = new SimpleConsumer(this.host(), TestUtils$.MODULE$.boundPort(server, TestUtils$.MODULE$.boundPort$default$2()), 1000000, 65536, "");
        ByteBufferMessageSet fetchedMessage = null;
        while (fetchedMessage == null || fetchedMessage.validBytes() == 0) {
            FetchResponse fetched = consumer.fetch(new FetchRequestBuilder().addFetch(this.topic(), 0, 0L, 10000).maxWait(0).build());
            fetchedMessage = fetched.messageSet(this.topic(), 0);
        }
        Assert.assertEquals(this.sent1(), (Object)fetchedMessage.map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final String apply(MessageAndOffset m) {
                return TestUtils$.MODULE$.readString(m.message().payload(), TestUtils$.MODULE$.readString$default$2());
            }
        }, Iterable$.MODULE$.canBuildFrom()));
        long newOffset = ((MessageAndOffset)fetchedMessage.last()).nextOffset();
        ((List)this.sent2().map((Function1)new Serializable(this, producer){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ ServerShutdownTest $outer;
            private final ObjectRef producer$1;

            public final Future<RecordMetadata> apply(String value) {
                return ((KafkaProducer)this.producer$1.elem).send(new ProducerRecord(this.$outer.topic(), (Object)Predef$.MODULE$.int2Integer(0), (Object)value));
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.producer$1 = producer$1;
            }
        }, List$.MODULE$.canBuildFrom())).foreach((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final RecordMetadata apply(Future<RecordMetadata> x$2) {
                return x$2.get();
            }
        });
        fetchedMessage = null;
        while (fetchedMessage == null || fetchedMessage.validBytes() == 0) {
            FetchResponse fetched = consumer.fetch(new FetchRequestBuilder().addFetch(this.topic(), 0, newOffset, 10000).build());
            fetchedMessage = fetched.messageSet(this.topic(), 0);
        }
        Assert.assertEquals(this.sent2(), (Object)fetchedMessage.map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final String apply(MessageAndOffset m) {
                return TestUtils$.MODULE$.readString(m.message().payload(), TestUtils$.MODULE$.readString$default$2());
            }
        }, Iterable$.MODULE$.canBuildFrom()));
        consumer.close();
        ((KafkaProducer)producer.elem).close();
        server.shutdown();
        CoreUtils$.MODULE$.delete(server.config().logDirs());
        this.verifyNonDaemonThreadsStatus();
    }

    @Test
    public void testCleanShutdownWithDeleteTopicEnabled() {
        KafkaConfig newConfig;
        Properties newProps = TestUtils$.MODULE$.createBrokerConfig(0, this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17());
        newProps.setProperty("delete.topic.enable", "true");
        KafkaConfig x$21 = newConfig = KafkaConfig$.MODULE$.fromProps(newProps);
        Option x$22 = Option$.MODULE$.apply((Object)((Object)((Object)this)).getClass().getName());
        Time x$23 = KafkaServer$.MODULE$.$lessinit$greater$default$2();
        Seq x$24 = KafkaServer$.MODULE$.$lessinit$greater$default$4();
        KafkaServer server = new KafkaServer(x$21, x$23, x$22, x$24);
        server.startup();
        server.shutdown();
        server.awaitShutdown();
        CoreUtils$.MODULE$.delete(server.config().logDirs());
        this.verifyNonDaemonThreadsStatus();
    }

    @Test
    public void testCleanShutdownAfterFailedStartup() {
        KafkaConfig newConfig;
        Properties newProps = TestUtils$.MODULE$.createBrokerConfig(0, this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17());
        newProps.setProperty("zookeeper.connect", "fakehostthatwontresolve:65535");
        KafkaConfig x$25 = newConfig = KafkaConfig$.MODULE$.fromProps(newProps);
        Option x$26 = Option$.MODULE$.apply((Object)((Object)((Object)this)).getClass().getName());
        Time x$27 = KafkaServer$.MODULE$.$lessinit$greater$default$2();
        Seq x$28 = KafkaServer$.MODULE$.$lessinit$greater$default$4();
        KafkaServer server = new KafkaServer(x$25, x$27, x$26, x$28);
        try {
            try {
                server.startup();
                throw this.fail("Expected KafkaServer setup to fail and throw exception", new Position("ServerShutdownTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 133));
            }
            catch (ZkException zkException) {
                Assert.assertEquals((long)NotRunning$.MODULE$.state(), (long)server.brokerState().currentState());
            }
        }
        finally {
            if (server.brokerState().currentState() != NotRunning$.MODULE$.state()) {
                server.shutdown();
            }
            server.awaitShutdown();
        }
        CoreUtils$.MODULE$.delete(server.config().logDirs());
        this.verifyNonDaemonThreadsStatus();
    }

    public boolean kafka$server$ServerShutdownTest$$isNonDaemonKafkaThread(Thread t) {
        return !t.isDaemon() && t.isAlive() && t.getName().startsWith(((Object)((Object)this)).getClass().getName());
    }

    public void verifyNonDaemonThreadsStatus() {
        Assert.assertEquals((long)0L, (long)Predef$.MODULE$.refArrayOps((Object[])Predef$.MODULE$.refArrayOps(Thread.getAllStackTraces().keySet().toArray()).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Thread apply(Object x$3) {
                return (Thread)x$3;
            }
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Thread.class)))).count((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ ServerShutdownTest $outer;

            public final boolean apply(Thread t) {
                return this.$outer.kafka$server$ServerShutdownTest$$isNonDaemonKafkaThread(t);
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        }));
    }

    @Test
    public void testConsecutiveShutdown() {
        KafkaServer server = new KafkaServer(this.config(), KafkaServer$.MODULE$.$lessinit$greater$default$2(), KafkaServer$.MODULE$.$lessinit$greater$default$3(), KafkaServer$.MODULE$.$lessinit$greater$default$4());
        try {
            server.startup();
            server.shutdown();
            server.awaitShutdown();
            server.shutdown();
            Assert.assertTrue((boolean)true);
            return;
        }
        catch (Throwable throwable) {
            throw this.fail(new Position("ServerShutdownTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 172));
        }
    }

    private final KafkaProducer createProducer$1(KafkaServer server) {
        String x$4 = TestUtils$.MODULE$.getBrokerListStrFromServers((Seq<KafkaServer>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{server}))), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2());
        int x$5 = 5;
        IntegerSerializer x$6 = new IntegerSerializer();
        StringSerializer x$7 = new StringSerializer();
        int x$8 = TestUtils$.MODULE$.createNewProducer$default$2();
        long x$9 = TestUtils$.MODULE$.createNewProducer$default$3();
        long x$10 = TestUtils$.MODULE$.createNewProducer$default$4();
        long x$11 = TestUtils$.MODULE$.createNewProducer$default$6();
        long x$12 = TestUtils$.MODULE$.createNewProducer$default$7();
        SecurityProtocol x$13 = TestUtils$.MODULE$.createNewProducer$default$8();
        Option<File> x$14 = TestUtils$.MODULE$.createNewProducer$default$9();
        Option<Properties> x$15 = TestUtils$.MODULE$.createNewProducer$default$10();
        Option<Properties> x$16 = TestUtils$.MODULE$.createNewProducer$default$13();
        return TestUtils$.MODULE$.createNewProducer(x$4, x$8, x$9, x$10, x$5, x$11, x$12, x$13, x$14, x$15, x$6, x$7, x$16);
    }

    public ServerShutdownTest() {
        this.host = "localhost";
        this.topic = "test";
    }
}

