/*
 * Decompiled with CFR 0.152.
 */
package kafka.integration;

import java.io.Serializable;
import java.util.Properties;
import kafka.consumer.Consumer$;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerConnector;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.integration.KafkaServerTestHarness;
import kafka.producer.KeyedMessage;
import kafka.producer.Producer;
import kafka.serializer.StringEncoder;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaRequestHandler;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils$;
import kafka.utils.ZKGroupTopicDirs;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Function0;
import scala.Function1;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005ub\u0001B\u0001\u0003\u0001\u001d\u00111#Q;u_>3gm]3u%\u0016\u001cX\r\u001e+fgRT!a\u0001\u0003\u0002\u0017%tG/Z4sCRLwN\u001c\u0006\u0002\u000b\u0005)1.\u00194lC\u000e\u00011c\u0001\u0001\t\u0019A\u0011\u0011BC\u0007\u0002\u0005%\u00111B\u0001\u0002\u0017\u0017\u000647.Y*feZ,'\u000fV3ti\"\u000b'O\\3tgB\u0011Q\u0002E\u0007\u0002\u001d)\u0011q\u0002B\u0001\u0006kRLGn]\u0005\u0003#9\u0011q\u0001T8hO&tw\rC\u0003\u0014\u0001\u0011\u0005A#\u0001\u0004=S:LGO\u0010\u000b\u0002+A\u0011\u0011\u0002\u0001\u0005\u0006/\u0001!\t\u0001G\u0001\u0010O\u0016tWM]1uK\u000e{gNZ5hgV\t\u0011\u0004E\u0002\u001bC\rj\u0011a\u0007\u0006\u00039u\t\u0011\"[7nkR\f'\r\\3\u000b\u0005yy\u0012AC2pY2,7\r^5p]*\t\u0001%A\u0003tG\u0006d\u0017-\u0003\u0002#7\t!A*[:u!\t!s%D\u0001&\u0015\t1C!\u0001\u0004tKJ4XM]\u0005\u0003Q\u0015\u00121bS1gW\u0006\u001cuN\u001c4jO\"9!\u0006\u0001b\u0001\n\u0003Y\u0013!\u0002;pa&\u001cW#\u0001\u0017\u0011\u00055\u0012T\"\u0001\u0018\u000b\u0005=\u0002\u0014\u0001\u00027b]\u001eT\u0011!M\u0001\u0005U\u00064\u0018-\u0003\u00024]\t11\u000b\u001e:j]\u001eDa!\u000e\u0001!\u0002\u0013a\u0013A\u0002;pa&\u001c\u0007\u0005C\u00048\u0001\t\u0007I\u0011A\u0016\u0002\u000b\u001d\u0014x.\u001e9\t\re\u0002\u0001\u0015!\u0003-\u0003\u00199'o\\;qA!91\b\u0001b\u0001\n\u0003Y\u0013\u0001\u0004;fgR\u001cuN\\:v[\u0016\u0014\bBB\u001f\u0001A\u0003%A&A\u0007uKN$8i\u001c8tk6,'\u000f\t\u0005\b\u007f\u0001\u0011\r\u0011\"\u0001A\u0003-qU/\\'fgN\fw-Z:\u0016\u0003\u0005\u0003\"AQ\"\u000e\u0003}I!\u0001R\u0010\u0003\u0007%sG\u000f\u0003\u0004G\u0001\u0001\u0006I!Q\u0001\r\u001dVlW*Z:tC\u001e,7\u000f\t\u0005\b\u0011\u0002\u0011\r\u0011\"\u0001A\u0003-a\u0015M]4f\u001f\u001a47/\u001a;\t\r)\u0003\u0001\u0015!\u0003B\u00031a\u0015M]4f\u001f\u001a47/\u001a;!\u0011\u001da\u0005A1A\u0005\u0002\u0001\u000b1bU7bY2|eMZ:fi\"1a\n\u0001Q\u0001\n\u0005\u000bAbU7bY2|eMZ:fi\u0002Bq\u0001\u0015\u0001C\u0002\u0013\u0005\u0011+\u0001\u000bsKF,Xm\u001d;IC:$G.\u001a:M_\u001e<WM]\u000b\u0002%B\u00111KW\u0007\u0002)*\u0011QKV\u0001\u0006Y><GG\u001b\u0006\u0003/b\u000ba!\u00199bG\",'\"A-\u0002\u0007=\u0014x-\u0003\u0002\\)\n1Aj\\4hKJDa!\u0018\u0001!\u0002\u0013\u0011\u0016!\u0006:fcV,7\u000f\u001e%b]\u0012dWM\u001d'pO\u001e,'\u000f\t\u0005\u0006?\u0002!\t\u0005Y\u0001\u0006g\u0016$X\u000b\u001d\u000b\u0002CB\u0011!IY\u0005\u0003G~\u0011A!\u00168ji\"\u0012a,\u001a\t\u0003M&l\u0011a\u001a\u0006\u0003Qb\u000bQA[;oSRL!A[4\u0003\r\t+gm\u001c:f\u0011\u0015a\u0007\u0001\"\u0011a\u0003!!X-\u0019:E_^t\u0007FA6o!\t1w.\u0003\u0002qO\n)\u0011I\u001a;fe\")!\u000f\u0001C\u0001A\u0006!C/Z:u%\u0016\u001cX\r\u001e+p\u000b\u0006\u0014H.[3ti^CWM\\(gMN,G\u000fV8p\u0011&<\u0007\u000e\u000b\u0002riB\u0011a-^\u0005\u0003m\u001e\u0014A\u0001V3ti\")\u0001\u0010\u0001C\u0001A\u0006\u0019C/Z:u%\u0016\u001cX\r\u001e+p\u000b\u0006\u0014H.[3ti^CWM\\(gMN,G\u000fV8p\u0019><\bFA<u\u0011\u0015Y\b\u0001\"\u0001a\u0003\t\"Xm\u001d;SKN,G\u000fV8MCR,7\u000f^,iK:|eMZ:fiR{w\u000eS5hQ\"\u0012!\u0010\u001e\u0005\u0006}\u0002!\t\u0001Y\u0001\"i\u0016\u001cHOU3tKR$v\u000eT1uKN$x\u000b[3o\u001f\u001a47/\u001a;U_>dun\u001e\u0015\u0003{RDq!a\u0001\u0001\t\u0003\t)!A\bsKN,G/\u00118e\u0007>t7/^7f)\u001d\t\u0015qAA\u0006\u0003GAq!!\u0003\u0002\u0002\u0001\u0007\u0011)A\u0006ok6lUm]:bO\u0016\u001c\b\u0002CA\u0007\u0003\u0003\u0001\r!a\u0004\u0002\u000fI,7/\u001a;U_B!\u0011\u0011CA\u0010\u001d\u0011\t\u0019\"a\u0007\u0011\u0007\u0005Uq$\u0004\u0002\u0002\u0018)\u0019\u0011\u0011\u0004\u0004\u0002\rq\u0012xn\u001c;?\u0013\r\tibH\u0001\u0007!J,G-\u001a4\n\u0007M\n\tCC\u0002\u0002\u001e}A\u0001\"!\n\u0002\u0002\u0001\u0007\u0011qE\u0001\u0007_\u001a47/\u001a;\u0011\u0007\t\u000bI#C\u0002\u0002,}\u0011A\u0001T8oO\":\u0001!a\f\u00026\u0005e\u0002c\u0001\"\u00022%\u0019\u00111G\u0010\u0003\u0015\u0011,\u0007O]3dCR,G-\t\u0002\u00028\u0005AE\u000b[5tAQ,7\u000f\u001e\u0011iCN\u0004#-Z3oA\u0011,\u0007O]3dCR,G\rI1oI\u0002JG\u000fI<jY2\u0004#-\u001a\u0011sK6|g/\u001a3!S:\u0004\u0013\r\t4viV\u0014X\r\t:fY\u0016\f7/Z\u0011\u0003\u0003w\t\u0001\u0002\r\u00182a9\u0002d\u0006\r")
public class AutoOffsetResetTest
extends KafkaServerTestHarness {
    private final String topic;
    private final String group;
    private final String testConsumer;
    private final int NumMessages;
    private final int LargeOffset;
    private final int SmallOffset;
    private final Logger requestHandlerLogger = Logger.getLogger(KafkaRequestHandler.class);

    public List<KafkaConfig> generateConfigs() {
        return List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaConfig[]{KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(0, this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17()))}));
    }

    public String topic() {
        return this.topic;
    }

    public String group() {
        return this.group;
    }

    public String testConsumer() {
        return this.testConsumer;
    }

    public int NumMessages() {
        return this.NumMessages;
    }

    public int LargeOffset() {
        return this.LargeOffset;
    }

    public int SmallOffset() {
        return this.SmallOffset;
    }

    public Logger requestHandlerLogger() {
        return this.requestHandlerLogger;
    }

    @Override
    @Before
    public void setUp() {
        super.setUp();
        this.requestHandlerLogger().setLevel(Level.FATAL);
    }

    @Override
    @After
    public void tearDown() {
        this.requestHandlerLogger().setLevel(Level.ERROR);
        super.tearDown();
    }

    @Test
    public void testResetToEarliestWhenOffsetTooHigh() {
        Assert.assertEquals((long)this.NumMessages(), (long)this.resetAndConsume(this.NumMessages(), "smallest", this.LargeOffset()));
    }

    @Test
    public void testResetToEarliestWhenOffsetTooLow() {
        Assert.assertEquals((long)this.NumMessages(), (long)this.resetAndConsume(this.NumMessages(), "smallest", this.SmallOffset()));
    }

    @Test
    public void testResetToLatestWhenOffsetTooHigh() {
        Assert.assertEquals((long)0L, (long)this.resetAndConsume(this.NumMessages(), "largest", this.LargeOffset()));
    }

    @Test
    public void testResetToLatestWhenOffsetTooLow() {
        Assert.assertEquals((long)0L, (long)this.resetAndConsume(this.NumMessages(), "largest", this.SmallOffset()));
    }

    public int resetAndConsume(int numMessages, String resetTo, long offset) {
        TestUtils$.MODULE$.createTopic(this.zkUtils(), this.topic(), 1, 1, (Seq<KafkaServer>)this.servers(), TestUtils$.MODULE$.createTopic$default$6());
        String x$1 = TestUtils$.MODULE$.getBrokerListStrFromServers((Seq<KafkaServer>)this.servers(), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2());
        String x$2 = StringEncoder.class.getName();
        String x$3 = TestUtils$.MODULE$.createProducer$default$2();
        String x$4 = TestUtils$.MODULE$.createProducer$default$4();
        Properties x$5 = TestUtils$.MODULE$.createProducer$default$5();
        Producer producer = TestUtils$.MODULE$.createProducer(x$1, x$3, x$2, x$4, x$5);
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numMessages).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)_ -> producer.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage(this.topic(), (Object)this.topic(), (Object)"test".getBytes())})));
        ZKGroupTopicDirs dirs = new ZKGroupTopicDirs(this.group(), this.topic());
        Properties consumerProps = TestUtils$.MODULE$.createConsumerProperties(this.zkConnect(), this.group(), this.testConsumer(), TestUtils$.MODULE$.createConsumerProperties$default$4());
        consumerProps.put("auto.offset.reset", resetTo);
        consumerProps.put("consumer.timeout.ms", "2000");
        consumerProps.put("fetch.wait.max.ms", "0");
        ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
        TestUtils$.MODULE$.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir() + "/" + "0", offset);
        this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Updated consumer offset to " + offset);
        ConsumerConnector consumerConnector = Consumer$.MODULE$.create(consumerConfig);
        KafkaStream messageStream = (KafkaStream)((IterableLike)consumerConnector.createMessageStreams((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.topic()), (Object)BoxesRunTime.boxToInteger((int)1))}))).apply((Object)this.topic())).head();
        IntRef received = IntRef.create((int)0);
        ConsumerIterator iter = messageStream.iterator();
        try {
            try {
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numMessages).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)_ -> {
                    iter.next();
                    ++received$1.elem;
                });
            }
            catch (ConsumerTimeoutException consumerTimeoutException) {
                this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "consumer timed out after receiving " + received$1.elem + " messages.");
            }
        }
        finally {
            producer.close();
            consumerConnector.shutdown();
        }
        return received.elem;
    }

    public AutoOffsetResetTest() {
        this.topic = "test_topic";
        this.group = "default_group";
        this.testConsumer = "consumer";
        this.NumMessages = 10;
        this.LargeOffset = 10000;
        this.SmallOffset = -1;
    }
}

