/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import java.util.Arrays;
import java.util.Properties;
import kafka.consumer.Consumer$;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerConnector;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import scala.Function1;
import scala.Option$;
import scala.Predef$;
import scala.ScalaObject;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Iterator;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.mutable.StringBuilder;
import scala.runtime.BoxesRunTime;
import scala.runtime.DoubleRef;

public final class TestEndToEndLatency$
implements ScalaObject {
    public static final TestEndToEndLatency$ MODULE$;

    static {
        new TestEndToEndLatency$();
    }

    public void main(String[] args) {
        if (args.length != 6) {
            System.err.println(new StringBuilder().append((Object)"USAGE: java ").append((Object)this.getClass().getName()).append((Object)" broker_list zookeeper_connect topic num_messages consumer_fetch_max_wait producer_acks").toString());
            System.exit(1);
        }
        String brokerList = args[0];
        String zkConnect = args[1];
        String topic$1 = args[2];
        int numMessages = Predef$.MODULE$.augmentString(args[3]).toInt();
        int consumerFetchMaxWait = Predef$.MODULE$.augmentString(args[4]).toInt();
        int producerAcks = Predef$.MODULE$.augmentString(args[5]).toInt();
        Properties consumerProps = new Properties();
        consumerProps.put("group.id", topic$1);
        consumerProps.put("auto.commit.enable", "false");
        consumerProps.put("auto.offset.reset", "largest");
        consumerProps.put("zookeeper.connect", zkConnect);
        consumerProps.put("fetch.wait.max.ms", ((Object)BoxesRunTime.boxToInteger((int)consumerFetchMaxWait)).toString());
        consumerProps.put("socket.timeout.ms", ((Object)BoxesRunTime.boxToInteger((int)1201000)).toString());
        ConsumerConfig config = new ConsumerConfig(consumerProps);
        ConsumerConnector connector = Consumer$.MODULE$.create(config);
        KafkaStream stream = (KafkaStream)((IterableLike)Option$.MODULE$.option2Iterable(connector.createMessageStreams((Map<String, Object>)((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef$.MODULE$.any2ArrowAssoc((Object)topic$1).$minus$greater((Object)BoxesRunTime.boxToInteger((int)1))})))).get((Object)topic$1)).head()).head();
        Iterator iter$1 = stream.iterator();
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", brokerList);
        producerProps.put("linger.ms", "0");
        producerProps.put("block.on.buffer.full", "true");
        producerProps.put("acks", ((Object)BoxesRunTime.boxToInteger((int)producerAcks)).toString());
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer producer$1 = new KafkaProducer(producerProps);
        Thread.sleep(5000L);
        byte[] message$1 = "hello there beautiful".getBytes();
        DoubleRef totalTime$1 = new DoubleRef(0.0);
        long[] latencies$1 = new long[numMessages];
        Predef$.MODULE$.intWrapper(0).until(numMessages).foreach$mVc$sp((Function1)new Serializable(topic$1, (ConsumerIterator)iter$1, producer$1, message$1, totalTime$1, latencies$1){
            public static final long serialVersionUID;
            private final String topic$1;
            private final ConsumerIterator iter$1;
            private final KafkaProducer producer$1;
            private final byte[] message$1;
            private final DoubleRef totalTime$1;
            private final long[] latencies$1;

            static {
                long l = serialVersionUID = 0L;
            }

            public final void apply(int i) {
                this.apply$mcVI$sp(i);
            }

            public void apply$mcVI$sp(int v1) {
                long begin = System.nanoTime();
                this.producer$1.send(new ProducerRecord(this.topic$1, (Object)this.message$1));
                Object received = this.iter$1.next();
                long elapsed = System.nanoTime() - begin;
                if (v1 % 1000 == 0) {
                    Predef$.MODULE$.println((Object)new StringBuilder().append(v1).append((Object)"\t").append((Object)BoxesRunTime.boxToDouble((double)((double)elapsed / 1000.0 / 1000.0))).toString());
                }
                this.totalTime$1.elem += (double)elapsed;
                this.latencies$1[v1] = elapsed / 1000L / 1000L;
            }
            {
                this.topic$1 = string;
                this.iter$1 = consumerIterator;
                this.producer$1 = kafkaProducer;
                this.message$1 = byArray;
                this.totalTime$1 = doubleRef;
                this.latencies$1 = lArray;
            }
        });
        Predef$.MODULE$.println((Object)Predef$.MODULE$.augmentString("Avg latency: %.4f ms\n").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToDouble((double)(totalTime$1.elem / (double)numMessages / 1000.0 / 1000.0))})));
        Arrays.sort(latencies$1);
        long p50 = latencies$1[(int)((double)latencies$1.length * 0.5)];
        long p99 = latencies$1[(int)((double)latencies$1.length * 0.99)];
        long p999 = latencies$1[(int)((double)latencies$1.length * 0.999)];
        Predef$.MODULE$.println((Object)Predef$.MODULE$.augmentString("Percentiles: 50th = %d, 99th = %d, 99.9th = %d").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)p50), BoxesRunTime.boxToLong((long)p99), BoxesRunTime.boxToLong((long)p999)})));
        producer$1.close();
        connector.commitOffsets(true);
        connector.shutdown();
        System.exit(0);
    }

    private TestEndToEndLatency$() {
        MODULE$ = this;
    }
}

