/*
 * Decompiled with CFR 0.152.
 */
package kafka.producer;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Properties;
import kafka.api.FetchRequestBuilder;
import kafka.api.FetchResponse;
import kafka.common.FailedToSendMessageException;
import kafka.consumer.SimpleConsumer;
import kafka.message.Message;
import kafka.message.Message$;
import kafka.message.MessageAndOffset;
import kafka.producer.KeyedMessage;
import kafka.producer.Producer;
import kafka.serializer.StringEncoder;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaRequestHandler;
import kafka.server.KafkaServer;
import kafka.utils.StaticPartitioner;
import kafka.utils.TestUtils$;
import kafka.zk.ZooKeeperTestHarness;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Time;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.scalactic.source.Position;
import org.scalatest.exceptions.TestFailedException;
import scala.Function0;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.mutable.Buffer;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001\u0005Mc\u0001B\u0001\u0003\u0001\u001d\u0011A\u0002\u0015:pIV\u001cWM\u001d+fgRT!a\u0001\u0003\u0002\u0011A\u0014x\u000eZ;dKJT\u0011!B\u0001\u0006W\u000647.Y\u0002\u0001'\r\u0001\u0001B\u0004\t\u0003\u00131i\u0011A\u0003\u0006\u0003\u0017\u0011\t!A_6\n\u00055Q!\u0001\u0006.p_.+W\r]3s)\u0016\u001cH\u000fS1s]\u0016\u001c8\u000f\u0005\u0002\u0010%5\t\u0001C\u0003\u0002\u0012\t\u0005)Q\u000f^5mg&\u00111\u0003\u0005\u0002\b\u0019><w-\u001b8h\u0011\u0015)\u0002\u0001\"\u0001\u0017\u0003\u0019a\u0014N\\5u}Q\tq\u0003\u0005\u0002\u0019\u00015\t!\u0001C\u0004\u001b\u0001\t\u0007I\u0011B\u000e\u0002\u0013\t\u0014xn[3s\u0013\u0012\fT#\u0001\u000f\u0011\u0005u\u0001S\"\u0001\u0010\u000b\u0003}\tQa]2bY\u0006L!!\t\u0010\u0003\u0007%sG\u000f\u0003\u0004$\u0001\u0001\u0006I\u0001H\u0001\u000bEJ|7.\u001a:JIF\u0002\u0003bB\u0013\u0001\u0005\u0004%IaG\u0001\nEJ|7.\u001a:JIJBaa\n\u0001!\u0002\u0013a\u0012A\u00032s_.,'/\u001333A!9\u0011\u0006\u0001a\u0001\n\u0013Q\u0013aB:feZ,'/M\u000b\u0002WA\u0011AfL\u0007\u0002[)\u0011a\u0006B\u0001\u0007g\u0016\u0014h/\u001a:\n\u0005Aj#aC&bM.\f7+\u001a:wKJDqA\r\u0001A\u0002\u0013%1'A\u0006tKJ4XM]\u0019`I\u0015\fHC\u0001\u001b8!\tiR'\u0003\u00027=\t!QK\\5u\u0011\u001dA\u0014'!AA\u0002-\n1\u0001\u001f\u00132\u0011\u0019Q\u0004\u0001)Q\u0005W\u0005A1/\u001a:wKJ\f\u0004\u0005C\u0004=\u0001\u0001\u0007I\u0011\u0002\u0016\u0002\u000fM,'O^3se!9a\b\u0001a\u0001\n\u0013y\u0014aC:feZ,'OM0%KF$\"\u0001\u000e!\t\u000faj\u0014\u0011!a\u0001W!1!\t\u0001Q!\n-\n\u0001b]3sm\u0016\u0014(\u0007\t\u0005\b\t\u0002\u0001\r\u0011\"\u0003F\u0003%\u0019wN\\:v[\u0016\u0014\u0018'F\u0001G!\t9%*D\u0001I\u0015\tIE!\u0001\u0005d_:\u001cX/\\3s\u0013\tY\u0005J\u0001\bTS6\u0004H.Z\"p]N,X.\u001a:\t\u000f5\u0003\u0001\u0019!C\u0005\u001d\u0006i1m\u001c8tk6,'/M0%KF$\"\u0001N(\t\u000fab\u0015\u0011!a\u0001\r\"1\u0011\u000b\u0001Q!\n\u0019\u000b!bY8ogVlWM]\u0019!\u0011\u001d\u0019\u0006\u00011A\u0005\n\u0015\u000b\u0011bY8ogVlWM\u001d\u001a\t\u000fU\u0003\u0001\u0019!C\u0005-\u0006i1m\u001c8tk6,'OM0%KF$\"\u0001N,\t\u000fa\"\u0016\u0011!a\u0001\r\"1\u0011\f\u0001Q!\n\u0019\u000b!bY8ogVlWM\u001d\u001a!\u0011\u001dY\u0006A1A\u0005\nq\u000bAC]3rk\u0016\u001cH\u000fS1oI2,'\u000fT8hO\u0016\u0014X#A/\u0011\u0005y+W\"A0\u000b\u0005\u0001\f\u0017!\u00027pORR'B\u00012d\u0003\u0019\t\u0007/Y2iK*\tA-A\u0002pe\u001eL!AZ0\u0003\r1{wmZ3s\u0011\u0019A\u0007\u0001)A\u0005;\u0006)\"/Z9vKN$\b*\u00198eY\u0016\u0014Hj\\4hKJ\u0004\u0003b\u00026\u0001\u0001\u0004%Ia[\u0001\bg\u0016\u0014h/\u001a:t+\u0005a\u0007cA7sW5\taN\u0003\u0002pa\u0006I\u0011.\\7vi\u0006\u0014G.\u001a\u0006\u0003cz\t!bY8mY\u0016\u001cG/[8o\u0013\t\u0019hN\u0001\u0003MSN$\bbB;\u0001\u0001\u0004%IA^\u0001\fg\u0016\u0014h/\u001a:t?\u0012*\u0017\u000f\u0006\u00025o\"9\u0001\b^A\u0001\u0002\u0004a\u0007BB=\u0001A\u0003&A.\u0001\u0005tKJ4XM]:!\u0011\u0015Y\b\u0001\"\u0001}\u000319W\r^\"p]N,X.\u001a:2)\u00051\u0005\"\u0002@\u0001\t\u0003a\u0018\u0001D4fi\u000e{gn];nKJ\u0014\u0004bBA\u0001\u0001\u0011\u0005\u00131A\u0001\u0006g\u0016$X\u000b\u001d\u000b\u0002i!\u001aq0a\u0002\u0011\t\u0005%\u0011qB\u0007\u0003\u0003\u0017Q1!!\u0004d\u0003\u0015QWO\\5u\u0013\u0011\t\t\"a\u0003\u0003\r\t+gm\u001c:f\u0011\u001d\t)\u0002\u0001C!\u0003\u0007\t\u0001\u0002^3be\u0012{wO\u001c\u0015\u0005\u0003'\tI\u0002\u0005\u0003\u0002\n\u0005m\u0011\u0002BA\u000f\u0003\u0017\u0011Q!\u00114uKJDq!!\t\u0001\t\u0003\t\u0019!A\u000fuKN$X\u000b\u001d3bi\u0016\u0014%o\\6feB\u000b'\u000f^5uS>t\u0017J\u001c4pQ\u0011\ty\"!\n\u0011\t\u0005%\u0011qE\u0005\u0005\u0003S\tYA\u0001\u0003UKN$\bbBA\u0017\u0001\u0011\u0005\u00111A\u0001\u0013i\u0016\u001cHoU3oIR{g*Z<U_BL7\r\u000b\u0003\u0002,\u0005\u0015\u0002bBA\u001a\u0001\u0011\u0005\u00111A\u0001\u0017i\u0016\u001cHoU3oI^KG\u000f\u001b#fC\u0012\u0014%o\\6fe\"\"\u0011\u0011GA\u0013\u0011\u001d\tI\u0004\u0001C\u0001\u0003\u0007\t\u0001\u0006^3ti\u0006\u001b\u0018P\\2TK:$7)\u00198D_J\u0014Xm\u0019;ms\u001a\u000b\u0017\u000e\\,ji\"$\u0016.\\3pkRDC!a\u000e\u0002&!9\u0011q\b\u0001\u0005\u0002\u0005\r\u0011a\u0005;fgR\u001cVM\u001c3Ok2dW*Z:tC\u001e,\u0007\u0006BA\u001f\u0003KAs\u0001AA#\u0003\u0017\ny\u0005E\u0002\u001e\u0003\u000fJ1!!\u0013\u001f\u0005)!W\r\u001d:fG\u0006$X\rZ\u0011\u0003\u0003\u001b\n\u0011\n\u00165jg\u0002\"Xm\u001d;!Q\u0006\u001c\bEY3f]\u0002\"W\r\u001d:fG\u0006$X\r\u001a\u0011b]\u0012\u0004\u0013\u000e\u001e\u0011xS2d\u0007EY3!e\u0016lwN^3eA%t\u0007%\u0019\u0011gkR,(/\u001a\u0011sK2,\u0017m]3/C\t\t\t&\u0001\u00051]E\u0002d\u0006\r\u00181\u0001")
public class ProducerTest
extends ZooKeeperTestHarness {
    private final int brokerId1;
    private final int brokerId2;
    private KafkaServer server1 = null;
    private KafkaServer server2 = null;
    private SimpleConsumer consumer1 = null;
    private SimpleConsumer consumer2 = null;
    private final Logger requestHandlerLogger = Logger.getLogger(KafkaRequestHandler.class);
    private List<KafkaServer> servers = List$.MODULE$.empty();

    private int brokerId1() {
        return this.brokerId1;
    }

    private int brokerId2() {
        return this.brokerId2;
    }

    private KafkaServer server1() {
        return this.server1;
    }

    private void server1_$eq(KafkaServer x$1) {
        this.server1 = x$1;
    }

    private KafkaServer server2() {
        return this.server2;
    }

    private void server2_$eq(KafkaServer x$1) {
        this.server2 = x$1;
    }

    private SimpleConsumer consumer1() {
        return this.consumer1;
    }

    private void consumer1_$eq(SimpleConsumer x$1) {
        this.consumer1 = x$1;
    }

    private SimpleConsumer consumer2() {
        return this.consumer2;
    }

    private void consumer2_$eq(SimpleConsumer x$1) {
        this.consumer2 = x$1;
    }

    private Logger requestHandlerLogger() {
        return this.requestHandlerLogger;
    }

    private List<KafkaServer> servers() {
        return this.servers;
    }

    private void servers_$eq(List<KafkaServer> x$1) {
        this.servers = x$1;
    }

    public SimpleConsumer getConsumer1() {
        block0: {
            if (this.consumer1() != null) break block0;
            this.consumer1_$eq(new SimpleConsumer("localhost", TestUtils$.MODULE$.boundPort(this.server1(), TestUtils$.MODULE$.boundPort$default$2()), 1000000, 65536, ""));
        }
        return this.consumer1();
    }

    public SimpleConsumer getConsumer2() {
        block0: {
            if (this.consumer2() != null) break block0;
            this.consumer2_$eq(new SimpleConsumer("localhost", TestUtils$.MODULE$.boundPort(this.server2(), TestUtils$.MODULE$.boundPort$default$2()), 1000000, 65536, ""));
        }
        return this.consumer2();
    }

    @Override
    @Before
    public void setUp() {
        super.setUp();
        Properties props1 = TestUtils$.MODULE$.createBrokerConfig(this.brokerId1(), this.zkConnect(), false, TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17());
        props1.put("num.partitions", "4");
        KafkaConfig config1 = KafkaConfig$.MODULE$.fromProps(props1);
        Properties props2 = TestUtils$.MODULE$.createBrokerConfig(this.brokerId2(), this.zkConnect(), false, TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17());
        props2.put("num.partitions", "4");
        KafkaConfig config2 = KafkaConfig$.MODULE$.fromProps(props2);
        this.server1_$eq(TestUtils$.MODULE$.createServer(config1, TestUtils$.MODULE$.createServer$default$2()));
        this.server2_$eq(TestUtils$.MODULE$.createServer(config2, TestUtils$.MODULE$.createServer$default$2()));
        this.servers_$eq((List<KafkaServer>)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{this.server1(), this.server2()})));
        this.requestHandlerLogger().setLevel(Level.FATAL);
    }

    @Override
    @After
    public void tearDown() {
        this.requestHandlerLogger().setLevel(Level.ERROR);
        if (this.consumer1() != null) {
            this.consumer1().close();
        }
        if (this.consumer2() != null) {
            this.consumer2().close();
        }
        TestUtils$.MODULE$.shutdownServers((Seq<KafkaServer>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{this.server1(), this.server2()}))));
        super.tearDown();
    }

    @Test
    public void testUpdateBrokerPartitionInfo() {
        String topic = "new-topic";
        TestUtils$.MODULE$.createTopic(this.zkUtils(), topic, 1, 2, (Seq<KafkaServer>)this.servers(), TestUtils$.MODULE$.createTopic$default$6());
        Properties props = new Properties();
        props.put("message.send.max.retries", "0");
        String x$1 = "localhost:80,localhost:81";
        String x$2 = StringEncoder.class.getName();
        String x$3 = StringEncoder.class.getName();
        Properties x$4 = props;
        String x$5 = TestUtils$.MODULE$.createProducer$default$4();
        try (Producer producer1 = TestUtils$.MODULE$.createProducer(x$1, x$2, x$3, x$5, x$4);){
            try {
                producer1.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage(topic, (Object)"test", (Object)"test1")}));
                throw this.fail("Test should fail because the broker list provided are not valid", new Position("ProducerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 112));
            }
            catch (FailedToSendMessageException failedToSendMessageException) {
            }
        }
        try (Producer producer2 = TestUtils$.MODULE$.createProducer("localhost:80," + TestUtils$.MODULE$.getBrokerListStrFromServers((Seq<KafkaServer>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{this.server1()}))), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2()), StringEncoder.class.getName(), StringEncoder.class.getName(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5());){
            try {
                producer2.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage(topic, (Object)"test", (Object)"test1")}));
            }
            catch (Throwable e) {
                throw this.fail("Should succeed sending the message", e, new Position("ProducerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 125));
            }
        }
        try (Producer producer3 = TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.getBrokerListStrFromServers((Seq<KafkaServer>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{this.server1(), this.server2()}))), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2()), StringEncoder.class.getName(), StringEncoder.class.getName(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5());){
            try {
                producer3.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage(topic, (Object)"test", (Object)"test1")}));
            }
            catch (Throwable e) {
                throw this.fail("Should succeed sending the message", e, new Position("ProducerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 138));
            }
        }
    }

    @Test
    public void testSendToNewTopic() {
        Buffer buffer;
        Properties props1 = new Properties();
        props1.put("request.required.acks", "-1");
        String topic = "new-topic";
        TestUtils$.MODULE$.createTopic(this.zkUtils(), topic, 1, 2, (Seq<KafkaServer>)this.servers(), TestUtils$.MODULE$.createTopic$default$6());
        Producer producer1 = TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.getBrokerListStrFromServers((Seq<KafkaServer>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{this.server1(), this.server2()}))), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2()), StringEncoder.class.getName(), StringEncoder.class.getName(), StaticPartitioner.class.getName(), props1);
        long startTime = System.currentTimeMillis();
        producer1.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage(topic, (Object)"test", (Object)"test1")}));
        producer1.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage(topic, (Object)"test", (Object)"test2")}));
        long endTime = System.currentTimeMillis();
        Option leaderOpt = this.zkUtils().getLeaderForPartition(topic, 0);
        Assert.assertTrue((String)"Leader for topic new-topic partition 0 should exist", (boolean)leaderOpt.isDefined());
        int leader = BoxesRunTime.unboxToInt((Object)leaderOpt.get());
        if (leader == this.server1().config().brokerId()) {
            FetchResponse response1 = this.getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0L, 10000).build());
            buffer = response1.messageSet("new-topic", 0).iterator().toBuffer();
        } else {
            FetchResponse response2 = this.getConsumer2().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0L, 10000).build());
            buffer = response2.messageSet("new-topic", 0).iterator().toBuffer();
        }
        Buffer messageSet = buffer;
        Assert.assertEquals((String)"Should have fetched 2 messages", (long)2L, (long)messageSet.size());
        Assert.assertTrue((boolean)ByteBuffer.wrap("test1".getBytes()).equals(((MessageAndOffset)messageSet.head()).message().payload()));
        Assert.assertTrue((boolean)ByteBuffer.wrap("test".getBytes()).equals(((MessageAndOffset)messageSet.head()).message().key()));
        Assert.assertTrue((((MessageAndOffset)messageSet.head()).message().timestamp() >= startTime && ((MessageAndOffset)messageSet.head()).message().timestamp() < endTime ? 1 : 0) != 0);
        Assert.assertEquals((Object)TimestampType.CREATE_TIME, (Object)((MessageAndOffset)messageSet.head()).message().timestampType());
        Assert.assertEquals((long)Message$.MODULE$.MagicValue_V1(), (long)((MessageAndOffset)messageSet.head()).message().magic());
        Assert.assertTrue((boolean)ByteBuffer.wrap("test2".getBytes()).equals(((MessageAndOffset)messageSet.apply(1)).message().payload()));
        Assert.assertTrue((boolean)ByteBuffer.wrap("test".getBytes()).equals(((MessageAndOffset)messageSet.apply(1)).message().key()));
        Assert.assertTrue((((MessageAndOffset)messageSet.apply(1)).message().timestamp() >= startTime && ((MessageAndOffset)messageSet.apply(1)).message().timestamp() < endTime ? 1 : 0) != 0);
        Assert.assertEquals((Object)TimestampType.CREATE_TIME, (Object)((MessageAndOffset)messageSet.apply(1)).message().timestampType());
        Assert.assertEquals((long)Message$.MODULE$.MagicValue_V1(), (long)((MessageAndOffset)messageSet.apply(1)).message().magic());
        producer1.close();
        Properties props2 = new Properties();
        props2.put("request.required.acks", "3");
        props2.put("message.send.max.retries", "0");
        try {
            Producer producer2 = TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.getBrokerListStrFromServers((Seq<KafkaServer>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{this.server1(), this.server2()}))), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2()), StringEncoder.class.getName(), StringEncoder.class.getName(), StaticPartitioner.class.getName(), props2);
            producer2.close();
            throw this.fail("we don't support request.required.acks greater than 1", new Position("ProducerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 205));
        }
        catch (IllegalArgumentException illegalArgumentException) {
            return;
        }
    }

    @Test
    public void testSendWithDeadBroker() {
        Properties props = new Properties();
        props.put("request.required.acks", "1");
        props.put("message.send.max.retries", "0");
        String topic = "new-topic";
        TestUtils$.MODULE$.createTopic(this.zkUtils(), topic, (Map<Object, Seq<Object>>)((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{0}))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)1)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{0}))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)2)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{0}))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)3)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{0})))}))), (Seq<KafkaServer>)this.servers());
        Producer producer = TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.getBrokerListStrFromServers((Seq<KafkaServer>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{this.server1(), this.server2()}))), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2()), StringEncoder.class.getName(), StringEncoder.class.getName(), StaticPartitioner.class.getName(), props);
        long startTime = System.currentTimeMillis();
        try {
            producer.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage(topic, (Object)"test", (Object)"test1")}));
        }
        catch (Throwable e) {
            throw this.fail("Unexpected exception: " + e, new Position("ProducerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 238));
        }
        long endTime = System.currentTimeMillis();
        this.server1().shutdown();
        this.server1().awaitShutdown();
        try {
            producer.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage(topic, (Object)"test", (Object)"test1")}));
            throw this.fail("Should fail since no leader exists for the partition.", new Position("ProducerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 248));
        }
        catch (TestFailedException e) {
            throw e;
        }
        catch (Throwable throwable) {
            this.server1().startup();
            TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkUtils(), topic, 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
            TestUtils$.MODULE$.waitUntilMetadataIsPropagated((Seq<KafkaServer>)this.servers(), topic, 0, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
            TestUtils$.MODULE$.waitUntilLeaderIsKnown((Seq<KafkaServer>)this.servers(), topic, 0, TestUtils$.MODULE$.waitUntilLeaderIsKnown$default$4());
            try {
                FetchResponse response1 = this.getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0L, 10000).build());
                Iterator messageSet1 = response1.messageSet(topic, 0).iterator();
                Assert.assertTrue((String)"Message set should have 1 message", (boolean)messageSet1.hasNext());
                Message message = ((MessageAndOffset)messageSet1.next()).message();
                Assert.assertTrue((boolean)ByteBuffer.wrap("test1".getBytes()).equals(message.payload()));
                Assert.assertTrue((boolean)ByteBuffer.wrap("test".getBytes()).equals(message.key()));
                Assert.assertTrue((message.timestamp() >= startTime && message.timestamp() < endTime ? 1 : 0) != 0);
                Assert.assertEquals((Object)TimestampType.CREATE_TIME, (Object)message.timestampType());
                Assert.assertEquals((long)Message$.MODULE$.MagicValue_V1(), (long)message.magic());
                Assert.assertFalse((String)"Message set should have another message", (boolean)messageSet1.hasNext());
            }
            catch (Exception e) {
                throw this.fail("Not expected", e, new Position("ProducerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 273));
            }
            producer.close();
            return;
        }
    }

    @Test
    public void testAsyncSendCanCorrectlyFailWithTimeout() {
        String topic = "new-topic";
        TestUtils$.MODULE$.createTopic(this.zkUtils(), topic, (Map<Object, Seq<Object>>)((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{0, 1})))}))), (Seq<KafkaServer>)this.servers());
        int timeoutMs = 500;
        Properties props = new Properties();
        props.put("request.timeout.ms", ((Object)BoxesRunTime.boxToInteger((int)timeoutMs)).toString());
        props.put("request.required.acks", "1");
        props.put("message.send.max.retries", "0");
        props.put("client.id", "ProducerTest-testAsyncSendCanCorrectlyFailWithTimeout");
        try (Producer producer = TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.getBrokerListStrFromServers((Seq<KafkaServer>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{this.server1(), this.server2()}))), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2()), StringEncoder.class.getName(), StringEncoder.class.getName(), StaticPartitioner.class.getName(), props);){
            producer.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage(topic, (Object)"test", (Object)"test")}));
            ObjectRef messageSet1 = ObjectRef.create(null);
            TestUtils$.MODULE$.waitUntilTrue((Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> {
                FetchResponse response1 = this.getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0L, 10000).build());
                messageSet1$1.elem = response1.messageSet(topic, 0).iterator();
                return ((Iterator)messageSet1$1.elem).hasNext();
            }, (Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Message set should have 1 message", TestUtils$.MODULE$.waitUntilTrue$default$3(), TestUtils$.MODULE$.waitUntilTrue$default$4());
            Assert.assertEquals((Object)ByteBuffer.wrap("test".getBytes()), (Object)((MessageAndOffset)((Iterator)messageSet1.elem).next()).message().payload());
            this.server1().requestHandlerPool().shutdown();
            long t1 = Time.SYSTEM.milliseconds();
            try {
                producer.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage(topic, (Object)"test", (Object)"test")}));
                throw this.fail("Exception should have been thrown", new Position("ProducerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 321));
            }
            catch (FailedToSendMessageException failedToSendMessageException) {
                long t2 = Time.SYSTEM.milliseconds();
                Assert.assertTrue((t2 - t1 >= (long)timeoutMs ? 1 : 0) != 0);
            }
        }
    }

    @Test
    public void testSendNullMessage() {
        try (Producer producer = TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.getBrokerListStrFromServers((Seq<KafkaServer>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{this.server1(), this.server2()}))), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2()), StringEncoder.class.getName(), StringEncoder.class.getName(), StaticPartitioner.class.getName(), TestUtils$.MODULE$.createProducer$default$5());){
            TestUtils$.MODULE$.createTopic(this.zkUtils(), "new-topic", 2, 1, (Seq<KafkaServer>)this.servers(), TestUtils$.MODULE$.createTopic$default$6());
            producer.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage("new-topic", (Object)"key", null)}));
        }
    }

    public ProducerTest() {
        this.brokerId1 = 0;
        this.brokerId2 = 1;
    }
}

