/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.Serializable;
import java.util.Optional;
import kafka.cluster.Partition;
import kafka.cluster.Replica;
import kafka.log.LogOffsetSnapshot;
import kafka.server.DelayedFetch;
import kafka.server.FetchDataInfo;
import kafka.server.FetchDataInfo$;
import kafka.server.FetchIsolation;
import kafka.server.FetchLogEnd$;
import kafka.server.FetchMetadata;
import kafka.server.FetchPartitionData;
import kafka.server.FetchPartitionStatus;
import kafka.server.LogOffsetMetadata;
import kafka.server.LogOffsetMetadata$;
import kafka.server.LogReadResult;
import kafka.server.LogReadResult$;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedLeaderEpochException;
import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.FetchRequest;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Assert;
import org.junit.Test;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;

@ScalaSignature(bytes="\u0006\u0001\u0005%b\u0001\u0002\t\u0012\u0001YAQa\b\u0001\u0005\u0002\u0001Bqa\t\u0001C\u0002\u0013%A\u0005\u0003\u0004,\u0001\u0001\u0006I!\n\u0005\bY\u0001\u0011\r\u0011\"\u0003.\u0011\u0019\t\u0004\u0001)A\u0005]!9!\u0007\u0001b\u0001\n\u0013\u0019\u0004BB\u001c\u0001A\u0003%A\u0007C\u00039\u0001\u0011\u0005\u0011\bC\u0003E\u0001\u0011\u0005\u0011\bC\u0003G\u0001\u0011\u0005q\tC\u0003Y\u0001\u0011\u0005\u0011\bC\u0003[\u0001\u0011%1\fC\u0003r\u0001\u0011%!\u000fC\u0004\u0002\u0014\u0001!I!!\u0006\t\u000f\u0005u\u0001\u0001\"\u0003\u0002 \t\u0001B)\u001a7bs\u0016$g)\u001a;dQR+7\u000f\u001e\u0006\u0003%M\taa]3sm\u0016\u0014(\"\u0001\u000b\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001a\u0006\t\u00031ui\u0011!\u0007\u0006\u00035m\t\u0001\"Z1ts6|7m\u001b\u0006\u00029\u0005\u0019qN]4\n\u0005yI\"aD#bgflunY6TkB\u0004xN\u001d;\u0002\rqJg.\u001b;?)\u0005\t\u0003C\u0001\u0012\u0001\u001b\u0005\t\u0012\u0001C7bq\nKH/Z:\u0016\u0003\u0015\u0002\"AJ\u0015\u000e\u0003\u001dR\u0011\u0001K\u0001\u0006g\u000e\fG.Y\u0005\u0003U\u001d\u00121!\u00138u\u0003%i\u0017\r\u001f\"zi\u0016\u001c\b%\u0001\bsKBd\u0017nY1NC:\fw-\u001a:\u0016\u00039\u0002\"AI\u0018\n\u0005A\n\"A\u0004*fa2L7-Y'b]\u0006<WM]\u0001\u0010e\u0016\u0004H.[2b\u001b\u0006t\u0017mZ3sA\u0005a!/\u001a9mS\u000e\f\u0017+^8uCV\tA\u0007\u0005\u0002#k%\u0011a'\u0005\u0002\r%\u0016\u0004H.[2b#V|G/Y\u0001\u000ee\u0016\u0004H.[2b#V|G/\u0019\u0011\u00021Q,7\u000f\u001e$fi\u000eDw+\u001b;i\r\u0016t7-\u001a3Fa>\u001c\u0007\u000eF\u0001;!\t13(\u0003\u0002=O\t!QK\\5uQ\tAa\b\u0005\u0002@\u00056\t\u0001I\u0003\u0002B7\u0005)!.\u001e8ji&\u00111\t\u0011\u0002\u0005)\u0016\u001cH/A\fuKN$(+\u001a9mS\u000e\fgj\u001c;Bm\u0006LG.\u00192mK\"\u0012\u0011BP\u0001#G\",7m[\"p[BdW\r^3XQ\u0016tgi\u001c7m_^,'\u000fT1hO&tw\rS,\u0015\u0007iB\u0005\u000bC\u0003J\u0015\u0001\u0007!*\u0001\u0006g_2dwn^3s\u0011^\u00032AJ&N\u0013\tauE\u0001\u0004PaRLwN\u001c\t\u0003M9K!aT\u0014\u0003\t1{gn\u001a\u0005\u0006#*\u0001\rAU\u0001\fG\",7m\u001b
*fgVdG\u000f\u0005\u0003''VS\u0014B\u0001+(\u0005%1UO\\2uS>t\u0017\u0007\u0005\u0002#-&\u0011q+\u0005\u0002\r\t\u0016d\u0017-_3e\r\u0016$8\r[\u0001\"i\u0016\u001cHoQ8na2,G/Z,iK:4u\u000e\u001c7po\u0016\u0014H*Y4hS:<\u0007j\u0016\u0015\u0003\u0017y\n!CY;jY\u00124U\r^2i\u001b\u0016$\u0018\rZ1uCR!AlX1m!\t\u0011S,\u0003\u0002_#\tia)\u001a;dQ6+G/\u00193bi\u0006DQ\u0001\u0019\u0007A\u0002\u0015\n\u0011B]3qY&\u001c\u0017-\u00133\t\u000b\td\u0001\u0019A2\u0002\u001dQ|\u0007/[2QCJ$\u0018\u000e^5p]B\u0011AM[\u0007\u0002K*\u0011amZ\u0001\u0007G>lWn\u001c8\u000b\u0005QA'BA5\u001c\u0003\u0019\t\u0007/Y2iK&\u00111.\u001a\u0002\u000f)>\u0004\u0018n\u0019)beRLG/[8o\u0011\u0015iG\u00021\u0001o\u0003-1W\r^2i'R\fG/^:\u0011\u0005\tz\u0017B\u00019\u0012\u0005Q1U\r^2i!\u0006\u0014H/\u001b;j_:\u001cF/\u0019;vg\u0006qR\r\u001f9fGR\u0014V-\u00193Ge>l'+\u001a9mS\u000e\fw+\u001b;i\u000bJ\u0014xN\u001d\u000b\u0007uM$X/a\u0001\t\u000b\u0001l\u0001\u0019A\u0013\t\u000b\tl\u0001\u0019A2\t\u000bYl\u0001\u0019A<\u0002%\u0019,Go\u00195QCJ$\u0018\u000e^5p]\u0012\u000bG/\u0019\t\u0003qzt!!\u001f?\u000e\u0003iT!a_3\u0002\u0011I,\u0017/^3tiNL!! >\u0002\u0019\u0019+Go\u00195SKF,Xm\u001d;\n\u0007}\f\tAA\u0007QCJ$\u0018\u000e^5p]\u0012\u000bG/\u0019\u0006\u0003{jDq!!\u0002\u000e\u0001\u0004\t9!A\u0003feJ|'\u000f\u0005\u0003\u0002\n\u0005=QBAA\u0006\u0015\r\ti!Z\u0001\taJ|Go\\2pY&!\u0011\u0011CA\u0006\u0005\u0019)%O]8sg\u0006)R\r\u001f9fGR\u0014V-\u00193Ge>l'+\u001a9mS\u000e\fGc\u0002\u001e\u0002\u0018\u0005e\u00111\u0004\u0005\u0006A:\u0001\r!\n\u0005\u0006E:\u0001\ra\u0019\u0005\u0006m:\u0001\ra^\u0001\u0019EVLG\u000e\u001a*fC\u0012\u0014Vm];mi^KG\u000f[#se>\u0014H\u0003BA\u0011\u0003O\u00012AIA\u0012\u0013\r\t)#\u0005\u0002\u000e\u0019><'+Z1e%\u0016\u001cX\u000f\u001c;\t\u000f\u0005\u0015q\u00021\u0001\u0002\b\u0001")
/*
 * Tests for DelayedFetch#tryComplete driven entirely by EasyMock mocks of
 * ReplicaManager, ReplicaQuota and Partition.
 *
 * Every test builds a single-partition follower fetch (replica id 1, fetch offset 500,
 * leader epoch 10), records the mock expectations, switches the mocks to replay mode and
 * then checks whether the delayed fetch completes and what it hands to its response callback.
 */
public class DelayedFetchTest extends EasyMockSupport {
    /** Byte limit passed to every FetchRequest.PartitionData, FetchMetadata and readFromLocalLog call. */
    private final int maxBytes;
    private final ReplicaManager replicaManager = (ReplicaManager)this.mock(ReplicaManager.class);
    private final ReplicaQuota replicaQuota = (ReplicaQuota)this.mock(ReplicaQuota.class);

    private int maxBytes() {
        return this.maxBytes;
    }

    private ReplicaManager replicaManager() {
        return this.replicaManager;
    }

    private ReplicaQuota replicaQuota() {
        return this.replicaQuota;
    }

    /**
     * When the partition rejects the offset-snapshot lookup with a
     * {@link FencedLeaderEpochException}, the delayed fetch must complete immediately and
     * the callback must receive {@link Errors#FENCED_LEADER_EPOCH}.
     */
    @Test
    public void testFetchWithFencedEpoch() {
        TopicPartition topicPartition = new TopicPartition("topic", 0);
        Optional<Integer> currentLeaderEpoch = Optional.of(10);
        int replicaId = 1;
        FetchPartitionStatus fetchStatus = this.buildFetchStatus(500L, 0L, currentLeaderEpoch);
        FetchMetadata fetchMetadata = this.buildFetchMetadata(replicaId, topicPartition, fetchStatus);
        ObjectRef fetchResultOpt = ObjectRef.create((Object)None$.MODULE$);
        DelayedFetch delayedFetch = this.buildDelayedFetch(fetchMetadata, fetchResultOpt);

        Partition partition = (Partition)this.mock(Partition.class);
        EasyMock.expect((Object)this.replicaManager().getPartitionOrException(topicPartition, true)).andReturn((Object)partition);
        EasyMock.expect((Object)partition.fetchOffsetSnapshot(currentLeaderEpoch, true)).andThrow(new FencedLeaderEpochException("Requested epoch has been fenced"));
        this.expectReadFromReplicaWithError(replicaId, topicPartition, fetchStatus.fetchInfo(), Errors.FENCED_LEADER_EPOCH);
        this.replayAll();

        DelayedFetchTest.assertCompletedWithError(delayedFetch, fetchResultOpt, Errors.FENCED_LEADER_EPOCH);
    }

    /**
     * When the replica manager cannot resolve the partition and throws
     * {@link ReplicaNotAvailableException}, the delayed fetch must complete immediately and
     * the callback must receive {@link Errors#REPLICA_NOT_AVAILABLE}.
     */
    @Test
    public void testReplicaNotAvailable() {
        TopicPartition topicPartition = new TopicPartition("topic", 0);
        Optional<Integer> currentLeaderEpoch = Optional.of(10);
        int replicaId = 1;
        FetchPartitionStatus fetchStatus = this.buildFetchStatus(500L, 0L, currentLeaderEpoch);
        FetchMetadata fetchMetadata = this.buildFetchMetadata(replicaId, topicPartition, fetchStatus);
        ObjectRef fetchResultOpt = ObjectRef.create((Object)None$.MODULE$);
        DelayedFetch delayedFetch = this.buildDelayedFetch(fetchMetadata, fetchResultOpt);

        // No Partition mock is needed here: the lookup itself fails before any partition is used.
        EasyMock.expect((Object)this.replicaManager().getPartitionOrException(topicPartition, true)).andThrow(new ReplicaNotAvailableException("Replica for " + topicPartition + " not available"));
        this.expectReadFromReplicaWithError(replicaId, topicPartition, fetchStatus.fetchInfo(), Errors.REPLICA_NOT_AVAILABLE);
        this.replayAll();

        // Also pin the propagated error, mirroring testFetchWithFencedEpoch; checking only
        // "some response was delivered" would let a wrong error code slip through.
        DelayedFetchTest.assertCompletedWithError(delayedFetch, fetchResultOpt, Errors.REPLICA_NOT_AVAILABLE);
    }

    /**
     * Sets up a follower fetch against a partition whose offset snapshot is
     * {@code LogOffsetSnapshot(0, 500, 480, 400)}, optionally records {@code followerHW} on
     * the follower replica, replays the mocks and hands the resulting {@link DelayedFetch}
     * to {@code checkResult}.
     *
     * <p>NOTE(review): the four snapshot values are presumably log start offset, log end
     * offset, high watermark and last stable offset; the expectations in
     * {@link #testCompleteWhenFollowerLaggingHW()} are consistent with 480 being the leader
     * high watermark. Confirm against {@code LogOffsetSnapshot}.
     *
     * @param followerHW  value passed as the last argument of {@code Replica.updateFetchState}
     *                    when defined; when empty the follower's fetch state is left untouched
     * @param checkResult assertions to run against the prepared delayed fetch
     * @throws NullPointerException if {@code followerHW} is {@code null}
     */
    public void checkCompleteWhenFollowerLaggingHW(Option<Object> followerHW, Function1<DelayedFetch, BoxedUnit> checkResult) {
        // Fail fast, before any expectation is recorded on the shared mocks.
        if (followerHW == null) {
            throw new NullPointerException("followerHW must not be null");
        }

        TopicPartition topicPartition = new TopicPartition("topic", 0);
        Optional<Integer> currentLeaderEpoch = Optional.of(10);
        int replicaId = 1;
        FetchPartitionStatus fetchStatus = this.buildFetchStatus(500L, 0L, currentLeaderEpoch);
        FetchMetadata fetchMetadata = this.buildFetchMetadata(replicaId, topicPartition, fetchStatus);
        ObjectRef fetchResultOpt = ObjectRef.create((Object)None$.MODULE$);
        DelayedFetch delayedFetch = this.buildDelayedFetch(fetchMetadata, fetchResultOpt);

        Partition partition = (Partition)this.mock(Partition.class);
        EasyMock.expect((Object)this.replicaManager().getPartitionOrException(topicPartition, true)).andReturn((Object)partition);
        LogOffsetSnapshot offsetSnapshot = new LogOffsetSnapshot(
            0L,
            new LogOffsetMetadata(500L, LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3()),
            new LogOffsetMetadata(480L, LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3()),
            new LogOffsetMetadata(400L, LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3()));
        EasyMock.expect((Object)partition.fetchOffsetSnapshot(currentLeaderEpoch, true)).andReturn((Object)offsetSnapshot);
        this.expectReadFromReplica(replicaId, topicPartition, fetchStatus.fetchInfo());

        Replica follower = new Replica(replicaId, topicPartition);
        if (followerHW.isDefined()) {
            long followerHighWatermark = BoxesRunTime.unboxToLong((Object)followerHW.get());
            follower.updateFetchState(LogOffsetMetadata$.MODULE$.UnknownOffsetMetadata(), 0L, 0L, 0L, followerHighWatermark);
        }
        EasyMock.expect((Object)partition.getReplica(replicaId)).andReturn((Object)new Some((Object)follower));
        this.replayAll();

        checkResult.apply(delayedFetch);
    }

    /**
     * Runs {@link #checkCompleteWhenFollowerLaggingHW} for four follower values, resetting
     * all mocks before each run:
     * <ul>
     *   <li>no value recorded: the fetch completes;</li>
     *   <li>500: the fetch does not complete;</li>
     *   <li>480: the fetch does not complete;</li>
     *   <li>470: the fetch completes.</li>
     * </ul>
     */
    @Test
    public void testCompleteWhenFollowerLaggingHW() {
        this.resetAll();
        this.checkCompleteWhenFollowerLaggingHW((Option<Object>)(Option)None$.MODULE$, (Function1<DelayedFetch, BoxedUnit> & Serializable & scala.Serializable)delayedFetch -> {
            DelayedFetchTest.$anonfun$testCompleteWhenFollowerLaggingHW$1(delayedFetch);
            return BoxedUnit.UNIT;
        });
        this.resetAll();
        this.checkCompleteWhenFollowerLaggingHW(new Some<Object>(Long.valueOf(500L)), (Function1<DelayedFetch, BoxedUnit> & Serializable & scala.Serializable)delayedFetch -> {
            DelayedFetchTest.$anonfun$testCompleteWhenFollowerLaggingHW$2(delayedFetch);
            return BoxedUnit.UNIT;
        });
        this.resetAll();
        this.checkCompleteWhenFollowerLaggingHW(new Some<Object>(Long.valueOf(480L)), (Function1<DelayedFetch, BoxedUnit> & Serializable & scala.Serializable)delayedFetch -> {
            DelayedFetchTest.$anonfun$testCompleteWhenFollowerLaggingHW$3(delayedFetch);
            return BoxedUnit.UNIT;
        });
        this.resetAll();
        this.checkCompleteWhenFollowerLaggingHW(new Some<Object>(Long.valueOf(470L)), (Function1<DelayedFetch, BoxedUnit> & Serializable & scala.Serializable)delayedFetch -> {
            DelayedFetchTest.$anonfun$testCompleteWhenFollowerLaggingHW$4(delayedFetch);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Builds the per-partition fetch status shared by all tests: a start offset of
     * {@code fetchOffset} (other LogOffsetMetadata fields left at their defaults) plus the
     * matching {@link FetchRequest.PartitionData} limited to {@link #maxBytes()}.
     */
    private FetchPartitionStatus buildFetchStatus(long fetchOffset, long logStartOffset, Optional<Integer> currentLeaderEpoch) {
        LogOffsetMetadata startOffsetMetadata = new LogOffsetMetadata(fetchOffset, LogOffsetMetadata$.MODULE$.apply$default$2(), LogOffsetMetadata$.MODULE$.apply$default$3());
        FetchRequest.PartitionData fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, this.maxBytes(), currentLeaderEpoch);
        return new FetchPartitionStatus(startOffsetMetadata, fetchInfo);
    }

    /**
     * Builds fetch metadata for a single partition using the {@code FetchLogEnd} isolation
     * level, {@link #maxBytes()} as the byte limit and the given replica id.
     */
    private FetchMetadata buildFetchMetadata(int replicaId, TopicPartition topicPartition, FetchPartitionStatus fetchStatus) {
        Seq partitionStatuses = DelayedFetchTest.singleEntrySeq(topicPartition, fetchStatus);
        return new FetchMetadata(1, this.maxBytes(), false, true, (FetchIsolation)FetchLogEnd$.MODULE$, true, replicaId, partitionStatuses);
    }

    /**
     * Creates the {@link DelayedFetch} under test, wired to the mocked replica manager and
     * quota, whose response callback stores the first partition's result in
     * {@code fetchResultOpt}.
     */
    private DelayedFetch buildDelayedFetch(FetchMetadata fetchMetadata, ObjectRef fetchResultOpt) {
        return new DelayedFetch(500L, fetchMetadata, this.replicaManager(), this.replicaQuota(), (Option)None$.MODULE$, (Function1 & Serializable & scala.Serializable)responses -> {
            DelayedFetchTest.captureFirstResponse((Seq)responses, fetchResultOpt);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Expects exactly one {@code readFromLocalLog} call for the given partition and answers
     * it with a read result carrying the exception of {@code error}.
     */
    private void expectReadFromReplicaWithError(int replicaId, TopicPartition topicPartition, FetchRequest.PartitionData fetchPartitionData, Errors error) {
        Seq requestedPartitions = DelayedFetchTest.singleEntrySeq(topicPartition, fetchPartitionData);
        EasyMock.expect((Object)this.replicaManager().readFromLocalLog(replicaId, true, (FetchIsolation)FetchLogEnd$.MODULE$, this.maxBytes(), false, requestedPartitions, this.replicaQuota(), (Option)None$.MODULE$))
            .andReturn((Object)DelayedFetchTest.singleEntrySeq(topicPartition, this.buildReadResultWithError(error)));
    }

    /**
     * Expects any number of {@code readFromLocalLog} calls for the given partition and
     * answers each with an empty, error-free read result.
     */
    private void expectReadFromReplica(int replicaId, TopicPartition topicPartition, FetchRequest.PartitionData fetchPartitionData) {
        LogReadResult result = DelayedFetchTest.buildReadResult((Option)None$.MODULE$);
        Seq requestedPartitions = DelayedFetchTest.singleEntrySeq(topicPartition, fetchPartitionData);
        EasyMock.expect((Object)this.replicaManager().readFromLocalLog(replicaId, true, (FetchIsolation)FetchLogEnd$.MODULE$, this.maxBytes(), false, requestedPartitions, this.replicaQuota(), (Option)None$.MODULE$))
            .andReturn((Object)DelayedFetchTest.singleEntrySeq(topicPartition, result))
            .anyTimes();
    }

    /** Builds an empty read result whose exception is the one associated with {@code error}. */
    private LogReadResult buildReadResultWithError(Errors error) {
        return DelayedFetchTest.buildReadResult((Option)new Some((Object)error.exception()));
    }

    /**
     * Builds a read result with no records, every numeric field set to -1, no value for the
     * eighth (Option) field, defaults for the ninth and tenth constructor arguments, and the
     * given optional exception.
     */
    private static LogReadResult buildReadResult(Option exception) {
        FetchDataInfo emptyFetch = new FetchDataInfo(LogOffsetMetadata$.MODULE$.UnknownOffsetMetadata(), (Records)MemoryRecords.EMPTY, FetchDataInfo$.MODULE$.apply$default$3(), FetchDataInfo$.MODULE$.apply$default$4());
        return new LogReadResult(emptyFetch, -1L, -1L, -1L, -1L, -1L, -1, (Option)None$.MODULE$, LogReadResult$.MODULE$.apply$default$9(), LogReadResult$.MODULE$.apply$default$10(), exception);
    }

    /** Wraps a single {@code (topicPartition, value)} pair in a Scala {@link Seq}. */
    private static Seq singleEntrySeq(TopicPartition topicPartition, Object value) {
        Tuple2[] entries = new Tuple2[]{new Tuple2((Object)topicPartition, value)};
        return (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])entries));
    }

    /** Response callback body: stores the value of the first response entry as {@code Some(...)}. */
    private static void captureFirstResponse(Seq responses, ObjectRef fetchResultOpt) {
        Tuple2 firstEntry = (Tuple2)responses.head();
        fetchResultOpt.elem = new Some(firstEntry._2());
    }

    /**
     * Asserts that {@code delayedFetch} completes on {@code tryComplete()} and that the
     * callback captured a {@link FetchPartitionData} carrying {@code expectedError}.
     */
    private static void assertCompletedWithError(DelayedFetch delayedFetch, ObjectRef fetchResultOpt, Errors expectedError) {
        Assert.assertTrue(delayedFetch.tryComplete());
        Assert.assertTrue(delayedFetch.isCompleted());
        Option captured = (Option)fetchResultOpt.elem;
        Assert.assertTrue(captured.isDefined());
        FetchPartitionData fetchResult = (FetchPartitionData)captured.get();
        Assert.assertEquals((Object)expectedError, (Object)fetchResult.error());
    }

    /** Case "no follower value recorded": the fetch must complete. */
    public static final /* synthetic */ void $anonfun$testCompleteWhenFollowerLaggingHW$1(DelayedFetch delayedFetch) {
        Assert.assertTrue(delayedFetch.tryComplete());
        Assert.assertTrue(delayedFetch.isCompleted());
    }

    /** Case "follower value 500": the fetch must not complete. */
    public static final /* synthetic */ void $anonfun$testCompleteWhenFollowerLaggingHW$2(DelayedFetch delayedFetch) {
        Assert.assertFalse(delayedFetch.tryComplete());
        Assert.assertFalse(delayedFetch.isCompleted());
    }

    /** Case "follower value 480": the fetch must not complete. */
    public static final /* synthetic */ void $anonfun$testCompleteWhenFollowerLaggingHW$3(DelayedFetch delayedFetch) {
        Assert.assertFalse(delayedFetch.tryComplete());
        Assert.assertFalse(delayedFetch.isCompleted());
    }

    /** Case "follower value 470": the fetch must complete. */
    public static final /* synthetic */ void $anonfun$testCompleteWhenFollowerLaggingHW$4(DelayedFetch delayedFetch) {
        Assert.assertTrue(delayedFetch.tryComplete());
        Assert.assertTrue(delayedFetch.isCompleted());
    }

    public DelayedFetchTest() {
        this.maxBytes = 1024;
    }
}

