/*
 * Decompiled with CFR 0.152.
 */
package kafka.javaapi.consumer;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Properties;
import kafka.common.MessageStreamsExistException;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.integration.KafkaServerTestHarness;
import kafka.javaapi.consumer.ZookeeperConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.serializer.Decoder;
import kafka.serializer.StringDecoder;
import kafka.serializer.StringDecoder$;
import kafka.serializer.StringEncoder;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaRequestHandler;
import kafka.server.KafkaServer;
import kafka.utils.IntEncoder;
import kafka.utils.TestUtils$;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Test;
import org.scalactic.source.Position;
import scala.Function0;
import scala.Function1;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.JavaConversions$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005mc\u0001B\u0001\u0003\u0001%\u0011aDW8pW\u0016,\u0007/\u001a:D_:\u001cX/\\3s\u0007>tg.Z2u_J$Vm\u001d;\u000b\u0005\r!\u0011\u0001C2p]N,X.\u001a:\u000b\u0005\u00151\u0011a\u00026bm\u0006\f\u0007/\u001b\u0006\u0002\u000f\u0005)1.\u00194lC\u000e\u00011c\u0001\u0001\u000b!A\u00111BD\u0007\u0002\u0019)\u0011QBB\u0001\fS:$Xm\u001a:bi&|g.\u0003\u0002\u0010\u0019\t12*\u00194lCN+'O^3s)\u0016\u001cH\u000fS1s]\u0016\u001c8\u000f\u0005\u0002\u0012)5\t!C\u0003\u0002\u0014\r\u0005)Q\u000f^5mg&\u0011QC\u0005\u0002\b\u0019><w-\u001b8h\u0011\u00159\u0002\u0001\"\u0001\u0019\u0003\u0019a\u0014N\\5u}Q\t\u0011\u0004\u0005\u0002\u001b\u00015\t!\u0001C\u0004\u001d\u0001\t\u0007I\u0011A\u000f\u0002\u00119,XNT8eKN,\u0012A\b\t\u0003?\tj\u0011\u0001\t\u0006\u0002C\u0005)1oY1mC&\u00111\u0005\t\u0002\u0004\u0013:$\bBB\u0013\u0001A\u0003%a$A\u0005ok6tu\u000eZ3tA!9q\u0005\u0001b\u0001\n\u0003i\u0012\u0001\u00038v[B\u000b'\u000f^:\t\r%\u0002\u0001\u0015!\u0003\u001f\u0003%qW/\u001c)beR\u001c\b\u0005C\u0004,\u0001\t\u0007I\u0011\u0001\u0017\u0002\u000bQ|\u0007/[2\u0016\u00035\u0002\"AL\u001a\u000e\u0003=R!\u0001M\u0019\u0002\t1\fgn\u001a\u0006\u0002e\u0005!!.\u0019<b\u0013\t!tF\u0001\u0004TiJLgn\u001a\u0005\u0007m\u0001\u0001\u000b\u0011B\u0017\u0002\rQ|\u0007/[2!\u0011\u001dA\u0004A1A\u0005\u0002e\nqb\u001c<feJLG-\u001b8h!J|\u0007o]\u000b\u0002uA\u00111HP\u0007\u0002y)\u0011Q(M\u0001\u0005kRLG.\u0003\u0002@y\tQ\u0001K]8qKJ$\u0018.Z:\t\r\u0005\u0003\u0001\u0015!\u0003;\u0003Ayg/\u001a:sS\u0012Lgn\u001a)s_B\u001c\b\u0005C\u0003D\u0001\u0011\u0005A)A\bhK:,'/\u0019;f\u0007>tg-[4t+\u0005)\u0005c\u0001$O#:\u0011q\t\u0014\b\u0003\u0011.k\u0011!\u0013\u0006\u0003\u0015\"\ta\u0001\u0010:p_Rt\u0014\"A\u0011\n\u00055\u0003\u0013a\u00029bG.\fw-Z\u0005\u0003\u001fB\u00131aU3r\u0015\ti\u0005\u0005\u0005\u0002S+6\t1K\u0003\u0002U\r\u000511/\u001a:wKJL!AV*\u0003\u0017-\u000bgm[1D_:4\u0017n\u001a\u0005\b1\u0002\u0011\r\u0011\"\u0001-\u0003\u00159'o\\;q\u0011\u0019Q\u0006\u0001)A\u0005[\u00051qM]8va\u0002Bq\u0001\u0018\u0001C\u0002\u0013\u0005A&A\u0005d_:\u001cX/\\3sc!1a\f\u0001Q\u0001\n5\n!bY8ogVlWM]\u0019!\u0011\u001d\u0001\u0007A1A\u0005\u0002u\t\u0011B\\'fgN\fw-Z:\t\r\t\u0004\u0001\u0015!\u0003\u001f\u0003)qW*Z:tC\u001e,7\u000f\t\u0005\u0006I\u0002!\t!Z\u0001\ni\u0016\u001cHOQ1tS\u000e$\u0012A\u001a\t\u0003?\u001dL!\u0001\u001b\u0011\u0003\tUs\u0017\u000e\u001e\u0015\u0003G*\u0004\"a\u001b9\u000e\u00031T!!\u001c8\u0002\u000b),h.\u001b;\u000b\u0003=\f1a\u001c:h\u0013\t\tHN\u0001\u0003UKN$\b\"B:\u0001\t\u0003!\u0018\u0001D:f]\u0012lUm]:bO\u0016\u001cHCB;\u0000\u0003\u0017\ty\u0001E\u0002GmbL!a\u001e)\u0003\t1K7\u000f\u001e\t\u0003svt!A_>\u0011\u0005!\u0003\u0013B\u0001?!\u0003\u0019\u0001&/\u001a3fM&\u0011AG \u0006\u0003y\u0002Bq!!\u0001s\u0001\u0004\t\u0019!A\u0004tKJ4XM]:\u0011\t\u0019s\u0015Q\u0001\t\u0004%\u0006\u001d\u0011bAA\u0005'\nY1*\u00194lCN+'O^3s\u0011\u0019\tiA\u001da\u0001=\u0005yQ.Z:tC\u001e,7\u000fU3s\u001d>$W\r\u0003\u0004\u0002\u0012I\u0004\r\u0001_\u0001\u0007Q\u0016\fG-\u001a:\t\u000f\u0005U\u0001\u0001\"\u0001\u0002\u0018\u0005Yq-\u001a;NKN\u001c\u0018mZ3t)\u0015)\u0018\u0011DA\u000f\u0011\u001d\tY\"a\u0005A\u0002y\t!C\\'fgN\fw-Z:QKJ$\u0006N]3bI\"A\u0011qDA\n\u0001\u0004\t\t#\u0001\u000bk)>\u0004\u0018nY'fgN\fw-Z*ue\u0016\fWn\u001d\t\u0007w\u0005\r\u00020a\n\n\u0007\u0005\u0015BHA\u0002NCB\u0004RaOA\u0015\u0003WI!a\u001e\u001f\u0011\r\u00055\u0012\u0011\u0007=y\u001b\t\tyC\u0003\u0002\u0004\r%!\u00111GA\u0018\u0005-Y\u0015MZ6b'R\u0014X-Y7\t\u000f\u0005]\u0002\u0001\"\u0003\u0002:\u0005IAo\u001c&bm\u0006l\u0015\r\u001d\u000b\u0005\u0003w\t\u0019\u0005\u0005\u0004<\u0003GA\u0018Q\b\t\u0004]\u0005}\u0012bAA!_\t9\u0011J\u001c;fO\u0016\u0014\b\u0002CA#\u0003k\u0001\r!a\u0012\u0002\u0011M\u001c\u0017\r\\1NCB\u0004R!_A%qzI1!!\n\u007fQ\u001d\u0001\u0011QJA*\u0003/\u00022aHA(\u0013\r\t\t\u0006\t\u0002\u000bI\u0016\u0004(/Z2bi\u0016$\u0017EAA+\u0003!#\u0006.[:!i\u0016\u001cH\u000f\t5bg\u0002\u0012W-\u001a8!I\u0016\u0004(/Z2bi\u0016$\u0007%\u00198eA%$\be^5mY\u0002\u0012W\r\t:f[>4X\r\u001a\u0011j]\u0002\n\u0007EZ;ukJ,\u0007E]3mK\u0006\u001cX-\t\u0002\u0002Z\u0005A\u0001GL\u00191]Ar\u0003\u0007")
public class ZookeeperConsumerConnectorTest
extends KafkaServerTestHarness {
    private final int numNodes;
    private final int numParts;
    private final String topic;
    private final Properties overridingProps = new Properties();
    private final String group;
    private final String consumer1;
    private final int nMessages;

    public int numNodes() {
        return this.numNodes;
    }

    public int numParts() {
        return this.numParts;
    }

    public String topic() {
        return this.topic;
    }

    public Properties overridingProps() {
        return this.overridingProps;
    }

    @Override
    public Seq<KafkaConfig> generateConfigs() {
        return (Seq)TestUtils$.MODULE$.createBrokerConfigs(this.numNodes(), this.zkConnect(), TestUtils$.MODULE$.createBrokerConfigs$default$3(), TestUtils$.MODULE$.createBrokerConfigs$default$4(), TestUtils$.MODULE$.createBrokerConfigs$default$5(), TestUtils$.MODULE$.createBrokerConfigs$default$6(), TestUtils$.MODULE$.createBrokerConfigs$default$7(), TestUtils$.MODULE$.createBrokerConfigs$default$8(), TestUtils$.MODULE$.createBrokerConfigs$default$9(), TestUtils$.MODULE$.createBrokerConfigs$default$10(), TestUtils$.MODULE$.createBrokerConfigs$default$11(), TestUtils$.MODULE$.createBrokerConfigs$default$12(), TestUtils$.MODULE$.createBrokerConfigs$default$13()).map((Function1 & Serializable & scala.Serializable)x$1 -> KafkaConfig$.MODULE$.fromProps(x$1, this.overridingProps()), Seq$.MODULE$.canBuildFrom());
    }

    public String group() {
        return this.group;
    }

    public String consumer1() {
        return this.consumer1;
    }

    public int nMessages() {
        return this.nMessages;
    }

    @Test
    public void testBasic() {
        Logger requestHandlerLogger = Logger.getLogger(KafkaRequestHandler.class);
        requestHandlerLogger.setLevel(Level.FATAL);
        TestUtils$.MODULE$.createTopic(this.zkUtils(), this.topic(), this.numParts(), 1, (Seq<KafkaServer>)this.servers(), TestUtils$.MODULE$.createTopic$default$6());
        List<String> sentMessages1 = this.sendMessages((Seq<KafkaServer>)this.servers(), this.nMessages(), "batch1");
        ConsumerConfig consumerConfig1 = new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(this.zkConnect(), this.group(), this.consumer1(), TestUtils$.MODULE$.createConsumerProperties$default$4()));
        ZookeeperConsumerConnector zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true);
        java.util.Map topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(this.toJavaMap((scala.collection.immutable.Map<String, Object>)((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.topic()), (Object)BoxesRunTime.boxToInteger((int)(this.numNodes() * this.numParts() / 2)))})))), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
        List<String> receivedMessages1 = this.getMessages(this.nMessages() * 2, topicMessageStreams1);
        Assert.assertEquals((Object)sentMessages1.sorted((Ordering)Ordering.String$.MODULE$), (Object)receivedMessages1.sorted((Ordering)Ordering.String$.MODULE$));
        try {
            zkConsumerConnector1.createMessageStreams(this.toJavaMap((scala.collection.immutable.Map<String, Object>)((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.topic()), (Object)BoxesRunTime.boxToInteger((int)(this.numNodes() * this.numParts() / 2)))})))), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
            throw this.fail("Should fail with MessageStreamsExistException", new Position("ZookeeperConsumerConnectorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 76));
        }
        catch (MessageStreamsExistException messageStreamsExistException) {
            zkConsumerConnector1.shutdown();
            this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "all consumer connectors stopped");
            requestHandlerLogger.setLevel(Level.ERROR);
            return;
        }
    }

    public List<String> sendMessages(Seq<KafkaServer> servers, int messagesPerNode, String header) {
        ObjectRef messages = ObjectRef.create((Object)Nil$.MODULE$);
        servers.foreach((Function1 & Serializable & scala.Serializable)server -> {
            ZookeeperConsumerConnectorTest.$anonfun$sendMessages$1(this, servers, messagesPerNode, header, messages, server);
            return BoxedUnit.UNIT;
        });
        return (List)messages.elem;
    }

    /*
     * WARNING - void declaration
     */
    public List<String> getMessages(int nMessagesPerThread, java.util.Map<String, java.util.List<KafkaStream<String, String>>> jTopicMessageStreams) {
        void var3_3;
        List<String> messages = Nil$.MODULE$;
        Map topicMessageStreams = JavaConversions$.MODULE$.deprecated$u0020mapAsScalaMap(jTopicMessageStreams).mapValues((Function1 & Serializable & scala.Serializable)x$3 -> JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(x$3).toList());
        messages = TestUtils$.MODULE$.getMessages((Map<String, List<KafkaStream<String, String>>>)topicMessageStreams, nMessagesPerThread);
        return var3_3;
    }

    /*
     * WARNING - void declaration
     */
    private java.util.Map<String, Integer> toJavaMap(scala.collection.immutable.Map<String, Object> scalaMap) {
        void var2_2;
        HashMap javaMap = new HashMap();
        scalaMap.foreach((Function1 & Serializable & scala.Serializable)m -> javaMap.put(m._1(), BoxesRunTime.boxToInteger((int)m._2$mcI$sp())));
        return var2_2;
    }

    public static final /* synthetic */ String $anonfun$sendMessages$3(String header$1, KafkaServer server$1, int partition$1, int x) {
        return header$1 + server$1.config().brokerId() + "-" + partition$1 + "-" + x;
    }

    public static final /* synthetic */ void $anonfun$sendMessages$1(ZookeeperConsumerConnectorTest $this, Seq servers$1, int messagesPerNode$1, String header$1, ObjectRef messages$1, KafkaServer server) {
        kafka.producer.Producer producer = TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.getBrokerListStrFromServers((Seq<KafkaServer>)servers$1, TestUtils$.MODULE$.getBrokerListStrFromServers$default$2()), StringEncoder.class.getName(), IntEncoder.class.getName(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5());
        Producer javaProducer = new Producer(producer);
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), $this.numParts()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)partition -> {
            IndexedSeq ms = (IndexedSeq)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), messagesPerNode$1).map((Function1 & Serializable & scala.Serializable)x -> ZookeeperConsumerConnectorTest.$anonfun$sendMessages$3(header$1, server, partition, BoxesRunTime.unboxToInt((Object)x)), IndexedSeq$.MODULE$.canBuildFrom());
            messages$1.elem = (List)((List)messages$1.elem).$plus$plus((GenTraversableOnce)ms, List$.MODULE$.canBuildFrom());
            javaProducer.send(JavaConversions$.MODULE$.deprecated$u0020seqAsJavaList((Seq)ms.map((Function1 & Serializable & scala.Serializable)x$2 -> new KeyedMessage($this.topic(), (Object)BoxesRunTime.boxToInteger((int)partition), x$2), IndexedSeq$.MODULE$.canBuildFrom())));
        });
        javaProducer.close();
    }

    public ZookeeperConsumerConnectorTest() {
        this.numNodes = 2;
        this.numParts = 2;
        this.topic = "topic1";
        this.overridingProps().put(KafkaConfig$.MODULE$.NumPartitionsProp(), ((Object)BoxesRunTime.boxToInteger((int)this.numParts())).toString());
        this.group = "group1";
        this.consumer1 = "consumer1";
        this.nMessages = 2;
    }
}

