package kafka.server.link;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import kafka.api.ApiVersion;
import kafka.api.ApiVersion$;
import kafka.api.KAFKA_2_6_IV0$;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.DelayedOperations;
import kafka.cluster.IsrChangeListener;
import kafka.cluster.Partition;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.BlockingSend;
import kafka.server.BrokerTopicStats;
import kafka.server.FailedPartitions;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.MetadataCache$;
import kafka.server.OffsetTruncationState;
import kafka.server.ReplicaFetcherThread;
import kafka.server.ReplicaFetcherThreadTest;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.tier.fetcher.TierStateFetcher;
import kafka.utils.MockTime;
import kafka.utils.TestUtils$;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.NewClusterLink;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidClusterLinkException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: ClusterLinkFetcherThreadTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\t}a\u0001\u0002\u0010 \u0001\u0019BQa\u000b\u0001\u0005\u00021Bqa\f\u0001C\u0002\u0013%\u0001\u0007\u0003\u0004:\u0001\u0001\u0006I!\r\u0005\bu\u0001\u0011\r\u0011\"\u0003<\u0011\u0019\u0011\u0005\u0001)A\u0005y!91\t\u0001b\u0001\n\u0013!\u0005BB&\u0001A\u0003%Q\tC\u0005M\u0001\u0001\u0007\t\u0019!C\u0005\u001b\"I\u0011\u000b\u0001a\u0001\u0002\u0004%IA\u0015\u0005\n1\u0002\u0001\r\u0011!Q!\n9C\u0011\"\u0017\u0001A\u0002\u0003\u0007I\u0011\u0002.\t\u0013y\u0003\u0001\u0019!a\u0001\n\u0013y\u0006\"C1\u0001\u0001\u0004\u0005\t\u0015)\u0003\\\u0011\u001d\u0011\u0007A1A\u0005\n\rDaa\u001a\u0001!\u0002\u0013!\u0007b\u00025\u0001\u0005\u0004%I!\u001b\u0005\u0007[\u0002\u0001\u000b\u0011\u00026\t\u000b9\u0004A\u0011K8\t\u000f\u0005e\u0005\u0001\"\u0011\u0002\u001c\"9\u00111\u0017\u0001\u0005\n\u0005U\u0006bBA_\u0001\u0011\u0005\u00131\u0014\u0005\b\u0003\u000f\u0004A\u0011AAN\u0011\u001d\tY\r\u0001C\u0001\u00037Cq!a4\u0001\t\u0013\t\t\u000eC\u0004\u0002b\u0002!\t%a'\t\u000f\u0005M\b\u0001\"\u0011\u0002v\"I\u0011q 
\u0001\u0012\u0002\u0013\u0005!\u0011\u0001\u0005\b\u0005/\u0001A\u0011IAN\u0011\u001d\u0011Y\u0002\u0001C\u0001\u00037\u0013Ad\u00117vgR,'\u000fT5oW\u001a+Go\u00195feRC'/Z1e)\u0016\u001cHO\u0003\u0002!C\u0005!A.\u001b8l\u0015\t\u00113%\u0001\u0004tKJ4XM\u001d\u0006\u0002I\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001(!\tA\u0013&D\u0001\"\u0013\tQ\u0013E\u0001\rSKBd\u0017nY1GKR\u001c\u0007.\u001a:UQJ,\u0017\r\u001a+fgR\fa\u0001P5oSRtD#A\u0017\u0011\u00059\u0002Q\"A\u0010\u0002\u001b\rdWo\u001d;fe2Kgn[%e+\u0005\t\u0004C\u0001\u001a8\u001b\u0005\u0019$B\u0001\u001b6\u0003\u0011)H/\u001b7\u000b\u0003Y\nAA[1wC&\u0011\u0001h\r\u0002\u0005+VKE)\u0001\bdYV\u001cH/\u001a:MS:\\\u0017\n\u001a\u0011\u0002\u001f\rdWo\u001d;fe2Kgn\u001b(b[\u0016,\u0012\u0001\u0010\t\u0003{\u0001k\u0011A\u0010\u0006\u0003\u007fU\nA\u0001\\1oO&\u0011\u0011I\u0010\u0002\u0007'R\u0014\u0018N\\4\u0002!\rdWo\u001d;fe2Kgn\u001b(b[\u0016\u0004\u0013\u0001F2mkN$XM\u001d'j].\u0014\u0015mY6pM\u001al5/F\u0001F!\t1\u0015*D\u0001H\u0015\u0005A\u0015!B:dC2\f\u0017B\u0001&H\u0005\rIe\u000e^\u0001\u0016G2,8\u000f^3s\u0019&t7NQ1dW>4g-T:!\u000351W\r^2iKJ$\u0006N]3bIV\ta\n\u0005\u0002/\u001f&\u0011\u0001k\b\u0002\u0019\u00072,8\u000f^3s\u0019&t7NR3uG\",'\u000f\u00165sK\u0006$\u0017!\u00054fi\u000eDWM\u001d+ie\u0016\fGm\u0018\u0013fcR\u00111K\u0016\t\u0003\rRK!!V$\u0003\tUs\u0017\u000e\u001e\u0005\b/&\t\t\u00111\u0001O\u0003\rAH%M\u0001\u000fM\u0016$8\r[3s)\"\u0014X-\u00193!\u0003-a\u0017m\u001d;EK2\f\u00170T:\u0016\u0003m\u0003\"A\u0012/\n\u0005u;%\u0001\u0002'p]\u001e\fq\u0002\\1ti\u0012+G.Y=Ng~#S-\u001d\u000b\u0003'\u0002Dqa\u0016\u0007\u0002\u0002\u0003\u00071,\u0001\u0007mCN$H)\u001a7bs6\u001b\b%A\tgKR\u001c\u0007NU3ta>t7/Z*ju\u0016,\u0012\u0001\u001a\t\u0003]\u0015L!AZ\u0010\u0003#\u0019+Go\u00195SKN\u0004xN\\:f'&TX-\u0001\ngKR\u001c\u0007NU3ta>t7/Z*ju\u0016\u0004\u0013AE2mkN$XM\u001d'j].lU\r\u001e:jGN,\u0012A\u001b\t\u0003]-L!\u0001\\\u0010\u0003%\rcWo\u001d;fe2Kgn['fiJL7m]\u0001\u0014G2,8\u000f^3s\u0019&t7.T3u
e&\u001c7\u000fI\u0001\u001bGJ,\u0017\r^3SKBd\u0017nY1GKR\u001c\u0007.\u001a:UQJ,\u0017\r\u001a\u000b\u0018aN|\u00181AA\n\u0003;\t9#!\r\u0002N\u0005u\u0013qMAA\u0003\u001b\u0003\"\u0001K9\n\u0005I\f#\u0001\u0006*fa2L7-\u0019$fi\u000eDWM\u001d+ie\u0016\fG\rC\u0003u%\u0001\u0007Q/\u0001\u0003oC6,\u0007C\u0001<~\u001d\t98\u0010\u0005\u0002y\u000f6\t\u0011P\u0003\u0002{K\u00051AH]8pizJ!\u0001`$\u0002\rA\u0013X\rZ3g\u0013\t\teP\u0003\u0002}\u000f\"1\u0011\u0011\u0001\nA\u0002\u0015\u000b\u0011BZ3uG\",'/\u00133\t\u000f\u0005\u0015!\u00031\u0001\u0002\b\u0005a1o\\;sG\u0016\u0014%o\\6feB!\u0011\u0011BA\b\u001b\t\tYAC\u0002\u0002\u000e\r\nqa\u00197vgR,'/\u0003\u0003\u0002\u0012\u0005-!A\u0004\"s_.,'/\u00128e!>Lg\u000e\u001e\u0005\b\u0003+\u0011\u0002\u0019AA\f\u00031\u0011'o\\6fe\u000e{gNZ5h!\rA\u0013\u0011D\u0005\u0004\u00037\t#aC&bM.\f7i\u001c8gS\u001eDq!a\b\u0013\u0001\u0004\t\t#\u0001\tgC&dW\r\u001a)beRLG/[8ogB\u0019\u0001&a\t\n\u0007\u0005\u0015\u0012E\u0001\tGC&dW\r\u001a)beRLG/[8og\"9\u0011\u0011\u0006\nA\u0002\u0005-\u0012A\u0003:fa2L7-Y'heB\u0019\u0001&!\f\n\u0007\u0005=\u0012E\u0001\bSKBd\u0017nY1NC:\fw-\u001a:\t\u000f\u0005M\"\u00031\u0001\u00026\u00059Q.\u001a;sS\u000e\u001c\b\u0003BA\u001c\u0003\u0013j!!!\u000f\u000b\t\u0005M\u00121\b\u0006\u0005\u0003{\ty$\u0001\u0004d_6lwN\u001c\u0006\u0004I\u0005\u0005#\u0002BA\"\u0003\u000b\na!\u00199bG\",'BAA$\u0003\ry'oZ\u0005\u0005\u0003\u0017\nIDA\u0004NKR\u0014\u0018nY:\t\u000f\u0005=#\u00031\u0001\u0002R\u0005!A/[7f!\u0011\t\u0019&!\u0017\u000e\u0005\u0005U#\u0002BA,\u0003w\tQ!\u001e;jYNLA!a\u0017\u0002V\t!A+[7f\u0011\u001d\tyF\u0005a\u0001\u0003C\nQ!];pi\u0006\u00042\u0001KA2\u0013\r\t)'\t\u0002\r%\u0016\u0004H.[2b#V|G/\u0019\u0005\b\u0003S\u0012\u0002\u0019AA6\u0003A!\u0018.\u001a:Ti\u0006$XMR3uG\",'\u000fE\u0003G\u0003[\n\t(C\u0002\u0002p\u001d\u0013aa\u00149uS>t\u0007\u0003BA:\u0003{j!!!\u001e\u000b\t\u0005]\u0014\u0011P\u0001\bM\u0016$8\r[3s\u0015\r\tYhI\u0001\u0005i&,'/\u0003\u0003\u0002��\u0005U$\u0001\u0005+jKJ
\u001cF/\u0019;f\r\u0016$8\r[3s\u0011%\t\u0019I\u0005I\u0001\u0002\u0004\t))\u0001\u000emK\u0006$WM]#oIB|\u0017N\u001c;CY>\u001c7.\u001b8h'\u0016tG\rE\u0003G\u0003[\n9\tE\u0002)\u0003\u0013K1!a#\"\u00051\u0011En\\2lS:<7+\u001a8e\u0011%\tyI\u0005I\u0001\u0002\u0004\t\t*A\u0007m_\u001e\u001cuN\u001c;fqR|\u0005\u000f\u001e\t\u0006\r\u00065\u00141\u0013\t\u0005\u0003'\n)*\u0003\u0003\u0002\u0018\u0006U#A\u0003'pO\u000e{g\u000e^3yi\u000691\r\\3b]V\u0004H#A*)\u0007M\ty\n\u0005\u0003\u0002\"\u0006=VBAAR\u0015\u0011\t)+a*\u0002\u0007\u0005\u0004\u0018N\u0003\u0003\u0002*\u0006-\u0016a\u00026va&$XM\u001d\u0006\u0005\u0003[\u000b)%A\u0003kk:LG/\u0003\u0003\u00022\u0006\r&!C!gi\u0016\u0014X)Y2i\u0003E\u0019G.^:uKJd\u0015N\\6D_:4\u0017nZ\u000b\u0003\u0003o\u00032ALA]\u0013\r\tYl\b\u0002\u0012\u00072,8\u000f^3s\u0019&t7nQ8oM&<\u0017aM:i_VdG-V:f\u0019\u0016\fG-\u001a:F]\u0012|eMZ:fi&3\u0017J\u001c;fe\n\u0013xn[3s-\u0016\u00148/[8o\u0005\u0016dwn\u001e\u001a1Q\r)\u0012\u0011\u0019\t\u0005\u0003C\u000b\u0019-\u0003\u0003\u0002F\u0006\r&\u0001\u0002+fgR\fQ\u0004^3tiN{WO]2f\u001f\u001a47/\u001a;t!\u0016tG-\u001b8h'R\fG/\u001a\u0015\u0004-\u0005\u0005\u0017A\n;fgR\u001cv.\u001e:dK>3gm]3ugB+g\u000eZ5oON#\u0018\r^3XSRD\u0017J\u001993m!\u001aq#!1\u0002?Y,'/\u001b4z'>,(oY3PM\u001a\u001cX\r^:QK:$\u0017N\\4Ti\u0006$X\rF\u0002T\u0003'Dq!!6\u0019\u0001\u0004\t9.A\u0002jEB\u0004B!!7\u0002^6\u0011\u00111\u001c\u0006\u0004\u0003K\u001b\u0013\u0002BAp\u00037\u0014!\"\u00119j-\u0016\u00148/[8o\u0003\u0001\"Xm\u001d;G_2dwn^3s\u0013N$\u0006N]8ui2,Gm\u00148M_^$\u0015n]6)\u0007e\t\t\rK\u0004\u001a\u0003O\fi/a<\u0011\t\u0005\u0005\u0016\u0011^\u0005\u0005\u0003W\f\u0019K\u0001\u0005ESN\f'\r\\3e\u0003\u00151\u0018\r\\;fC\t\t\t0\u0001\u000fESN\\\u0007\u0005\u001e5s_R$H.\u001a\u0011jg\u0002rw\u000e\u001e\u0011baBd\u0017.\u001a3\u00023\u0015D\b/Z2u\u001b\u0006\u00148NU3qY&\u001c\u0017\r\u00165s_R$H.\u001a\u000b\u0006'\u0006]\u00181 
\u0005\b\u0003sT\u0002\u0019AA\u0016\u00039\u0011X\r\u001d7jG\u0006l\u0015M\\1hKJD\u0001\"!@\u001b!\u0003\u0005\r!R\u0001\u0006i&lWm]\u0001$Kb\u0004Xm\u0019;NCJ\\'+\u001a9mS\u000e\fG\u000b\u001b:piRdW\r\n3fM\u0006,H\u000e\u001e\u00133+\t\u0011\u0019AK\u0002F\u0005\u000bY#Aa\u0002\u0011\t\t%!1C\u0007\u0003\u0005\u0017QAA!\u0004\u0003\u0010\u0005IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0005#9\u0015AC1o]>$\u0018\r^5p]&!!Q\u0003B\u0006\u0005E)hn\u00195fG.,GMV1sS\u0006t7-Z\u00019g\"|W\u000f\u001c3O_R4U\r^2i\u0019\u0016\fG-\u001a:Fa>\u001c\u0007n\u00148GSJ\u001cHOR3uG\"<\u0016\u000e\u001e5UeVt7-\u0019;f\u001f:4U\r^2iQ\ra\u0012\u0011Y\u0001\u0019i\u0016\u001cHOR3uG\",'\u000f\u00165sK\u0006$')Y2l_\u001a4\u0007fA\u000f\u0002B\u0002")
/* loaded from: input_file:kafka/server/link/ClusterLinkFetcherThreadTest.class */
public class ClusterLinkFetcherThreadTest extends ReplicaFetcherThreadTest {
    // Fetcher thread slot. Written through the ..._$eq setter below and shut down in cleanup() when non-null.
    private ClusterLinkFetcherThread kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread;
    // Last delay (ms) passed to delayPartitions() on the stub thread built by createReplicaFetcherThread;
    // compared against clusterLinkBackoffMs in testFetcherThreadBackoff.
    private long kafka$server$link$ClusterLinkFetcherThreadTest$$lastDelayMs;
    // Random link id, generated once per test instance.
    private final UUID kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkId = UUID.randomUUID();
    // Link name used for the metrics and the link metadata built in this class.
    private final String kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName = "testCluster";
    // Value written into the link config under KafkaConfig's ReplicaFetchBackoffMsProp key (see clusterLinkConfig()).
    private final int clusterLinkBackoffMs = 5000;
    // NOTE(review): the meaning of the two constructor arguments is not visible in this file - confirm against FetchResponseSize.
    private final FetchResponseSize kafka$server$link$ClusterLinkFetcherThreadTest$$fetchResponseSize = new FetchResponseSize(100, 1000);
    // Destination-mode link metrics with a null ClusterLinkManager and a fresh Metrics registry;
    // started in the constructor and shut down in cleanup().
    private final ClusterLinkMetrics kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics = new ClusterLinkMetrics(kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName(), kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkId(), LinkMode$Destination$.MODULE$, (ClusterLinkManager) null, None$.MODULE$, new Metrics(), None$.MODULE$);

    /**
     * Accessor for the cluster link id of this test instance.
     *
     * @return the UUID stored in the {@code clusterLinkId} field
     */
    public UUID kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkId() {
        return kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkId;
    }

    /**
     * Accessor for the cluster link name used by this test.
     *
     * @return the value of the {@code clusterLinkName} field ({@code "testCluster"})
     */
    public String kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName() {
        return kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName;
    }

    /**
     * Accessor for the backoff value configured on the cluster link in this test.
     *
     * @return the value of the {@code clusterLinkBackoffMs} field
     */
    private int clusterLinkBackoffMs() {
        return clusterLinkBackoffMs;
    }

    /**
     * Accessor for the fetcher thread slot.
     *
     * @return the current fetcher thread, or {@code null} if none has been stored yet
     */
    public ClusterLinkFetcherThread kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread() {
        return kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread;
    }

    /**
     * Mutator for the fetcher thread slot.
     *
     * @param clusterLinkFetcherThread the thread to remember; {@code cleanup()} shuts it down if non-null
     */
    public void kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread_$eq(ClusterLinkFetcherThread clusterLinkFetcherThread) {
        kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread = clusterLinkFetcherThread;
    }

    /**
     * Accessor for the most recently recorded partition delay.
     *
     * @return the value last stored through the {@code lastDelayMs_$eq} mutator (0 until one is stored)
     */
    private long kafka$server$link$ClusterLinkFetcherThreadTest$$lastDelayMs() {
        return kafka$server$link$ClusterLinkFetcherThreadTest$$lastDelayMs;
    }

    /**
     * Mutator for the recorded partition delay.
     *
     * @param j the delay value to remember
     */
    public void kafka$server$link$ClusterLinkFetcherThreadTest$$lastDelayMs_$eq(long j) {
        kafka$server$link$ClusterLinkFetcherThreadTest$$lastDelayMs = j;
    }

    /**
     * Accessor for the fixed fetch response size used by the mocks in this test.
     *
     * @return the value of the {@code fetchResponseSize} field
     */
    public FetchResponseSize kafka$server$link$ClusterLinkFetcherThreadTest$$fetchResponseSize() {
        return kafka$server$link$ClusterLinkFetcherThreadTest$$fetchResponseSize;
    }

    /**
     * Accessor for the link metrics owned by this test instance.
     *
     * @return the value of the {@code clusterLinkMetrics} field
     */
    public ClusterLinkMetrics kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics() {
        return kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics;
    }

    /**
     * Factory override from ReplicaFetcherThreadTest: returns an anonymous {@link ClusterLinkFetcherThread}
     * wired to a mocked {@link ClusterLinkFetcherManager}.
     *
     * <p>In this decompiled body only {@code str}, {@code kafkaConfig}, {@code failedPartitions},
     * {@code replicaManager}, {@code replicaQuota} and {@code option2} are referenced; {@code i},
     * {@code brokerEndPoint}, {@code metrics}, {@code time}, {@code option} and {@code option3} are unused.
     *
     * @param option2 optional BlockingSend; when empty a plain EasyMock mock is created in the initializer
     * @return the stub fetcher thread
     */
    @Override // kafka.server.ReplicaFetcherThreadTest
    public ReplicaFetcherThread createReplicaFetcherThread(final String str, int i, BrokerEndPoint brokerEndPoint, final KafkaConfig kafkaConfig, final FailedPartitions failedPartitions, final ReplicaManager replicaManager, Metrics metrics, Time time, final ReplicaQuota replicaQuota, Option<TierStateFetcher> option, final Option<BlockingSend> option2, Option<LogContext> option3) {
        final ClusterLinkFetcherManager clusterLinkFetcherManager = (ClusterLinkFetcherManager) EasyMock.mock(ClusterLinkFetcherManager.class);
        // partition(any) always answers None.
        EasyMock.expect(clusterLinkFetcherManager.partition((TopicPartition) EasyMock.anyObject(TopicPartition.class))).andReturn(None$.MODULE$).anyTimes();
        // Void call recorded on the mock; the expect(BoxedUnit.UNIT) on the next line configures that last call (any number of times).
        clusterLinkFetcherManager.updatePartitionFetchState((TopicPartition) EasyMock.anyObject(TopicPartition.class), (FetchState) EasyMock.anyObject(FetchState.class));
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        // onPartitionLinkFailure(...) always answers false.
        EasyMock.expect(BoxesRunTime.boxToBoolean(clusterLinkFetcherManager.onPartitionLinkFailure((TopicPartition) EasyMock.anyObject(TopicPartition.class), (MirrorFailureType) EasyMock.anyObject(MirrorFailureType.class), EasyMock.anyString(), EasyMock.anyBoolean()))).andReturn(BoxesRunTime.boxToBoolean(false)).anyTimes();
        EasyMock.replay(new Object[]{clusterLinkFetcherManager});
        return new ClusterLinkFetcherThread(this, str, kafkaConfig, clusterLinkFetcherManager, failedPartitions, replicaManager, replicaQuota, option2) { // from class: kafka.server.link.ClusterLinkFetcherThreadTest$$anon$1
            private final /* synthetic */ ClusterLinkFetcherThreadTest $outer;

            // Intentionally a no-op in this stub.
            public void clearPartitionLinkFailure(TopicPartition topicPartition, long j) {
            }

            // Every partition is reported as ready.
            public boolean isReadyForFetch(TopicPartition topicPartition) {
                return true;
            }

            // Delegates to the superclass, then records the delay on the enclosing test so
            // testFetcherThreadBackoff can assert on it.
            public void delayPartitions(Iterable<TopicPartition> iterable, long j) {
                super/*kafka.server.AbstractFetcherThread*/.delayPartitions(iterable, j);
                this.$outer.kafka$server$link$ClusterLinkFetcherThreadTest$$lastDelayMs_$eq(j);
            }

            // NOTE(review): this initializer looks like a decompiler reconstruction. `this` below appears
            // to stand for the enclosing ClusterLinkFetcherThreadTest (the methods called on it are declared
            // on the test class), and the locals are presumably the super-constructor arguments of the
            // original Scala source - confirm against ClusterLinkFetcherThreadTest.scala before editing.
            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                ClusterLinkConfig kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkConfig = this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkConfig();
                ClusterLinkMetadata clusterLinkMetadata = new ClusterLinkMetadata(kafkaConfig, this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName(), this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkId(), LinkMode$Destination$.MODULE$, 100L, 60000L);
                BrokerEndPoint brokerEndPoint2 = this.brokerEndPoint();
                ClusterLinkMetrics kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics = this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics();
                SystemTime systemTime = new SystemTime();
                ClusterLinkFetcherThreadTest$$anon$1$$anonfun$$lessinit$greater$1 clusterLinkFetcherThreadTest$$anon$1$$anonfun$$lessinit$greater$1 = new ClusterLinkFetcherThreadTest$$anon$1$$anonfun$$lessinit$greater$1(this);
                ClusterLinkNetworkClient clusterLinkNetworkClient = (ClusterLinkNetworkClient) EasyMock.mock(ClusterLinkNetworkClient.class);
                // Use the caller-supplied BlockingSend when present, otherwise a fresh mock.
                BlockingSend blockingSend = option2.isDefined() ? (BlockingSend) option2.get() : (BlockingSend) EasyMock.mock(BlockingSend.class);
                None$ none$ = None$.MODULE$;
                None$ none$2 = None$.MODULE$;
            }
        };
    }

    /**
     * Per-test teardown: shuts down the fetcher thread if one was stored, then the link
     * metrics, and finally runs the superclass cleanup.
     */
    @AfterEach
    @Override // kafka.server.ReplicaFetcherThreadTest
    public void cleanup() {
        final ClusterLinkFetcherThread activeThread = kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread();
        if (activeThread != null) {
            activeThread.shutdown();
        }

        final ClusterLinkMetrics linkMetrics = kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics();
        linkMetrics.shutdown();

        super.cleanup();
    }

    /**
     * Builds the cluster link configuration used by the stub fetcher threads.
     *
     * <p>Two properties are set: {@code bootstrap.servers} as {@code host:port} of
     * {@code brokerEndPoint()}, and the replica fetch backoff key from {@code KafkaConfig}
     * with the value of {@code clusterLinkBackoffMs()}.
     *
     * @return the config created by {@code ClusterLinkConfig$.create} from those properties
     */
    public ClusterLinkConfig kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkConfig() {
        final Properties linkProps = new Properties();

        final String bootstrapServers = brokerEndPoint().host() + ":" + brokerEndPoint().port();
        linkProps.put("bootstrap.servers", bootstrapServers);

        final String backoffKey = KafkaConfig$.MODULE$.ReplicaFetchBackoffMsProp();
        linkProps.put(backoffKey, String.valueOf(clusterLinkBackoffMs()));

        return ClusterLinkConfig$.MODULE$.create(linkProps);
    }

    /**
     * Replaces the inherited test of the same name: with the inter-broker protocol version
     * set to {@code "0.11.0"}, creating a cluster link through {@link ClusterLinkAdminManager}
     * must fail with {@link InvalidClusterLinkException}.
     */
    @Test
    @Override // kafka.server.ReplicaFetcherThreadTest
    public void shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20() {
        final TestUtils$ kafkaTestUtils = TestUtils$.MODULE$;
        // Scala default arguments surface as explicit $default$N() calls after decompilation.
        final Properties brokerProps = kafkaTestUtils.createBrokerConfig(
                1,
                "localhost:1234",
                kafkaTestUtils.createBrokerConfig$default$3(),
                kafkaTestUtils.createBrokerConfig$default$4(),
                kafkaTestUtils.createBrokerConfig$default$5(),
                kafkaTestUtils.createBrokerConfig$default$6(),
                kafkaTestUtils.createBrokerConfig$default$7(),
                kafkaTestUtils.createBrokerConfig$default$8(),
                kafkaTestUtils.createBrokerConfig$default$9(),
                kafkaTestUtils.createBrokerConfig$default$10(),
                kafkaTestUtils.createBrokerConfig$default$11(),
                kafkaTestUtils.createBrokerConfig$default$12(),
                kafkaTestUtils.createBrokerConfig$default$13(),
                kafkaTestUtils.createBrokerConfig$default$14(),
                kafkaTestUtils.createBrokerConfig$default$15(),
                kafkaTestUtils.createBrokerConfig$default$16(),
                kafkaTestUtils.createBrokerConfig$default$17(),
                kafkaTestUtils.createBrokerConfig$default$18(),
                kafkaTestUtils.createBrokerConfig$default$19(),
                kafkaTestUtils.createBrokerConfig$default$20());
        brokerProps.put(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), "0.11.0");

        final KafkaConfig oldIbpConfig = KafkaConfig$.MODULE$.fromProps(brokerProps);
        final ClusterLinkAdminManager adminManager = new ClusterLinkAdminManager(
                oldIbpConfig,
                "clusterId",
                (KafkaZkClient) null,
                ClusterLinkTestUtils$.MODULE$.createClusterLinkManager(),
                new Metrics(),
                new MockTime());

        final Map emptyLinkConfigs = (Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().empty()).asJava();
        final NewClusterLink linkRequest = new NewClusterLink("test-link", "clusterId", emptyLinkConfigs);

        Assertions.assertThrows(InvalidClusterLinkException.class, () -> {
            adminManager.createClusterLink(linkRequest, None$.MODULE$, new ListenerName("EXTERNAL"), false, false, 1000, 1).get();
        });
    }

    /**
     * Runs the source-offsets-pending scenario with the latest inter-broker protocol version.
     */
    @Test
    public void testSourceOffsetsPendingState() {
        final ApiVersion latestIbp = ApiVersion$.MODULE$.latestVersion();
        verifySourceOffsetsPendingState(latestIbp);
    }

    /**
     * Runs the source-offsets-pending scenario with inter-broker protocol version KAFKA_2_6_IV0.
     */
    @Test
    public void testSourceOffsetsPendingStateWithIbp26() {
        final ApiVersion ibp26 = KAFKA_2_6_IV0$.MODULE$;
        verifySourceOffsetsPendingState(ibp26);
    }

    /**
     * Shared scenario for the "source offsets pending" tests.
     *
     * <p>Builds a real {@link Partition} on nice mocks, drives a fetcher manager
     * ({@code ClusterLinkFetcherThreadTest$$anon$2}) through metadata initialisation and a
     * metadata update, then checks the partition's private {@code needsLinkedLeaderOffsets}
     * flag (read reflectively by {@code offsetsPending$1}):
     * <ul>
     *   <li>when {@code ApiVersion.isTruncationOnFetchSupported(apiVersion)} is true the flag
     *       must already be cleared once the fetcher thread exists;</li>
     *   <li>otherwise the flag must stay set until
     *       {@code updateFetchOffsetAndMaybeMarkTruncationComplete} is called with a completed
     *       {@link OffsetTruncationState} for the partition.</li>
     * </ul>
     *
     * @param apiVersion inter-broker protocol version written into the broker config
     */
    private void verifySourceOffsetsPendingState(ApiVersion apiVersion) {
        MockTime mockTime = new MockTime();
        TopicPartition topicPartition = new TopicPartition("topic", 0);

        LogManager logManager = (LogManager) EasyMock.createNiceMock(LogManager.class);
        Partition partition = new Partition(
                topicPartition,
                10000L,
                ApiVersion$.MODULE$.latestVersion(),
                0,
                mockTime,
                (IsrChangeListener) EasyMock.createNiceMock(IsrChangeListener.class),
                (DelayedOperations) EasyMock.createNiceMock(DelayedOperations.class),
                MetadataCache$.MODULE$.zkMetadataCache(0, MetadataCache$.MODULE$.zkMetadataCache$default$2()),
                logManager,
                None$.MODULE$,
                None$.MODULE$,
                None$.MODULE$,
                TestUtils$.MODULE$.createAlterIsrManager());
        AbstractLog abstractLog = (AbstractLog) EasyMock.createNiceMock(AbstractLog.class);
        partition.log_$eq(new Some(abstractLog));

        // Scala default arguments surface as explicit $default$N() calls after decompilation.
        Properties createBrokerConfig = TestUtils$.MODULE$.createBrokerConfig(
                1,
                "localhost:1234",
                TestUtils$.MODULE$.createBrokerConfig$default$3(),
                TestUtils$.MODULE$.createBrokerConfig$default$4(),
                TestUtils$.MODULE$.createBrokerConfig$default$5(),
                TestUtils$.MODULE$.createBrokerConfig$default$6(),
                TestUtils$.MODULE$.createBrokerConfig$default$7(),
                TestUtils$.MODULE$.createBrokerConfig$default$8(),
                TestUtils$.MODULE$.createBrokerConfig$default$9(),
                TestUtils$.MODULE$.createBrokerConfig$default$10(),
                TestUtils$.MODULE$.createBrokerConfig$default$11(),
                TestUtils$.MODULE$.createBrokerConfig$default$12(),
                TestUtils$.MODULE$.createBrokerConfig$default$13(),
                TestUtils$.MODULE$.createBrokerConfig$default$14(),
                TestUtils$.MODULE$.createBrokerConfig$default$15(),
                TestUtils$.MODULE$.createBrokerConfig$default$16(),
                TestUtils$.MODULE$.createBrokerConfig$default$17(),
                TestUtils$.MODULE$.createBrokerConfig$default$18(),
                TestUtils$.MODULE$.createBrokerConfig$default$19(),
                TestUtils$.MODULE$.createBrokerConfig$default$20());
        createBrokerConfig.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), apiVersion.shortVersion());
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(createBrokerConfig);

        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createNiceMock(ReplicaManager.class);
        EasyMock.expect(replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class)).anyTimes();
        EasyMock.expect(replicaManager.localLogOrException(topicPartition)).andReturn(abstractLog).anyTimes();
        EasyMock.expect(replicaManager.onlinePartition(topicPartition)).andReturn(new Some(partition)).anyTimes();

        BlockingSend blockingSend = (BlockingSend) EasyMock.createNiceMock(BlockingSend.class);
        // close() is void, so its expectation is configured through expectLastCall().
        blockingSend.close();
        EasyMock.expectLastCall().once();

        ClusterLinkDestConnectionManager clusterLinkDestConnectionManager = (ClusterLinkDestConnectionManager) EasyMock.createNiceMock(ClusterLinkDestConnectionManager.class);
        EasyMock.expect(clusterLinkDestConnectionManager.reverseConnectionProvider((NetworkClient) EasyMock.anyObject(), (Option) EasyMock.anyObject(), (String) EasyMock.anyObject())).andReturn(None$.MODULE$).anyTimes();

        ClusterLinkManager clusterLinkManager = (ClusterLinkManager) EasyMock.createNiceMock(ClusterLinkManager.class);
        EasyMock.expect(clusterLinkManager.fetchResponseSize((ClusterLinkConfig) EasyMock.anyObject())).andReturn(kafka$server$link$ClusterLinkFetcherThreadTest$$fetchResponseSize()).anyTimes();

        // Every mock that had expectations recorded must be switched to replay state.
        // clusterLinkDestConnectionManager was previously missing here, so it stayed in record
        // state and its stubbed None return for reverseConnectionProvider never took effect.
        EasyMock.replay(new Object[]{replicaManager, logManager, clusterLinkManager, clusterLinkDestConnectionManager, abstractLog, blockingSend});

        ClusterLinkFetcherThreadTest$$anon$2 clusterLinkFetcherThreadTest$$anon$2 = new ClusterLinkFetcherThreadTest$$anon$2(this, clusterLinkManager, clusterLinkDestConnectionManager, fromProps, replicaManager, mockTime, blockingSend);
        clusterLinkFetcherThreadTest$$anon$2.initializeMetadata();
        TestUtils.setFieldValue(partition, "leaderEpoch", BoxesRunTime.boxToInteger(2));
        clusterLinkFetcherThreadTest$$anon$2.addLinkedFetcherForPartitions(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Partition[]{partition})));
        // No fetcher thread may exist before source metadata has been supplied.
        Assertions.assertNull(kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread(), "Fetcher thread created without metadata");

        clusterLinkFetcherThreadTest$$anon$2.currentMetadata().update(1, RequestTestUtils.metadataUpdateWith("cluster", 1, Collections.singletonMap("topic", Errors.NONE), Collections.singletonMap("topic", Predef$.MODULE$.int2Integer(1)), topicPartition2 -> {
            return Predef$.MODULE$.int2Integer(1);
        }, MetadataResponse.PartitionMetadata::new, ApiKeys.METADATA.latestVersion(), Collections.emptyMap()), false, mockTime.milliseconds());
        clusterLinkFetcherThreadTest$$anon$2.onNewMetadata(clusterLinkFetcherThreadTest$$anon$2.currentMetadata().fetch());
        Assertions.assertNotNull(kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread(), "Fetcher thread not created");

        if (ApiVersion$.MODULE$.isTruncationOnFetchSupported(apiVersion)) {
            // With truncation-on-fetch the pending flag is expected to be cleared already.
            Assertions.assertFalse(offsetsPending$1(partition), "State not reset for IBP " + apiVersion);
            return;
        }

        Assertions.assertTrue(offsetsPending$1(partition), "State reset before fetching offsets");
        // An empty update carries no offsets for the partition.
        kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetchOffsetAndMaybeMarkTruncationComplete(Predef$.MODULE$.Map().empty());
        Assertions.assertTrue(offsetsPending$1(partition), "State reset before source offsets available");
        // Truncation state with the boolean flag false: still pending.
        kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetchOffsetAndMaybeMarkTruncationComplete(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition), new OffsetTruncationState(10L, false))})));
        Assertions.assertTrue(offsetsPending$1(partition), "State reset before truncation");
        // Truncation state with the boolean flag true: the pending flag must now be cleared.
        kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetchOffsetAndMaybeMarkTruncationComplete(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition), new OffsetTruncationState(10L, true))})));
        Assertions.assertFalse(offsetsPending$1(partition), "State not reset after truncation");
    }

    /**
     * Inherited low-disk throttling test, kept only as a delegating override so it can be
     * disabled for this subclass (see the {@code @Disabled} reason).
     */
    @Test
    @Disabled("Disk throttle is not applied")
    @Override // kafka.server.ReplicaFetcherThreadTest
    public void testFollowerIsThrottledOnLowDisk() {
        super.testFollowerIsThrottledOnLowDisk();
    }

    /**
     * Records, on the given mock, that {@code markClusterLinkReplicaThrottle()} may be called
     * any number of times.
     *
     * @param replicaManager EasyMock mock (in record state) to receive the expectation
     * @param i              not used by this override
     */
    @Override // kafka.server.ReplicaFetcherThreadTest
    public void expectMarkReplicaThrottle(ReplicaManager replicaManager, int i) {
        // Void method: invoke it on the mock, then configure that last call.
        replicaManager.markClusterLinkReplicaThrottle();
        EasyMock.expectLastCall().anyTimes();
    }

    /**
     * Synthetic default-argument accessor; by the Scala naming scheme this is presumably the
     * default for the second parameter of {@code expectMarkReplicaThrottle}.
     *
     * @return always 1
     */
    @Override // kafka.server.ReplicaFetcherThreadTest
    public int expectMarkReplicaThrottle$default$2() {
        final int secondArgDefault = 1;
        return secondArgDefault;
    }

    /**
     * Overrides the inherited test to call {@code verifyFetchLeaderEpochOnFirstFetch} with the
     * latest inter-broker protocol version and a second argument of 1.
     */
    @Test
    @Override // kafka.server.ReplicaFetcherThreadTest
    public void shouldNotFetchLeaderEpochOnFirstFetchWithTruncateOnFetch() {
        final ApiVersion latestIbp = ApiVersion$.MODULE$.latestVersion();
        verifyFetchLeaderEpochOnFirstFetch(latestIbp, 1);
    }

    /**
     * Runs the inherited {@code shouldPollIndefinitelyIfLeaderReturnsAnyException} scenario and
     * then checks that the last delay recorded by the stub fetcher thread equals the
     * configured cluster link backoff.
     */
    @Test
    public void testFetcherThreadBackoff() {
        super.shouldPollIndefinitelyIfLeaderReturnsAnyException();

        final long expectedDelayMs = clusterLinkBackoffMs();
        final long recordedDelayMs = kafka$server$link$ClusterLinkFetcherThreadTest$$lastDelayMs();
        Assertions.assertEquals(expectedDelayMs, recordedDelayMs);
    }

    /**
     * Reads the private {@code needsLinkedLeaderOffsets} field of a {@link Partition} reflectively.
     *
     * @param partition partition to inspect
     * @return the unboxed value of that field
     */
    private static final boolean offsetsPending$1(Partition partition) {
        final Object pendingFlag = TestUtils.fieldValue(partition, Partition.class, "needsLinkedLeaderOffsets");
        return BoxesRunTime.unboxToBoolean(pendingFlag);
    }

    public ClusterLinkFetcherThreadTest() {
        kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics().startup();
    }
}
