/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.util.Optional;
import java.util.Properties;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.BaseRequestTest;
import kafka.server.KafkaServer;
import kafka.server.LogOffsetMetadata;
import kafka.server.LogOffsetMetadata$;
import kafka.utils.NotNothing$;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.junit.Assert;
import org.junit.Test;
import scala.Function0;
import scala.Function1;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.MapLike;
import scala.collection.Seq;
import scala.collection.immutable.Set;
import scala.collection.mutable.Buffer$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001E3A!\u0001\u0002\u0001\u000f\t1B*[:u\u001f\u001a47/\u001a;t%\u0016\fX/Z:u)\u0016\u001cHO\u0003\u0002\u0004\t\u000511/\u001a:wKJT\u0011!B\u0001\u0006W\u000647.Y\u0002\u0001'\t\u0001\u0001\u0002\u0005\u0002\n\u00155\t!!\u0003\u0002\f\u0005\ty!)Y:f%\u0016\fX/Z:u)\u0016\u001cH\u000fC\u0003\u000e\u0001\u0011\u0005a\"\u0001\u0004=S:LGO\u0010\u000b\u0002\u001fA\u0011\u0011\u0002\u0001\u0005\u0006#\u0001!\tAE\u0001,i\u0016\u001cH\u000fT5ti>3gm]3ugJ+\u0017/^3ti\u001a{'OT8o)&,'/\u001a3QCJ$\u0018\u000e^5p]R\t1\u0003\u0005\u0002\u0015/5\tQCC\u0001\u0017\u0003\u0015\u00198-\u00197b\u0013\tARC\u0001\u0003V]&$\bF\u0001\t\u001b!\tY\u0002%D\u0001\u001d\u0015\tib$A\u0003kk:LGOC\u0001 \u0003\ry'oZ\u0005\u0003Cq\u0011A\u0001V3ti\")1\u0005\u0001C\u0001%\u0005IB/Z:u\u0019&\u001cHo\u00144gg\u0016$8/\u0012:s_J\u001cu\u000eZ3tQ\t\u0011#\u0004C\u0003'\u0001\u0011\u0005!#\u0001\u000euKN$8)\u001e:sK:$X\t]8dQZ\u000bG.\u001b3bi&|g\u000e\u000b\u0002&5!)\u0011\u0006\u0001C\u0001%\u0005yB/Z:u%\u0016\u001c\bo\u001c8tK&s7\r\\;eKNdU-\u00193fe\u0016\u0003xn\u00195)\u0005!R\u0002\"\u0002\u0017\u0001\t\u0013i\u0013aE1tg\u0016\u0014HOU3ta>t7/Z#se>\u0014H\u0003B\n/w\u0001CQaL\u0016A\u0002A\nQ!\u001a:s_J\u0004\"!M\u001d\u000e\u0003IR!a\r\u001b\u0002\u0011A\u0014x\u000e^8d_2T!!\u000e\u001c\u0002\r\r|W.\\8o\u0015\t)qG\u0003\u00029=\u00051\u0011\r]1dQ\u0016L!A\u000f\u001a\u0003\r\u0015\u0013(o\u001c:t\u0011\u0015a4\u00061\u0001>\u0003!\u0011'o\\6fe&#\u0007C\u0001\u000b?\u0013\tyTCA\u0002J]RDQ!Q\u0016A\u0002\t\u000bqA]3rk\u0016\u001cH\u000f\u0005\u0002D\r6\tAI\u0003\u0002Fi\u0005A!/Z9vKN$8/\u0003\u0002H\t\n\tB*[:u\u001f\u001a47/\u001a;SKF,Xm\u001d;\t\u000b%\u0003A\u0011\u0002&\u0002\u0017M,g\u000e\u001a*fcV,7\u000f\u001e\u000b\u0004\u0017:\u0003\u0006CA\"M\u0013\tiEI\u0001\nMSN$xJ\u001a4tKR\u0014Vm\u001d9p]N,\u0007\"B(I\u0001\u0004i\u0014\u0001\u00037fC\u0012,'/\u00133\t\u000b\u0005C\u0005\u0019\u0001\"")
public class ListOffsetsRequestTest
extends BaseRequestTest {
    @Test
    public void testListOffsetsRequestForNonTieredPartition() {
        String topicName = "test-topic";
        Properties props = new Properties();
        props.put("segment.bytes", "16384");
        props.put("retention.bytes", "-1");
        scala.collection.immutable.Map<Object, Object> partitionToLeaderMap = this.createTopic(topicName, 1, 1, props);
        int numMessages = 3000;
        TestUtils$.MODULE$.generateAndProduceMessages((Seq<KafkaServer>)this.servers().toSeq(), topicName, numMessages, TestUtils$.MODULE$.generateAndProduceMessages$default$4());
        TopicPartition topicPartition = new TopicPartition(topicName, 0);
        int leaderId = BoxesRunTime.unboxToInt((Object)partitionToLeaderMap.apply((Object)BoxesRunTime.boxToInteger((int)topicPartition.partition())));
        KafkaServer server = (KafkaServer)this.serverForId(leaderId).get();
        LogManager qual$1 = server.logManager();
        TopicPartition x$4 = topicPartition;
        boolean x$5 = qual$1.getLog$default$2();
        AbstractLog log2 = (AbstractLog)qual$1.getLog(x$4, x$5).get();
        TestUtils$.MODULE$.waitUntilTrue((Function0<Object>)new Serializable(this, numMessages, log2){
            public static final long serialVersionUID = 0L;
            private final int numMessages$1;
            private final AbstractLog log$1;

            public final boolean apply() {
                return this.apply$mcZ$sp();
            }

            public boolean apply$mcZ$sp() {
                return this.log$1.logEndOffset() == (long)this.numMessages$1;
            }
            {
                this.numMessages$1 = numMessages$1;
                this.log$1 = log$1;
            }
        }, (Function0<String>)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Timeout waiting for all records to be produced";
            }
        }, 180000L, TestUtils$.MODULE$.waitUntilTrue$default$4());
        this.makeListOffsetsRequestAndValidateResponse(topicPartition, -1L, leaderId, log2, (short)0);
        log2.maybeIncrementHighWatermark(new LogOffsetMetadata(log2.logStartOffset() + 100L, LogOffsetMetadata$.MODULE$.apply$default$2(), LogOffsetMetadata$.MODULE$.apply$default$3()));
        log2.maybeIncrementLogStartOffset(log2.logStartOffset() + 100L);
        this.makeListOffsetsRequestAndValidateResponse(topicPartition, -1L, leaderId, log2, (short)0);
        this.makeListOffsetsRequestAndValidateResponse(topicPartition, -2L, leaderId, log2, (short)0);
    }

    @Test
    public void testListOffsetsErrorCodes() {
        String topic = "topic";
        TopicPartition partition = new TopicPartition(topic, 0);
        java.util.Map targetTimes = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)partition), (Object)new ListOffsetRequest.PartitionData(-2L, Optional.of(Predef$.MODULE$.int2Integer(0))))}))).asJava();
        ListOffsetRequest consumerRequest = (ListOffsetRequest)ListOffsetRequest.Builder.forConsumer((boolean)false, (IsolationLevel)IsolationLevel.READ_UNCOMMITTED).setTargetTimes(targetTimes).build();
        ListOffsetRequest replicaRequest = (ListOffsetRequest)ListOffsetRequest.Builder.forReplica((short)ApiKeys.LIST_OFFSETS.latestVersion(), (int)((KafkaServer)this.servers().head()).config().brokerId()).setTargetTimes(targetTimes).build();
        ListOffsetRequest debugReplicaRequest = (ListOffsetRequest)ListOffsetRequest.Builder.forReplica((short)ApiKeys.LIST_OFFSETS.latestVersion(), (int)-2).setTargetTimes(targetTimes).build();
        int randomBrokerId = ((KafkaServer)this.servers().head()).config().brokerId();
        this.assertResponseError(Errors.UNKNOWN_TOPIC_OR_PARTITION, randomBrokerId, consumerRequest);
        this.assertResponseError(Errors.UNKNOWN_TOPIC_OR_PARTITION, randomBrokerId, replicaRequest);
        this.assertResponseError(Errors.UNKNOWN_TOPIC_OR_PARTITION, randomBrokerId, debugReplicaRequest);
        scala.collection.immutable.Map<Object, Object> partitionToLeader = TestUtils$.MODULE$.createTopic(this.zkClient(), topic, 1, 2, (Seq<KafkaServer>)this.servers(), TestUtils$.MODULE$.createTopic$default$6());
        Set replicas = this.zkClient().getReplicasForPartition(partition).toSet();
        int leader2 = BoxesRunTime.unboxToInt((Object)partitionToLeader.apply((Object)BoxesRunTime.boxToInteger((int)partition.partition())));
        int follower2 = BoxesRunTime.unboxToInt((Object)replicas.find((Function1)new Serializable(this, leader2){
            public static final long serialVersionUID = 0L;
            private final int leader$1;

            public final boolean apply(int x$1) {
                return this.apply$mcZI$sp(x$1);
            }

            public boolean apply$mcZI$sp(int x$1) {
                return x$1 != this.leader$1;
            }
            {
                this.leader$1 = leader$1;
            }
        }).get());
        int nonReplica = BoxesRunTime.unboxToInt((Object)((IterableLike)this.servers().map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final int apply(KafkaServer x$2) {
                return x$2.config().brokerId();
            }
        }, Buffer$.MODULE$.canBuildFrom())).find((Function1)new Serializable(this, replicas){
            public static final long serialVersionUID = 0L;
            private final Set replicas$1;

            public final boolean apply(int x$3) {
                return this.apply$mcZI$sp(x$3);
            }

            public boolean apply$mcZI$sp(int x$3) {
                return !this.replicas$1.contains((Object)BoxesRunTime.boxToInteger((int)x$3));
            }
            {
                this.replicas$1 = replicas$1;
            }
        }).get());
        this.assertResponseError(Errors.NOT_LEADER_FOR_PARTITION, follower2, consumerRequest);
        this.assertResponseError(Errors.NOT_LEADER_FOR_PARTITION, follower2, replicaRequest);
        this.assertResponseError(Errors.NONE, follower2, debugReplicaRequest);
        this.assertResponseError(Errors.NOT_LEADER_FOR_PARTITION, nonReplica, consumerRequest);
        this.assertResponseError(Errors.NOT_LEADER_FOR_PARTITION, nonReplica, replicaRequest);
        this.assertResponseError(Errors.REPLICA_NOT_AVAILABLE, nonReplica, debugReplicaRequest);
    }

    @Test
    public void testCurrentEpochValidation() {
        String topic = "topic";
        TopicPartition topicPartition = new TopicPartition(topic, 0);
        scala.collection.immutable.Map<Object, Object> partitionToLeader = TestUtils$.MODULE$.createTopic(this.zkClient(), topic, 1, 3, (Seq<KafkaServer>)this.servers(), TestUtils$.MODULE$.createTopic$default$6());
        int firstLeaderId = BoxesRunTime.unboxToInt((Object)partitionToLeader.apply((Object)BoxesRunTime.boxToInteger((int)topicPartition.partition())));
        this.killBroker(firstLeaderId);
        int secondLeaderId = TestUtils$.MODULE$.awaitLeaderChange((Seq<KafkaServer>)this.servers(), topicPartition, firstLeaderId, TestUtils$.MODULE$.awaitLeaderChange$default$4());
        int secondLeaderEpoch = TestUtils$.MODULE$.findLeaderEpoch(secondLeaderId, topicPartition, (Iterable<KafkaServer>)this.servers());
        this.assertResponseErrorForEpoch$1(Errors.NONE, secondLeaderId, Optional.empty(), topicPartition);
        this.assertResponseErrorForEpoch$1(Errors.NONE, secondLeaderId, Optional.of(Predef$.MODULE$.int2Integer(secondLeaderEpoch)), topicPartition);
        this.assertResponseErrorForEpoch$1(Errors.FENCED_LEADER_EPOCH, secondLeaderId, Optional.of(Predef$.MODULE$.int2Integer(secondLeaderEpoch - 1)), topicPartition);
        this.assertResponseErrorForEpoch$1(Errors.UNKNOWN_LEADER_EPOCH, secondLeaderId, Optional.of(Predef$.MODULE$.int2Integer(secondLeaderEpoch + 1)), topicPartition);
        int followerId = TestUtils$.MODULE$.findFollowerId(topicPartition, (Iterable<KafkaServer>)this.servers());
        this.assertResponseErrorForEpoch$1(Errors.NOT_LEADER_FOR_PARTITION, followerId, Optional.empty(), topicPartition);
        this.assertResponseErrorForEpoch$1(Errors.NOT_LEADER_FOR_PARTITION, followerId, Optional.of(Predef$.MODULE$.int2Integer(secondLeaderEpoch)), topicPartition);
        this.assertResponseErrorForEpoch$1(Errors.UNKNOWN_LEADER_EPOCH, followerId, Optional.of(Predef$.MODULE$.int2Integer(secondLeaderEpoch + 1)), topicPartition);
        this.assertResponseErrorForEpoch$1(Errors.FENCED_LEADER_EPOCH, followerId, Optional.of(Predef$.MODULE$.int2Integer(secondLeaderEpoch - 1)), topicPartition);
    }

    @Test
    public void testResponseIncludesLeaderEpoch() {
        String topic = "topic";
        TopicPartition topicPartition = new TopicPartition(topic, 0);
        scala.collection.immutable.Map<Object, Object> partitionToLeader = TestUtils$.MODULE$.createTopic(this.zkClient(), topic, 1, 3, (Seq<KafkaServer>)this.servers(), TestUtils$.MODULE$.createTopic$default$6());
        int firstLeaderId = BoxesRunTime.unboxToInt((Object)partitionToLeader.apply((Object)BoxesRunTime.boxToInteger((int)topicPartition.partition())));
        TestUtils$.MODULE$.generateAndProduceMessages((Seq<KafkaServer>)this.servers(), topic, 10, TestUtils$.MODULE$.generateAndProduceMessages$default$4());
        Assert.assertEquals((Object)new Tuple2((Object)BoxesRunTime.boxToLong((long)0L), Optional.of(BoxesRunTime.boxToInteger((int)0))), (Object)this.fetchOffsetAndEpoch$1(firstLeaderId, 0L, topicPartition));
        Assert.assertEquals((Object)new Tuple2((Object)BoxesRunTime.boxToLong((long)0L), Optional.of(BoxesRunTime.boxToInteger((int)0))), (Object)this.fetchOffsetAndEpoch$1(firstLeaderId, -2L, topicPartition));
        Assert.assertEquals((Object)new Tuple2((Object)BoxesRunTime.boxToLong((long)10L), Optional.of(BoxesRunTime.boxToInteger((int)0))), (Object)this.fetchOffsetAndEpoch$1(firstLeaderId, -1L, topicPartition));
        Assert.assertEquals((Object)new Tuple2((Object)BoxesRunTime.boxToLong((long)0L), Optional.empty()), (Object)this.fetchOffsetAndEpoch$1(firstLeaderId, -10000L, topicPartition));
        Assert.assertEquals((Object)new Tuple2((Object)BoxesRunTime.boxToLong((long)10L), Optional.empty()), (Object)this.fetchOffsetAndEpoch$1(firstLeaderId, -10001L, topicPartition));
        this.killBroker(firstLeaderId);
        int secondLeaderId = TestUtils$.MODULE$.awaitLeaderChange((Seq<KafkaServer>)this.servers(), topicPartition, firstLeaderId, TestUtils$.MODULE$.awaitLeaderChange$default$4());
        int secondLeaderEpoch = TestUtils$.MODULE$.findLeaderEpoch(secondLeaderId, topicPartition, (Iterable<KafkaServer>)this.servers());
        Assert.assertEquals((Object)new Tuple2((Object)BoxesRunTime.boxToLong((long)0L), Optional.of(BoxesRunTime.boxToInteger((int)0))), (Object)this.fetchOffsetAndEpoch$1(secondLeaderId, 0L, topicPartition));
        Assert.assertEquals((Object)new Tuple2((Object)BoxesRunTime.boxToLong((long)0L), Optional.of(BoxesRunTime.boxToInteger((int)0))), (Object)this.fetchOffsetAndEpoch$1(secondLeaderId, -2L, topicPartition));
        Assert.assertEquals((Object)new Tuple2((Object)BoxesRunTime.boxToLong((long)10L), Optional.of(BoxesRunTime.boxToInteger((int)secondLeaderEpoch))), (Object)this.fetchOffsetAndEpoch$1(secondLeaderId, -1L, topicPartition));
    }

    private void assertResponseError(Errors error, int brokerId, ListOffsetRequest request) {
        ListOffsetResponse response = this.sendRequest(brokerId, request);
        Assert.assertEquals((long)request.partitionTimestamps().size(), (long)response.responseData().size());
        ((MapLike)JavaConverters$.MODULE$.mapAsScalaMapConverter(response.responseData()).asScala()).values().foreach((Function1)new Serializable(this, error){
            public static final long serialVersionUID = 0L;
            private final Errors error$1;

            public final void apply(ListOffsetResponse.PartitionData partitionData) {
                Assert.assertEquals((Object)this.error$1, (Object)partitionData.error);
            }
            {
                this.error$1 = error$1;
            }
        });
    }

    private ListOffsetResponse sendRequest(int leaderId, ListOffsetRequest request) {
        return (ListOffsetResponse)this.connectAndReceive((AbstractRequest)request, this.brokerSocketServer(leaderId), this.connectAndReceive$default$3(), ClassTag$.MODULE$.apply(ListOffsetResponse.class), NotNothing$.MODULE$.notNothingEvidence(Predef.$eq$colon$eq$.MODULE$.tpEquals()));
    }

    private final void assertResponseErrorForEpoch$1(Errors error, int brokerId, Optional currentLeaderEpoch, TopicPartition topicPartition$1) {
        java.util.Map targetTimes = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topicPartition$1), (Object)new ListOffsetRequest.PartitionData(-2L, currentLeaderEpoch))}))).asJava();
        ListOffsetRequest request = (ListOffsetRequest)ListOffsetRequest.Builder.forConsumer((boolean)false, (IsolationLevel)IsolationLevel.READ_UNCOMMITTED).setTargetTimes(targetTimes).build();
        this.assertResponseError(error, brokerId, request);
    }

    private final Tuple2 fetchOffsetAndEpoch$1(int serverId, long timestamp, TopicPartition topicPartition$2) {
        java.util.Map targetTimes = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topicPartition$2), (Object)new ListOffsetRequest.PartitionData(timestamp, Optional.empty()))}))).asJava();
        ListOffsetRequest request = (ListOffsetRequest)ListOffsetRequest.Builder.forConsumer((boolean)false, (IsolationLevel)IsolationLevel.READ_UNCOMMITTED).setTargetTimes(targetTimes).build();
        ListOffsetResponse response = this.sendRequest(serverId, request);
        ListOffsetResponse.PartitionData partitionData = (ListOffsetResponse.PartitionData)response.responseData().get(topicPartition$2);
        Optional epochOpt = partitionData.leaderEpoch;
        return new Tuple2((Object)BoxesRunTime.boxToLong((long)Predef$.MODULE$.Long2long(partitionData.offset)), (Object)epochOpt);
    }
}

