/*
 * Decompiled with CFR 0.152.
 */
package kafka.link;

import java.io.Serializable;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import kafka.link.AbstractClusterLinkIntegrationTest;
import kafka.link.ClusterLinkTestHarness;
import kafka.link.FailureType$;
import kafka.log.AbstractLog;
import kafka.server.AbstractFetcherManager;
import kafka.server.FetchConnectionsMode;
import kafka.server.FetcherPool;
import kafka.server.FetcherTag;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig$;
import kafka.server.link.ActiveTaskState$;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkDestClientManager;
import kafka.server.link.ClusterLinkFetcher;
import kafka.server.link.ClusterLinkFetcherManager;
import kafka.server.link.ClusterLinkMetrics$;
import kafka.server.link.ClusterLinkRepairMirrors;
import kafka.server.link.FetcherThreadPoolMode;
import kafka.server.link.MirrorTopicConfigSyncRules$;
import kafka.server.link.TaskErrorCode;
import kafka.server.link.TaskState;
import kafka.server.link.TopicLinkMirror$;
import kafka.server.link.TopicLinkState;
import kafka.utils.Logging;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterMirrorOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.MirrorTopicDescription;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import scala.;
import scala.$less$colon$less$;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Product;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.IterableOnce;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.SeqOps;
import scala.collection.Set;
import scala.collection.StrictOptimizedIterableOps;
import scala.collection.StringOps$;
import scala.collection.immutable.;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.HashMap$;
import scala.collection.mutable.Map$;
import scala.jdk.CollectionConverters$;
import scala.math.Numeric;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction1;

@Tag(value="integration")
@ScalaSignature(bytes="\u0006\u0005\u0005\u0015h\u0001B\n\u0015\u0001eAQA\b\u0001\u0005\u0002}AQ!\t\u0001\u0005\u0002\tBQA\u0016\u0001\u0005\u0002]CQ!\u0018\u0001\u0005\u0002yCQ\u0001\u001a\u0001\u0005\u0002\u0015DQa\u001b\u0001\u0005\u00021DQA\u001d\u0001\u0005\u0002MDQ!\u001f\u0001\u0005\u0002iDq!!\u0001\u0001\t\u0003\t\u0019\u0001C\u0004\u0002\u0010\u0001!\t!!\u0005\t\u000f\u0005u\u0001\u0001\"\u0001\u0002 !9\u00111\u0006\u0001\u0005\u0002\u00055\u0002bBA\u001d\u0001\u0011\u0005\u00111\b\u0005\b\u0003\u000f\u0002A\u0011AA%\u0011\u001d\ty\u0005\u0001C\u0001\u0003#Bq!!\u0018\u0001\t\u0013\ty\u0006C\u0005\u0002\"\u0002\t\n\u0011\"\u0003\u0002$\"9\u0011\u0011\u0018\u0001\u0005\n\u0005m&\u0001L\"mkN$XM\u001d'j].$\u0015\r^1QY\u0006tW-T5se>\u0014\u0018N\\4J]R,wM]1uS>tG+Z:u\u0015\t)b#\u0001\u0003mS:\\'\"A\f\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001A\u0007\t\u00037qi\u0011\u0001F\u0005\u0003;Q\u0011!%\u00112tiJ\f7\r^\"mkN$XM\u001d'j].Le\u000e^3he\u0006$\u0018n\u001c8UKN$\u0018A\u0002\u001fj]&$h\bF\u0001!!\tY\u0002!\u0001\u000buKN$X*\u001b:s_JtUm\u001e*fG>\u0014Hm\u001d\u000b\u0004G%2\u0004C\u0001\u0013(\u001b\u0005)#\"\u0001\u0014\u0002\u000bM\u001c\u0017\r\\1\n\u0005!*#\u0001B+oSRDQA\u000b\u0002A\u0002-\na!];peVl\u0007C\u0001\u00174\u001d\ti\u0013\u0007\u0005\u0002/K5\tqF\u0003\u000211\u00051AH]8pizJ!AM\u0013\u0002\rA\u0013X\rZ3g\u0013\t!TG\u0001\u0004TiJLgn\u001a\u0006\u0003e\u0015BQa\u000e\u0002A\u0002a\n1bY8pe\u0012Lg.\u0019;peB\u0011A%O\u0005\u0003u\u0015\u0012qAQ8pY\u0016\fg\u000e\u000b\u0003\u0003y!K\u0005CA\u001fG\u001b\u0005q$BA 
A\u0003\u0019\u0001\u0018M]1ng*\u0011\u0011IQ\u0001\bUV\u0004\u0018\u000e^3s\u0015\t\u0019E)A\u0003kk:LGOC\u0001F\u0003\ry'oZ\u0005\u0003\u000fz\u0012\u0011\u0003U1sC6,G/\u001a:ju\u0016$G+Z:u\u0003\u0011q\u0017-\\3\"\u0003)\u000b\u0001f\u001f3jgBd\u0017-\u001f(b[\u0016lh&];peVlWh\u001f\u0019~]\r|wN\u001d3j]\u0006$xN]\u001f|cuDCA\u0001'S'B\u0011Q\nU\u0007\u0002\u001d*\u0011qJP\u0001\taJ|g/\u001b3fe&\u0011\u0011K\u0014\u0002\r\u001b\u0016$\bn\u001c3T_V\u00148-Z\u0001\u0006m\u0006dW/\u001a\u0017\u0002)\u0006\nQ+A\bbY2\u001cu.\u001c2j]\u0006$\u0018n\u001c8t\u0003e!Xm\u001d;NSJ\u0014xN]#ySN$\u0018N\\4SK\u000e|'\u000fZ:\u0015\u0007\rB\u0016\fC\u0003+\u0007\u0001\u00071\u0006C\u00038\u0007\u0001\u0007\u0001\b\u000b\u0003\u0004y!K\u0005\u0006B\u0002M%rc\u0013\u0001V\u0001\u0013i\u0016\u001cHOR3uG\",'\u000f\u00165sK\u0006$7\u000fF\u0002$?\u0002DQA\u000b\u0003A\u0002-BQa\u000e\u0003A\u0002aBC\u0001\u0002\u001fI\u0013\"\"A\u0001\u0014*dY\u0005!\u0016!\u0007;fgRl\u0015N\u001d:peN#\u0018M\u001d;PM\u001a\u001cX\r^*qK\u000e$2a\t4h\u0011\u0015QS\u00011\u0001,\u0011\u00159T\u00011\u00019Q\u0011)A\bS%)\t\u0015a%K\u001b\u0017\u0002)\u0006AB/Z:u\u001b&\u0014(o\u001c:Ti\u0006\u0014H\u000fV5nKN$\u0018-\u001c9\u0015\u0007\rjg\u000eC\u0003+\r\u0001\u00071\u0006C\u00038\r\u0001\u0007\u0001\b\u000b\u0003\u0007y!K\u0005\u0006\u0002\u0004M%Fd\u0013\u0001V\u0001!i\u0016\u001cH/T5se>\u0014x+\u001b;i\t&4g-\u001a:f]R\u0014V\r^3oi&|g\u000eF\u0002$iVDQAK\u0004A\u0002-BQaN\u0004A\u0002aBCa\u0002\u001fI\u0013\"\"q\u0001\u0014*yY\u0005!\u0016a\b;fgR$&/\u00198tC\u000e$\u0018n\u001c8t/&$\b.T5se>\u0014Hk\u001c9jGR\u00191e\u001f?\t\u000b)B\u0001\u0019A\u0016\t\u000b]B\u0001\u0019\u0001\u001d)\t!a\u0004*\u0013\u0015\u0005\u00111\u0013v\u0010L\u0001U\u0003%\"Xm\u001d;EK2,G/\u001a*fG>\u0014Hm],ji\"Le\u000eZ3qK:$WM\u001c;SKR,g\u000e^5p]R)1%!\u0002\u0002\b!)!&\u0003a\u0001W!)q'\u0003a\u0001q!\"\u0011\u0002\u0010%JQ\u0015IAJUA\u0007Y\u0005!\u0016A\u0005;fgRl\u0015\r_'fgN\fw-Z*ju\u0016$RaIA\n\u0003+AQA\u000b\u000
6A\u0002-BQa\u000e\u0006A\u0002aBCA\u0003\u001fI\u0013\"*!\u0002\u0014*\u0002\u001c1\nA+\u0001\tuKN$H)Z:u%\u0016\fGm\u00148msR)1%!\t\u0002$!)!f\u0003a\u0001W!)qg\u0003a\u0001q!\"1\u0002\u0010%JQ\u0015YAJUA\u0015Y\u0005!\u0016A\u0006;fgR$\u0006N]8ui2,\u0007+\u0019:uSRLwN\\:\u0015\u000b\r\ny#!\r\t\u000b)b\u0001\u0019A\u0016\t\u000b]b\u0001\u0019\u0001\u001d)\t1a\u0004*\u0013\u0015\u0006\u00191\u0013\u0016q\u0007\u0017\u0002)\u0006iC/Z:u)\"\u0014x\u000e\u001e;mKB\u000b'\u000f^5uS>t7oV5uQN{WO]2f\u0019\u0016\fG-\u001a:DQ\u0006tw-Z:\u0015\u000b\r\ni$a\u0010\t\u000b)j\u0001\u0019A\u0016\t\u000b]j\u0001\u0019\u0001\u001d)\t5a\u0004*\u0013\u0015\u0006\u001b1\u0013\u0016Q\t\u0017\u0002)\u0006Ab/\u001a:jMf$\u0006N]8ui2,\u0007+\u0019:uSRLwN\\:\u0015\u0007\r\nY\u0005\u0003\u0004\u0002N9\u0001\r\u0001O\u0001\u0015g\",H\u000fZ8x]N{WO]2f\u0005J|7.\u001a:\u0002)Q,7\u000f^!vi>$VO\\3GKR\u001c\u0007.\u001a:t)\u0015\u0019\u00131KA+\u0011\u0015Qs\u00021\u0001,\u0011\u00159t\u00021\u00019Q\u0011yA\bS%)\u000b=a%+a\u0017-\u0003Q\u000b!\u0002\\8h\u001f\u001a47/\u001a;t))\t\t'a\u001d\u0002~\u0005\u0005\u0015q\u0013\t\u0007\u0003G\nI'!\u001c\u000e\u0005\u0005\u0015$bAA4K\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\t\u0005-\u0014Q\r\u0002\u0004'\u0016\f\bc\u0001\u0013\u0002p%\u0019\u0011\u0011O\u0013\u0003\t1{gn\u001a\u0005\b\u0003k\u0002\u0002\u0019AA<\u0003\u001d\u0019G.^:uKJ\u00042aGA=\u0013\r\tY\b\u0006\u0002\u0017\u00072,8\u000f^3s\u0019&t7\u000eV3ti\"\u000b'O\\3tg\"1\u0011q\u0010\tA\u0002-\nQ\u0001^8qS\u000eDq!a!\u0011\u0001\u0004\t))A\u0005m_\u001e|eMZ:fiB9A%a\"\u0002\f\u00065\u0014bAAEK\tIa)\u001e8di&|g.\r\t\u0005\u0003\u001b\u000b\u0019*\u0004\u0002\u0002\u0010*\u0019\u0011\u0011\u0013\f\u0002\u00071|w-\u0003\u0003\u0002\u0016\u0006=%aC!cgR\u0014\u0018m\u0019;M_\u001eD\u0011\"!'\u0011!\u0003\u0005\r!a'\u0002\u001b9,X\u000eU1si&$\u0018n\u001c8t!\r!\u0013QT\u0005\u0004\u0003?+#aA%oi\u0006!Bn\\4PM\u001a\u001cX\r^:%I\u00164\u0017-\u001e7uIQ*\"!!*+\t\u0005m\u0015qU\u0016\u0003\u0003S\u0003B!a+
\u000266\u0011\u0011Q\u0016\u0006\u0005\u0003_\u000b\t,A\u0005v]\u000eDWmY6fI*\u0019\u00111W\u0013\u0002\u0015\u0005tgn\u001c;bi&|g.\u0003\u0003\u00028\u00065&!E;oG\",7m[3e-\u0006\u0014\u0018.\u00198dK\u0006y2\u000f[;uI><hn\u00117fCJl\u0015N\u001d:peN#\u0018M\u001d;PM\u001a\u001cX\r^:\u0015\u0007\r\ni\fC\u0004\u0002@J\u0001\r!!1\u0002\r1Lgn[%e!\u0011\t\u0019-a4\u000e\u0005\u0005\u0015'\u0002BAd\u0003\u0013\faaY8n[>t'bA\f\u0002L*\u0019\u0011Q\u001a#\u0002\r\u0005\u0004\u0018m\u00195f\u0013\u0011\t\t.!2\u0003\tU+\u0018\u000e\u001a\u0015\u0007\u0001\u0005U'+!9\u0011\t\u0005]\u0017Q\\\u0007\u0003\u00033T1!a7A\u0003\r\t\u0007/[\u0005\u0005\u0003?\fINA\u0002UC\u001e\f#!a9\u0002\u0017%tG/Z4sCRLwN\u001c")
public class ClusterLinkDataPlaneMirroringIntegrationTest
extends AbstractClusterLinkIntegrationTest {
    /**
     * Links a destination mirror topic to a freshly created source topic, then
     * produces records to the source and verifies metrics and mirroring before
     * deleting the cluster link.
     *
     * @param quorum      quorum mode supplied by {@code allCombinations}; the SASL JAAS
     *                    encryption check only runs when it equals {@code "zk"}
     * @param coordinator supplied by {@code allCombinations}; not read by this test
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testMirrorNewRecords(String quorum, boolean coordinator) {
        final int recordCount = 20;

        // Create the source topic; trailing arguments are the harness defaults.
        ClusterLinkTestHarness source = this.sourceCluster();
        String sourceTopic = this.topic();
        int partitionCount = this.numPartitions();
        short replicas = this.replicationFactor();
        Properties createDefault4 = source.createTopic$default$4();
        ListenerName createDefault5 = source.createTopic$default$5();
        Properties createDefault6 = source.createTopic$default$6();
        source.createTopic(sourceTopic, partitionCount, replicas, createDefault4, createDefault5, createDefault6);

        // Only configure a link prefix when the test defines a non-empty one.
        Properties destProps = this.destLinkProps(this.destLinkProps$default$1());
        if (!this.clusterLinkPrefix().isEmpty()) {
            destProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), this.clusterLinkPrefix());
        }
        Uuid linkId = this.createClusterLink(this.linkName(), destProps, this.createClusterLink$default$3(), this.createClusterLink$default$4(), this.createClusterLink$default$5());

        // Create the mirror topic on the destination before any records exist.
        ClusterLinkTestHarness dest = this.destCluster();
        String mirroredTopic = this.topic();
        short mirrorReplicas = this.replicationFactor();
        String link = this.linkName();
        String prefix = this.clusterLinkPrefix();
        Map<String, String> linkTopicDefault4 = dest.linkTopic$default$4();
        dest.linkTopic(mirroredTopic, mirrorReplicas, link, linkTopicDefault4, prefix);

        this.produceToSourceCluster(recordCount);
        this.consume(this.sourceCluster(), "");
        this.waitAndVerifyMetricsAndMirror(this.topic(), linkId, this.waitAndVerifyMetricsAndMirror$default$3());
        if (quorum.equals("zk")) {
            this.verifySaslJaasConfigEncrypted(linkId);
        }
        this.verifyBackgroundThreadMetrics();

        // Clean up the link using the harness defaults for the remaining arguments.
        ClusterLinkTestHarness cleanupCluster = this.destCluster();
        String linkToDelete = this.linkName();
        boolean deleteDefault2 = cleanupCluster.deleteClusterLink$default$2();
        Seq<KafkaBroker> deleteDefault3 = cleanupCluster.deleteClusterLink$default$3();
        cleanupCluster.deleteClusterLink(linkToDelete, deleteDefault2, deleteDefault3);
    }

    /**
     * Produces records to the source topic first, then creates the cluster link
     * and a mirror topic that starts from the earliest offset, and verifies
     * metrics and mirroring.
     *
     * @param quorum      supplied by {@code allCombinations}; not read by this test
     * @param coordinator supplied by {@code allCombinations}; not read by this test
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testMirrorExistingRecords(String quorum, boolean coordinator) {
        final int recordCount = 20;

        // Create the source topic; trailing arguments are the harness defaults.
        ClusterLinkTestHarness source = this.sourceCluster();
        String sourceTopic = this.topic();
        int partitionCount = this.numPartitions();
        short replicas = this.replicationFactor();
        Properties createDefault4 = source.createTopic$default$4();
        ListenerName createDefault5 = source.createTopic$default$5();
        Properties createDefault6 = source.createTopic$default$6();
        source.createTopic(sourceTopic, partitionCount, replicas, createDefault4, createDefault5, createDefault6);

        // The records exist on the source before the link is created.
        this.produceToSourceCluster(recordCount);

        Properties destProps = this.destLinkProps(this.destLinkProps$default$1());
        if (!this.clusterLinkPrefix().isEmpty()) {
            destProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), this.clusterLinkPrefix());
        }
        Uuid linkId = this.createClusterLink(this.linkName(), destProps, this.createClusterLink$default$3(), this.createClusterLink$default$4(), this.createClusterLink$default$5());

        // Mirror topic name is the link prefix followed by the source topic name.
        ClusterLinkTestHarness dest = this.destCluster();
        String mirrorTopic = this.clusterLinkPrefix() + this.topic();
        String sourceTopicName = this.topic();
        short mirrorReplicas = this.replicationFactor();
        String link = this.linkName();
        Map<String, String> noConfigs = (Map<String, String>)((Map)scala.collection.Map$.MODULE$.empty());
        Option<OffsetSpec> fromEarliest = (Option<OffsetSpec>)new Some((Object)new OffsetSpec.EarliestSpec());
        dest.linkTopic(mirrorTopic, sourceTopicName, mirrorReplicas, link, noConfigs, fromEarliest);

        this.waitAndVerifyMetricsAndMirror(this.topic(), linkId, this.waitAndVerifyMetricsAndMirror$default$3());
    }

    /*
     * WARNING - void declaration
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testFetcherThreads(String quorum, boolean coordinator) {
        int numRecords = 80;
        this.numPartitions_$eq(16);
        IndexedSeq sourceAssignment = (IndexedSeq)RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 8).map((Function1 & Serializable)i -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)i))), (Object)scala.collection.Seq$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{0, 1})))).$plus$plus((IterableOnce)RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(8), 16).map((Function1 & Serializable)i -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)i))), (Object)scala.collection.Seq$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{1, 0})))));
        this.sourceCluster().withAdmin((Function1 & Serializable)admin -> TestUtils$.MODULE$.createTopicWithAdmin((Admin)admin, this.topic(), this.sourceCluster().brokers(), this.sourceCluster().controllerServers(), this.numPartitions(), this.replicationFactor(), (Map<Object, Seq<Object>>)sourceAssignment.toMap((.less.colon.less)$less$colon$less$.MODULE$.refl()), new Properties()));
        Uuid linkId = this.createClusterLink(this.linkName(), this.destLinkProps(this.destLinkProps$default$1()), this.createClusterLink$default$3(), this.createClusterLink$default$4(), this.createClusterLink$default$5());
        ClusterLinkTestHarness qual$1 = this.destCluster();
        String x$1 = this.topic();
        short x$2 = this.replicationFactor();
        String x$3 = this.linkName();
        String x$4 = this.clusterLinkPrefix();
        Map<String, String> x$5 = qual$1.linkTopic$default$4();
        qual$1.linkTopic(x$1, x$2, x$3, x$5, x$4);
        scala.collection.immutable.Set leaderCombinations = ((IterableOnceOps)RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), this.numPartitions()).map((Function1 & Serializable)i -> ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testFetcherThreads$4(this, BoxesRunTime.unboxToInt((Object)i))).map((Function1 & Serializable)tp -> new Tuple2.mcII.sp(this.sourceCluster().partitionLeader((TopicPartition)tp).config().brokerId(), this.destCluster().partitionLeader((TopicPartition)tp).config().brokerId()))).toSet();
        Assertions.assertEquals((int)4, (int)leaderCombinations.size(), (String)new StringBuilder(20).append("Unexpected leaders: ").append(leaderCombinations).toString());
        this.produceToSourceCluster(numRecords);
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        if (((ClusterLinkConfig)((KafkaBroker)this.destCluster().brokers().head()).clusterLinkManager().linkConfig(linkId).get()).useIsolatedFetcherPool()) {
            scala.collection.mutable.Map brokerDefaultFetcherAndPartitionCount = (scala.collection.mutable.Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)Nil$.MODULE$);
            long l = 100L;
            long waitUntilTrue_waitTimeMs = 15000L;
            long waitUntilTrue_startTime = System.currentTimeMillis();
            while (!ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testFetcherThreads$6(this, linkId, brokerDefaultFetcherAndPartitionCount, numRecords)) {
                void waitUntilTrue_pause;
                if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                    Assertions.fail((String)ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testFetcherThreads$10(brokerDefaultFetcherAndPartitionCount));
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
            }
            long l2 = 100L;
            long waitUntilTrue_waitTimeMs2 = 15000L;
            long waitUntilTrue_startTime2 = System.currentTimeMillis();
            while (!ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testFetcherThreads$11(this, linkId)) {
                void waitUntilTrue_pause;
                if (System.currentTimeMillis() > waitUntilTrue_startTime2 + waitUntilTrue_waitTimeMs2) {
                    Assertions.fail((String)"No fetcher in InSync pool");
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs2), (long)waitUntilTrue_pause));
            }
            this.produceToSourceCluster(numRecords);
            this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
            this.verifyYammerMetric("kafka.server:type=FetcherStats,name=BytesPerSec", this.verifyYammerMetric$default$2());
            this.waitForFetcherMetrics("kafka.server:type=FetcherLagMetrics,name=ConsumerLag");
        }
        Assertions.assertEquals((int)1, (int)this.maxFetcherCount(linkId));
        this.verifyFetcherThreads(linkId, this.fetcherThreadPoolMode(), 1, 1);
        ClusterLinkTestHarness qual$2 = this.destCluster();
        String x$6 = this.linkName();
        Map x$7 = (Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.NumClusterLinkFetchersProp()), (Object)"3")}));
        Seq<KafkaBroker> x$8 = qual$2.alterClusterLink$default$3();
        scala.collection.immutable.Set<String> x$9 = qual$2.alterClusterLink$default$4();
        boolean x$10 = qual$2.alterClusterLink$default$5();
        qual$2.alterClusterLink(x$6, (Map<String, String>)x$7, x$8, x$9, x$10);
        int maxFetcherThreads = ((ClusterLinkConfig)((KafkaBroker)this.destCluster().brokers().head()).clusterLinkManager().linkConfig(linkId).get()).useIsolatedFetcherPool() ? 6 : 3;
        long l = 15000L;
        TestUtils$ retry_this = TestUtils$.MODULE$;
        long l3 = 1L;
        long retry_startTime = System.currentTimeMillis();
        while (true) {
            try {
                this.verifyFetcherThreads(linkId, this.fetcherThreadPoolMode(), 2, maxFetcherThreads);
            }
            catch (AssertionError retry_e) {
                void retry_maxWaitMs;
                if (System.currentTimeMillis() - retry_startTime > retry_maxWaitMs) {
                    throw retry_e;
                }
                if (retry_this.logger().underlying().isInfoEnabled()) {
                    String msgWithLogIdent_msg = new StringBuilder(49).append("Attempt failed, sleeping for ").append(l3).append(", and then retrying.").toString();
                    Object var57_33 = null;
                    retry_this.logger().underlying().info(Logging.msgWithLogIdent$((Logging)retry_this, (String)msgWithLogIdent_msg));
                }
                Thread.sleep(l3);
                l3 += package$.MODULE$.min(l3, 1000L);
                continue;
            }
            break;
        }
        Object var45_28 = null;
        Object var50_32 = null;
        this.produceToSourceCluster(numRecords);
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        FetcherThreadPoolMode fetcherThreadPoolMode = this.fetcherThreadPoolMode();
        FetcherThreadPoolMode.Endpoint$ endpoint$ = FetcherThreadPoolMode.Endpoint$.MODULE$;
        FetcherThreadPoolMode.Endpoint$ newMode = fetcherThreadPoolMode != null && fetcherThreadPoolMode.equals(endpoint$) ? FetcherThreadPoolMode.Link$.MODULE$ : FetcherThreadPoolMode.Endpoint$.MODULE$;
        ClusterLinkTestHarness qual$3 = this.destCluster();
        String x$11 = this.linkName();
        Map x$12 = (Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.FetcherThreadPoolModeProp()), (Object)newMode.toString())}));
        Seq<KafkaBroker> x$13 = qual$3.alterClusterLink$default$3();
        scala.collection.immutable.Set<String> x$14 = qual$3.alterClusterLink$default$4();
        boolean x$15 = qual$3.alterClusterLink$default$5();
        qual$3.alterClusterLink(x$11, (Map<String, String>)x$12, x$13, x$14, x$15);
        long l4 = 15000L;
        TestUtils$ retry_this2 = TestUtils$.MODULE$;
        long l5 = 1L;
        long retry_startTime2 = System.currentTimeMillis();
        while (true) {
            try {
                this.verifyFetcherThreads(linkId, (FetcherThreadPoolMode)newMode, 2, maxFetcherThreads);
            }
            catch (AssertionError retry_e) {
                void retry_maxWaitMs;
                if (System.currentTimeMillis() - retry_startTime2 > retry_maxWaitMs) {
                    throw retry_e;
                }
                if (retry_this2.logger().underlying().isInfoEnabled()) {
                    String msgWithLogIdent_msg = new StringBuilder(49).append("Attempt failed, sleeping for ").append(l5).append(", and then retrying.").toString();
                    Object var58_48 = null;
                    retry_this2.logger().underlying().info(Logging.msgWithLogIdent$((Logging)retry_this2, (String)msgWithLogIdent_msg));
                }
                Thread.sleep(l5);
                l5 += package$.MODULE$.min(l5, 1000L);
                continue;
            }
            break;
        }
        Object var51_43 = null;
        Object var56_47 = null;
        this.destCluster().brokers().foreach(arg_0 -> ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testFetcherThreads$16$adapted((Product)newMode, linkId, arg_0));
        this.produceToSourceCluster(numRecords);
        this.verifyMirror(this.topic(), this.verifyMirror$default$2(), this.verifyMirror$default$3(), this.verifyMirror$default$4());
        ClusterLinkTestHarness qual$4 = this.destCluster();
        String x$16 = this.linkName();
        boolean x$17 = qual$4.deleteClusterLink$default$2();
        Seq<KafkaBroker> x$18 = qual$4.deleteClusterLink$default$3();
        qual$4.deleteClusterLink(x$16, x$17, x$18);
    }

    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testMirrorStartOffsetSpec(String quorum, boolean coordinator) {
        Properties linkProps = this.destLinkProps(this.destLinkProps$default$1());
        if (StringOps$.MODULE$.nonEmpty$extension(Predef$.MODULE$.augmentString(this.clusterLinkPrefix()))) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), this.clusterLinkPrefix());
        }
        Uuid linkId = this.createClusterLink(this.linkName(), linkProps, this.createClusterLink$default$3(), this.createClusterLink$default$4(), this.createClusterLink$default$5());
        ClusterLinkTestHarness qual$1 = this.destCluster();
        ByteArrayDeserializer x$1 = qual$1.createConsumer$default$1();
        ByteArrayDeserializer x$2 = qual$1.createConsumer$default$2();
        Properties x$3 = qual$1.createConsumer$default$3();
        List<String> x$4 = qual$1.createConsumer$default$4();
        Consumer consumer = qual$1.createConsumer(x$1, x$2, x$3, x$4);
        int n = 0;
        this.verifyMirrorWithStartOffsetSpec$1(new StringBuilder(5).append("topic").append(++n).toString(), (Option)new Some((Object)new OffsetSpec.LatestSpec()), true, true, true, consumer);
        this.verifyMirrorWithStartOffsetSpec$1(new StringBuilder(5).append("topic").append(++n).toString(), (Option)None$.MODULE$, false, true, true, consumer);
        this.alterClusterLink(this.linkName(), (Map<String, String>)((Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.MirrorStartOffsetSpecProp()), (Object)"earliest")}))));
        this.verifyMirrorWithStartOffsetSpec$1(new StringBuilder(5).append("topic").append(++n).toString(), (Option)new Some((Object)new OffsetSpec.LatestSpec()), true, false, true, consumer);
        this.verifyMirrorWithStartOffsetSpec$1(new StringBuilder(5).append("topic").append(++n).toString(), (Option)None$.MODULE$, false, false, true, consumer);
        this.alterClusterLink(this.linkName(), (Map<String, String>)((Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.MirrorStartOffsetSpecProp()), (Object)"latest")}))));
        this.verifyMirrorWithStartOffsetSpec$1(new StringBuilder(5).append("topic").append(++n).toString(), (Option)None$.MODULE$, true, false, true, consumer);
        this.verifyMirrorWithStartOffsetSpec$1(new StringBuilder(5).append("topic").append(++n).toString(), (Option)new Some((Object)new OffsetSpec.EarliestSpec()), false, false, true, consumer);
        String futureDatetime = TestUtils$.MODULE$.dateTime(System.currentTimeMillis() + 1000000L);
        this.alterClusterLink(this.linkName(), (Map<String, String>)((Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.MirrorStartOffsetSpecProp()), (Object)futureDatetime)}))));
        this.verifyMirrorWithStartOffsetSpec$1(new StringBuilder(5).append("topic").append(++n).toString(), (Option)None$.MODULE$, true, false, true, consumer);
        this.verifyMirrorWithStartOffsetSpec$1(new StringBuilder(5).append("topic").append(++n).toString(), (Option)new Some((Object)OffsetSpec.forTimestamp((long)(System.currentTimeMillis() - 1000000L))), false, false, true, consumer);
        this.assertClusterLinkTaskMetricMaxVal("clear-mirror-start-offsets", ClusterLinkMetrics$.MODULE$.metricsGroup(), (TaskState)ActiveTaskState$.MODULE$, (Seq<TaskErrorCode>)((Seq)scala.collection.Seq$.MODULE$.empty()), 1.0, (Seq<KafkaBroker>)new .colon.colon((Object)this.destCluster().linkCoordinator(this.linkName()), (List)Nil$.MODULE$), this.destCluster().nonLinkCoordinators(this.linkName()), this.assertClusterLinkTaskMetricMaxVal$default$8());
        if (coordinator) {
            this.shutdownClearMirrorStartOffsets(linkId);
            String topic = new StringBuilder(5).append("topic").append(++n).toString();
            Some x$6 = new Some((Object)new OffsetSpec.LatestSpec());
            boolean x$9 = false;
            Seq startOffsets = this.verifyMirrorWithStartOffsetSpec$1(topic, (Option)x$6, true, x$9, false, consumer);
            String mirrorTopic = new StringBuilder(0).append(this.clusterLinkPrefix()).append(topic).toString();
            String x$12 = this.linkName();
            TopicLinkMirror$ x$13 = TopicLinkMirror$.MODULE$;
            ClusterLinkTestHarness x$15 = this.waitForMetadataCacheUpdate$default$5();
            this.waitForMetadataCacheUpdate(mirrorTopic, linkId, x$12, (TopicLinkState)x$13, x$15, (Seq<Object>)startOffsets);
            ClusterLinkTestHarness qual$2 = this.destCluster();
            String x$17 = this.linkName();
            boolean x$18 = qual$2.unlinkTopic$default$3();
            boolean x$19 = qual$2.unlinkTopic$default$4();
            boolean x$20 = qual$2.unlinkTopic$default$5();
            int x$21 = qual$2.unlinkTopic$default$6();
            qual$2.unlinkTopic(mirrorTopic, x$17, x$18, x$19, x$20, x$21);
        }
        ClusterLinkTestHarness qual$3 = this.destCluster();
        String x$22 = this.linkName();
        boolean x$23 = qual$3.deleteClusterLink$default$2();
        Seq<KafkaBroker> x$24 = qual$3.deleteClusterLink$default$3();
        qual$3.deleteClusterLink(x$22, x$23, x$24);
        consumer.close();
    }

    /**
     * Checks where mirroring starts for a range of mirror start-offset specs.
     *
     * <p>Seven topics ({@code topic1} .. {@code topic7}) are verified one after another through
     * {@code verifyMirrorWithStartOffsetSpec$2}, each with a different spec: timestamps relative
     * to the wall-clock time captured at the start of the test, {@code LatestSpec}, and no spec
     * at all. The integer passed right after {@code batchAges} looks like the index of the first
     * batch expected on the mirror (4 meaning "after all four batches") -- TODO confirm against
     * the helper, which is defined elsewhere in this class.
     *
     * <p>The destination consumer is held in a try-with-resources block so it is closed even
     * when one of the verifications fails.
     *
     * @param quorum      value supplied by the {@code allCombinations} source; not read in this body
     * @param coordinator value supplied by the {@code allCombinations} source; not read in this body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testMirrorStartTimestamp(String quorum, boolean coordinator) {
        this.createClusterLink(this.linkName(), this.destLinkProps((Map<String, String>)((Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"metadata.max.age.ms"), (Object)"500")})))), this.createClusterLink$default$3(), this.createClusterLink$default$4(), this.createClusterLink$default$5());
        ClusterLinkTestHarness qual$1 = this.destCluster();
        ByteArrayDeserializer x$1 = qual$1.createConsumer$default$1();
        ByteArrayDeserializer x$2 = qual$1.createConsumer$default$2();
        Properties x$3 = qual$1.createConsumer$default$3();
        List<String> x$4 = qual$1.createConsumer$default$4();
        try (Consumer consumer = qual$1.createConsumer(x$1, x$2, x$3, x$4);){
            long currentTimeMs = System.currentTimeMillis();
            int recordsPerPartitionPerBatch = 5;
            int batchSize = this.numPartitions() * recordsPerPartitionPerBatch;
            // Running suffix for the per-case topic names: topic1, topic2, ...
            int n = 0;
            // One entry per produced batch; presumably the age in ms of each batch's records -- TODO confirm.
            Seq batchAges = (Seq)scala.collection.Seq$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{120000, 30000, 0, 60000}));
            // Timestamp specs in the past.
            this.verifyMirrorWithStartOffsetSpec$2(new StringBuilder(5).append("topic").append(++n).toString(), (Option)new Some((Object)OffsetSpec.forTimestamp((long)(currentTimeMs - 30000L))), batchAges, 1, batchSize, currentTimeMs, recordsPerPartitionPerBatch, consumer);
            this.verifyMirrorWithStartOffsetSpec$2(new StringBuilder(5).append("topic").append(++n).toString(), (Option)new Some((Object)OffsetSpec.forTimestamp((long)(currentTimeMs - 90000L))), batchAges, 1, batchSize, currentTimeMs, recordsPerPartitionPerBatch, consumer);
            this.verifyMirrorWithStartOffsetSpec$2(new StringBuilder(5).append("topic").append(++n).toString(), (Option)new Some((Object)OffsetSpec.forTimestamp((long)(currentTimeMs - 60000L))), batchAges, 1, batchSize, currentTimeMs, recordsPerPartitionPerBatch, consumer);
            // Timestamp spec equal to "now", then one in the future.
            this.verifyMirrorWithStartOffsetSpec$2(new StringBuilder(5).append("topic").append(++n).toString(), (Option)new Some((Object)OffsetSpec.forTimestamp((long)currentTimeMs)), batchAges, 2, batchSize, currentTimeMs, recordsPerPartitionPerBatch, consumer);
            this.verifyMirrorWithStartOffsetSpec$2(new StringBuilder(5).append("topic").append(++n).toString(), (Option)new Some((Object)OffsetSpec.forTimestamp((long)(currentTimeMs + 30000L))), batchAges, 4, batchSize, currentTimeMs, recordsPerPartitionPerBatch, consumer);
            // Latest spec, then no spec at all.
            this.verifyMirrorWithStartOffsetSpec$2(new StringBuilder(5).append("topic").append(++n).toString(), (Option)new Some((Object)new OffsetSpec.LatestSpec()), batchAges, 4, batchSize, currentTimeMs, recordsPerPartitionPerBatch, consumer);
            this.verifyMirrorWithStartOffsetSpec$2(new StringBuilder(5).append("topic").append(++n).toString(), (Option)None$.MODULE$, batchAges, 0, batchSize, currentTimeMs, recordsPerPartitionPerBatch, consumer);
            ClusterLinkTestHarness qual$2 = this.destCluster();
            String x$5 = this.linkName();
            boolean x$6 = qual$2.deleteClusterLink$default$2();
            Seq<KafkaBroker> x$7 = qual$2.deleteClusterLink$default$3();
            qual$2.deleteClusterLink(x$5, x$6, x$7);
        }
    }

    /*
     * WARNING - void declaration
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testMirrorWithDifferentRetention(String quorum, boolean coordinator) {
        void var30_40;
        Tuple2 tuple2;
        void var22_27;
        Tuple2 tuple22;
        this.numPartitions_$eq(1);
        Properties topicConfig = new Properties();
        topicConfig.setProperty("segment.bytes", "1000");
        topicConfig.setProperty("retention.bytes", "1000");
        topicConfig.setProperty("retention.ms", "10000000");
        ClusterLinkTestHarness qual$1 = this.sourceCluster();
        String x$1 = this.topic();
        int x$2 = this.numPartitions();
        short x$3 = this.replicationFactor();
        ListenerName x$5 = qual$1.createTopic$default$5();
        Properties x$6 = qual$1.createTopic$default$6();
        qual$1.createTopic(x$1, x$2, x$3, topicConfig, x$5, x$6);
        TopicPartition tp = new TopicPartition(this.topic(), 0);
        Properties linkProps = this.destLinkProps(this.destLinkProps$default$1());
        String topicConfigInclude = new StringBuilder(34).append(MirrorTopicConfigSyncRules$.MODULE$.AlwaysConfigs().mkString(",")).append(",segment.bytes,delete.retention.ms").toString();
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp(), topicConfigInclude);
        Uuid linkId = this.createClusterLink(this.linkName(), linkProps, this.createClusterLink$default$3(), this.createClusterLink$default$4(), this.createClusterLink$default$5());
        ClusterLinkTestHarness qual$2 = this.destCluster();
        String x$7 = this.topic();
        short x$8 = this.replicationFactor();
        String x$9 = this.linkName();
        Map x$10 = (Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"retention.ms"), (Object)"20000000")}));
        String x$11 = qual$2.linkTopic$default$5();
        qual$2.linkTopic(x$7, x$8, x$9, (Map<String, String>)x$10, x$11);
        while (this.sourceCluster().leaderLog(tp).logStartOffset() <= 100L) {
            this.produceToSourceAndWaitForMirror(10);
        }
        this.produceToSourceAndWaitForMirror(10);
        AbstractLog destLog = this.destCluster().leaderLog(tp);
        Assertions.assertEquals((long)0L, (long)destLog.logStartOffset());
        this.consume(this.destCluster(), this.consume$default$2());
        Assertions.assertEquals((int)1000, (int)destLog.config().segmentSize);
        Assertions.assertEquals((long)20000000L, (long)destLog.config().retentionMs);
        Assertions.assertEquals((Long)((KafkaBroker)this.destCluster().brokers().head()).config().logRetentionBytes(), (long)destLog.config().retentionSize);
        Assertions.assertTrue((boolean)this.destCluster().brokers().forall((Function1 & Serializable)broker -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkDataPlaneMirroringIntegrationTest.usesIndependentRetention$1(broker, linkId))));
        this.sourceCluster().alterTopic(this.topic(), (Map<String, String>)((Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"segment.bytes"), (Object)"999"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"retention.ms"), (Object)"30000000")}))));
        long l = 100L;
        long computeUntilTrue_waitTime = 15000L;
        long computeUntilTrue_startTime = System.currentTimeMillis();
        while (true) {
            void computeUntilTrue_pause;
            int n = destLog.config().segmentSize;
            Integer computeUntilTrue_result = BoxesRunTime.boxToInteger((int)n);
            if (ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testMirrorWithDifferentRetention$3(n)) {
                tuple22 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)computeUntilTrue_result), (Object)BoxesRunTime.boxToBoolean((boolean)true));
                break;
            }
            if (System.currentTimeMillis() > computeUntilTrue_startTime + computeUntilTrue_waitTime) {
                tuple22 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)computeUntilTrue_result), (Object)BoxesRunTime.boxToBoolean((boolean)false));
                break;
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(computeUntilTrue_waitTime), (long)computeUntilTrue_pause));
        }
        Object var45_24 = null;
        Tuple2 tuple23 = tuple22;
        if (tuple23 == null) {
            throw new MatchError(null);
        }
        int destSegmentSize = tuple23._1$mcI$sp();
        Assertions.assertEquals((int)999, (int)var22_27);
        Assertions.assertEquals((long)20000000L, (long)destLog.config().retentionMs);
        Assertions.assertEquals((Long)((KafkaBroker)this.destCluster().brokers().head()).config().logRetentionBytes(), (long)destLog.config().retentionSize);
        Assertions.assertTrue((boolean)this.destCluster().brokers().forall((Function1 & Serializable)broker -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkDataPlaneMirroringIntegrationTest.usesIndependentRetention$1(broker, linkId))));
        ClusterLinkTestHarness qual$3 = this.destCluster();
        String x$12 = this.linkName();
        Map x$13 = (Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp()), (Object)new StringBuilder(13).append(topicConfigInclude).append(",").append("retention.ms").toString())}));
        scala.collection.immutable.Seq x$14 = (scala.collection.immutable.Seq)Seq$.MODULE$.empty();
        scala.collection.immutable.Set<String> x$15 = qual$3.alterClusterLink$default$4();
        boolean x$16 = qual$3.alterClusterLink$default$5();
        qual$3.alterClusterLink(x$12, (Map<String, String>)x$13, (Seq<KafkaBroker>)x$14, x$15, x$16);
        long l2 = 100L;
        long computeUntilTrue_waitTime2 = 15000L;
        long computeUntilTrue_startTime2 = System.currentTimeMillis();
        while (true) {
            void computeUntilTrue_pause;
            long l3 = destLog.config().retentionMs;
            Long computeUntilTrue_result = BoxesRunTime.boxToLong((long)l3);
            if (ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testMirrorWithDifferentRetention$6(l3)) {
                tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)computeUntilTrue_result), (Object)BoxesRunTime.boxToBoolean((boolean)true));
                break;
            }
            if (System.currentTimeMillis() > computeUntilTrue_startTime2 + computeUntilTrue_waitTime2) {
                tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)computeUntilTrue_result), (Object)BoxesRunTime.boxToBoolean((boolean)false));
                break;
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(computeUntilTrue_waitTime2), (long)computeUntilTrue_pause));
        }
        Object var52_37 = null;
        Tuple2 tuple24 = tuple2;
        if (tuple24 == null) {
            throw new MatchError(null);
        }
        long destRetentionMs = tuple24._1$mcJ$sp();
        Assertions.assertEquals((long)30000000L, (long)var30_40);
        long l4 = 100L;
        long waitUntilTrue_waitTimeMs = 15000L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testMirrorWithDifferentRetention$7(this, linkId)) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)"Retention config update not applied");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        this.produceToSourceAndWaitForMirror(10);
        this.destCluster().waitForStartOffset(tp, this.sourceCluster().leaderLog(tp).logStartOffset());
        this.waitUntilMirrorStartOffsetsAreCleared(new StringBuilder(0).append(this.clusterLinkPrefix()).append(this.topic()).toString(), this.waitUntilMirrorStartOffsetsAreCleared$default$2());
        ClusterLinkTestHarness qual$4 = this.destCluster();
        String x$17 = new StringBuilder(0).append(this.clusterLinkPrefix()).append(this.topic()).toString();
        String x$18 = this.linkName();
        int x$19 = this.numPartitions();
        boolean x$20 = qual$4.unlinkTopic$default$3();
        boolean x$21 = qual$4.unlinkTopic$default$4();
        boolean x$22 = qual$4.unlinkTopic$default$5();
        qual$4.unlinkTopic(x$17, x$18, x$20, x$21, x$22, x$19);
    }

    /*
     * WARNING - void declaration
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testTransactionsWithMirrorTopic(String quorum, boolean coordinator) {
        String groupId = "testGroup";
        String topic2 = "anotherTopic";
        ClusterLinkTestHarness qual$1 = this.sourceCluster();
        String x$1 = this.topic();
        int x$2 = this.numPartitions();
        short x$3 = this.replicationFactor();
        Properties x$4 = qual$1.createTopic$default$4();
        ListenerName x$5 = qual$1.createTopic$default$5();
        Properties x$6 = qual$1.createTopic$default$6();
        qual$1.createTopic(x$1, x$2, x$3, x$4, x$5, x$6);
        ClusterLinkTestHarness qual$2 = this.destCluster();
        int x$8 = this.numPartitions();
        short x$9 = this.replicationFactor();
        Properties x$10 = qual$2.createTopic$default$4();
        ListenerName x$11 = qual$2.createTopic$default$5();
        Properties x$12 = qual$2.createTopic$default$6();
        qual$2.createTopic(topic2, x$8, x$9, x$10, x$11, x$12);
        String x$13 = this.linkName();
        Properties x$15 = this.createClusterLink$default$2();
        Option<Properties> x$16 = this.createClusterLink$default$3();
        boolean x$17 = this.createClusterLink$default$5();
        this.createClusterLink(x$13, x$15, x$16, true, x$17);
        ClusterLinkTestHarness qual$3 = this.destCluster();
        String x$18 = this.topic();
        short x$19 = this.replicationFactor();
        String x$20 = this.linkName();
        Map<String, String> x$21 = qual$3.linkTopic$default$4();
        String x$22 = qual$3.linkTopic$default$5();
        qual$3.linkTopic(x$18, x$19, x$20, x$21, x$22);
        Properties producerProps = new Properties();
        producerProps.setProperty("transactional.id", "test_txn");
        producerProps.setProperty("acks", "all");
        ClusterLinkTestHarness qual$4 = this.destCluster();
        ByteArraySerializer x$24 = qual$4.createProducer$default$1();
        ByteArraySerializer x$25 = qual$4.createProducer$default$2();
        try (KafkaProducer producer = qual$4.createProducer(x$24, x$25, producerProps);){
            producer.initTransactions();
            Properties consumerProps = new Properties();
            consumerProps.setProperty("group.id", groupId);
            consumerProps.setProperty("isolation.level", "read_committed");
            ClusterLinkTestHarness qual$5 = this.destCluster();
            ByteArrayDeserializer x$27 = qual$5.createConsumer$default$1();
            ByteArrayDeserializer x$28 = qual$5.createConsumer$default$2();
            List<String> x$29 = qual$5.createConsumer$default$4();
            try (Consumer consumer = qual$5.createConsumer(x$27, x$28, consumerProps, x$29);){
                consumer.assign((Collection)CollectionConverters$.MODULE$.SeqHasAsJava(this.partitions(this.partitions$default$1(), this.partitions$default$2(), this.partitions$default$3())).asJava());
                Seq consumedRecords = TestUtils$.MODULE$.consumeRecords(consumer, this.producedRecords().size(), 20000L);
                java.util.Map consumedOffsets = CollectionConverters$.MODULE$.MapHasAsJava((Map)CollectionConverters$.MODULE$.MapHasAsScala(consumer.endOffsets((Collection)CollectionConverters$.MODULE$.SeqHasAsJava(this.partitions(this.partitions$default$1(), this.partitions$default$2(), this.partitions$default$3())).asJava())).asScala().map((Function1 & Serializable)x0$1 -> {
                    if (x0$1 != null) {
                        TopicPartition tp = (TopicPartition)x0$1._1();
                        Long offset = (Long)x0$1._2();
                        return new Tuple2((Object)tp, (Object)new OffsetAndMetadata(Predef$.MODULE$.Long2long(offset)));
                    }
                    throw new MatchError(null);
                }).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl())).asJava();
                producer.beginTransaction();
                consumedRecords.foreach((Function1 & Serializable)record -> producer.send(new ProducerRecord(topic2, Predef$.MODULE$.int2Integer(record.partition()), Predef$.MODULE$.long2Long(record.timestamp()), record.key(), record.value())));
                producer.sendOffsetsToTransaction(consumedOffsets, new ConsumerGroupMetadata(groupId));
                producer.commitTransaction();
                Assertions.assertEquals((Object)consumedOffsets, (Object)consumer.committed(CollectionConverters$.MODULE$.SetHasAsJava((Set)this.partitions(this.partitions$default$1(), this.partitions$default$2(), this.partitions$default$3()).toSet()).asJava()));
                producer.beginTransaction();
                ExecutionException e = (ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> this.produceRecords((KafkaProducer<byte[], byte[]>)producer, this.topic(), 1, this.produceRecords$default$4(), this.produceRecords$default$5(), this.produceRecords$default$6()));
                Assertions.assertTrue((boolean)e.getMessage().matches(".*Could not add partitions to transaction due to errors.*INVALID_REQUEST.*"), (String)new StringBuilder(17).append("Unexpected error ").append(e.getMessage()).toString());
                producer.abortTransaction();
                this.producedRecords().clear();
                ClusterLinkTestHarness qual$6 = this.destCluster();
                String x$30 = this.topic();
                String x$31 = this.linkName();
                boolean x$33 = qual$6.unlinkTopic$default$3();
                boolean x$34 = qual$6.unlinkTopic$default$5();
                int x$35 = qual$6.unlinkTopic$default$6();
                qual$6.unlinkTopic(x$30, x$31, x$33, false, x$34, x$35);
                long l = 100L;
                long waitUntilTrue_waitTimeMs = 15000L;
                long waitUntilTrue_startTime = System.currentTimeMillis();
                while (!ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testTransactionsWithMirrorTopic$4(this)) {
                    void waitUntilTrue_pause;
                    if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                        Assertions.fail((String)"Mirror not stopped");
                    }
                    Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
                }
                producer.beginTransaction();
                this.produceRecords(producer, this.topic(), 10, this.produceRecords$default$4(), this.produceRecords$default$5(), this.produceRecords$default$6());
                producer.commitTransaction();
                this.consumeRecords(consumer, this.consumeRecords$default$2(), this.consumeRecords$default$3());
            }
        }
    }

    /**
     * Checks how record deletion on the source affects a mirror topic, first while the link
     * syncs only the always-synced topic configs and then after {@code retention.ms} has been
     * added to the link's config-sync include list.
     *
     * <p>Flow, as visible in this body:
     * <ol>
     *   <li>Create a single-partition source topic, the link, and the mirror topic.</li>
     *   <li>Repeatedly produce and delete all produced records on the source until the source
     *       log start offset exceeds 100; assert the destination log still starts at 0.</li>
     *   <li>Add {@code retention.ms} to the sync include list and wait for the destination start
     *       offset to reach the last deleted offset.</li>
     *   <li>Delete records on the source again and wait for the destination to follow, then
     *       unlink the topic.</li>
     * </ol>
     *
     * <p>The source admin client is held in a try-with-resources block so this method closes
     * what it opens; whether harness teardown would also close it is not visible here.
     *
     * @param quorum      value supplied by the {@code allCombinations} source; not read in this body
     * @param coordinator value supplied by the {@code allCombinations} source; not read in this body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testDeleteRecordsWithIndependentRetention(String quorum, boolean coordinator) {
        this.numPartitions_$eq(1);
        ClusterLinkTestHarness qual$1 = this.sourceCluster();
        String x$1 = this.topic();
        int x$2 = this.numPartitions();
        short x$3 = this.replicationFactor();
        Properties x$4 = qual$1.createTopic$default$4();
        ListenerName x$5 = qual$1.createTopic$default$5();
        Properties x$6 = qual$1.createTopic$default$6();
        qual$1.createTopic(x$1, x$2, x$3, x$4, x$5, x$6);
        TopicPartition tp = new TopicPartition(this.topic(), 0);
        Properties linkProps = this.destLinkProps(this.destLinkProps$default$1());
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp(), MirrorTopicConfigSyncRules$.MODULE$.AlwaysConfigs().mkString(","));
        this.createClusterLink(this.linkName(), linkProps, this.createClusterLink$default$3(), this.createClusterLink$default$4(), this.createClusterLink$default$5());
        ClusterLinkTestHarness qual$2 = this.destCluster();
        String x$7 = this.topic();
        short x$8 = this.replicationFactor();
        String x$9 = this.linkName();
        Map<String, String> x$10 = qual$2.linkTopic$default$4();
        String x$11 = qual$2.linkTopic$default$5();
        qual$2.linkTopic(x$7, x$8, x$9, x$10, x$11);
        ClusterLinkTestHarness qual$3 = this.sourceCluster();
        ListenerName x$12 = qual$3.createAdminClient$default$1();
        Properties x$13 = qual$3.createAdminClient$default$2();
        try (Admin sourceAdmin = qual$3.createAdminClient(x$12, x$13);){
            int deletedOffset = -1;
            // Produce and delete on the source until its log start offset has moved past 100.
            while (this.sourceCluster().leaderLog(tp).logStartOffset() <= 100L) {
                this.produceToSourceAndWaitForMirror(10);
                deletedOffset = this.producedRecords().size();
                sourceAdmin.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset((long)deletedOffset))).all().get(15L, TimeUnit.SECONDS);
            }
            this.sourceCluster().waitForStartOffset(tp, deletedOffset);
            this.produceToSourceAndWaitForMirror(10);
            AbstractLog destLog = this.destCluster().leaderLog(tp);
            // The destination must not have followed the source-side deletions so far.
            Assertions.assertEquals((long)0L, (long)destLog.logStartOffset());
            this.consume(this.destCluster(), this.consume$default$2());
            // Add retention.ms to the link's config-sync include list.
            ClusterLinkTestHarness qual$4 = this.destCluster();
            String x$14 = this.linkName();
            Map x$15 = (Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp()), (Object)new StringBuilder(13).append(MirrorTopicConfigSyncRules$.MODULE$.AlwaysConfigs().mkString(",")).append(",").append("retention.ms").toString())}));
            scala.collection.immutable.Seq x$16 = (scala.collection.immutable.Seq)Seq$.MODULE$.empty();
            scala.collection.immutable.Set<String> x$17 = qual$4.alterClusterLink$default$4();
            boolean x$18 = qual$4.alterClusterLink$default$5();
            qual$4.alterClusterLink(x$14, (Map<String, String>)x$15, (Seq<KafkaBroker>)x$16, x$17, x$18);
            this.produceToSourceAndWaitForMirror(10);
            // From here on the destination start offset is expected to track the source deletions.
            this.destCluster().waitForStartOffset(tp, deletedOffset);
            this.produceToSourceAndWaitForMirror(10);
            deletedOffset = this.producedRecords().size();
            sourceAdmin.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset((long)deletedOffset))).all().get(15L, TimeUnit.SECONDS);
            this.destCluster().waitForStartOffset(tp, deletedOffset);
            this.waitUntilMirrorStartOffsetsAreCleared(new StringBuilder(0).append(this.clusterLinkPrefix()).append(this.topic()).toString(), this.waitUntilMirrorStartOffsetsAreCleared$default$2());
            ClusterLinkTestHarness qual$5 = this.destCluster();
            String x$19 = new StringBuilder(0).append(this.clusterLinkPrefix()).append(this.topic()).toString();
            String x$20 = this.linkName();
            int x$21 = this.numPartitions();
            boolean x$22 = qual$5.unlinkTopic$default$3();
            boolean x$23 = qual$5.unlinkTopic$default$4();
            boolean x$24 = qual$5.unlinkTopic$default$5();
            qual$5.unlinkTopic(x$19, x$20, x$22, x$23, x$24, x$21);
        }
    }

    /*
     * WARNING - void declaration
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testMaxMessageSize(String quorum, boolean coordinator) {
        int numRecords = 20;
        ClusterLinkTestHarness qual$1 = this.sourceCluster();
        String x$1 = this.topic();
        int x$2 = this.numPartitions();
        short x$3 = this.replicationFactor();
        Properties x$4 = qual$1.createTopic$default$4();
        ListenerName x$5 = qual$1.createTopic$default$5();
        Properties x$6 = qual$1.createTopic$default$6();
        qual$1.createTopic(x$1, x$2, x$3, x$4, x$5, x$6);
        ClusterLinkTestHarness qual$2 = this.sourceCluster();
        ByteArraySerializer x$7 = qual$2.createProducer$default$1();
        ByteArraySerializer x$8 = qual$2.createProducer$default$2();
        Properties x$9 = qual$2.createProducer$default$3();
        KafkaProducer producer = qual$2.createProducer(x$7, x$8, x$9);
        this.produceRecords(producer, this.topic(), numRecords, this.produceRecords$default$4(), this.produceRecords$default$5(), this.produceRecords$default$6());
        Properties linkProps = this.destLinkProps(this.destLinkProps$default$1());
        Uuid linkId = this.createClusterLink(this.linkName(), linkProps, this.createClusterLink$default$3(), this.createClusterLink$default$4(), this.createClusterLink$default$5());
        ClusterLinkTestHarness qual$3 = this.destCluster();
        String x$10 = this.topic();
        short x$11 = this.replicationFactor();
        String x$12 = this.linkName();
        Map<String, String> x$13 = qual$3.linkTopic$default$4();
        String x$14 = qual$3.linkTopic$default$5();
        qual$3.linkTopic(x$10, x$11, x$12, x$13, x$14);
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        this.alterClusterLink(this.linkName(), (Map<String, String>)((Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.MaxMessageSizeProp()), (Object)"1000")}))));
        this.produceRecords(producer, this.topic(), numRecords, this.produceRecords$default$4(), this.produceRecords$default$5(), this.produceRecords$default$6());
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        byte[] value = TestUtils.randomBytes((int)1100);
        String x$16 = this.topic();
        Some x$18 = new Some((Object)value);
        Function1<Object, String> x$19 = this.produceRecords$default$4();
        Option<Object> x$20 = this.produceRecords$default$6();
        this.produceRecords(producer, x$16, 1, x$19, (Option<byte[]>)x$18, x$20);
        ClusterLinkTestHarness qual$4 = this.destCluster();
        Properties x$21 = qual$4.createConfluentAdminClient$default$1();
        this.waitForFailure(qual$4.createConfluentAdminClient(x$21), FailureType$.MODULE$.RecordTooLarge(), this.waitForFailure$default$3());
        this.alterClusterLink(this.linkName(), (Map<String, String>)((Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.MaxMessageSizeProp()), (Object)"10000")}))));
        this.destCluster().alterMirrors(this.topic(), AlterMirrorOp.REPAIR);
        long l = 15000L;
        TestUtils$ retry_this = TestUtils$.MODULE$;
        long l2 = 1L;
        long retry_startTime = System.currentTimeMillis();
        while (true) {
            try {
                ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testMaxMessageSize$1(this);
            }
            catch (AssertionError retry_e) {
                void retry_maxWaitMs;
                if (System.currentTimeMillis() - retry_startTime > retry_maxWaitMs) {
                    throw retry_e;
                }
                if (retry_this.logger().underlying().isInfoEnabled()) {
                    String msgWithLogIdent_msg = new StringBuilder(49).append("Attempt failed, sleeping for ").append(l2).append(", and then retrying.").toString();
                    Object var55_37 = null;
                    retry_this.logger().underlying().info(Logging.msgWithLogIdent$((Logging)retry_this, (String)msgWithLogIdent_msg));
                }
                Thread.sleep(l2);
                l2 += package$.MODULE$.min(l2, 1000L);
                continue;
            }
            break;
        }
        Object var43_32 = null;
        Object var48_36 = null;
        ClusterLinkRepairMirrors repairMirrors = this.repairMirrorsTask(this.linkName(), linkId);
        long l3 = 15000L;
        TestUtils$ retry_this2 = TestUtils$.MODULE$;
        long l4 = 1L;
        long retry_startTime2 = System.currentTimeMillis();
        while (true) {
            try {
                repairMirrors.containsTask(this.topic());
            }
            catch (AssertionError retry_e) {
                void retry_maxWaitMs;
                if (System.currentTimeMillis() - retry_startTime2 > retry_maxWaitMs) {
                    throw retry_e;
                }
                if (retry_this2.logger().underlying().isInfoEnabled()) {
                    String msgWithLogIdent_msg = new StringBuilder(49).append("Attempt failed, sleeping for ").append(l4).append(", and then retrying.").toString();
                    Object var56_45 = null;
                    retry_this2.logger().underlying().info(Logging.msgWithLogIdent$((Logging)retry_this2, (String)msgWithLogIdent_msg));
                }
                Thread.sleep(l4);
                l4 += package$.MODULE$.min(l4, 1000L);
                continue;
            }
            break;
        }
        Object var49_40 = null;
        Object var54_44 = null;
        ClusterLinkTestHarness qual$5 = this.destCluster();
        Properties x$22 = qual$5.createConfluentAdminClient$default$1();
        ConfluentAdmin destAdmin = qual$5.createConfluentAdminClient(x$22);
        this.waitForMirrorState(destAdmin, this.topic(), MirrorTopicDescription.State.ACTIVE);
        String x$24 = this.topic();
        Some x$26 = new Some((Object)value);
        Function1<Object, String> x$27 = this.produceRecords$default$4();
        Option<Object> x$28 = this.produceRecords$default$6();
        this.produceRecords(producer, x$24, 1, x$27, (Option<byte[]>)x$26, x$28);
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        this.destCluster().deleteTopic(this.topic(), true);
        this.alterClusterLink(this.linkName(), (Map<String, String>)((Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.MaxMessageSizeProp()), (Object)"1000")}))));
        this.destCluster().linkTopic(this.topic(), this.topic(), this.replicationFactor(), this.linkName(), (Map<String, String>)((Map)scala.collection.Map$.MODULE$.empty()), (Option<OffsetSpec>)new Some((Object)new OffsetSpec.LatestSpec()));
        this.producedRecords().clear();
        this.produceRecords(producer, this.topic(), numRecords, this.produceRecords$default$4(), this.produceRecords$default$5(), this.produceRecords$default$6());
        this.waitUntilMirrorStartOffsetsAreCleared(this.topic(), this.waitUntilMirrorStartOffsetsAreCleared$default$2());
        this.verifyMirror(this.topic(), this.verifyMirror$default$2(), this.verifyMirror$default$3(), this.verifyMirror$default$4());
        ClusterLinkTestHarness qual$6 = this.destCluster();
        String x$29 = this.linkName();
        boolean x$30 = qual$6.deleteClusterLink$default$2();
        Seq<KafkaBroker> x$31 = qual$6.deleteClusterLink$default$3();
        qual$6.deleteClusterLink(x$29, x$30, x$31);
        producer.close(Duration.ZERO);
    }

    /**
     * Checks that a mirror topic on the destination cluster is read-only while linked and
     * becomes writable once it is unlinked.
     *
     * <p>Flow: create the source topic, create the cluster link (with
     * {@code metadata.max.age.ms=10000}), mirror the topic, then assert that producing to the
     * mirror fails with an {@link InvalidRequestException} cause and that adding partitions
     * fails with {@link InvalidPartitionsException}. After more source records are mirrored and
     * basic link metrics are verified, the topic is unlinked, checked for writability, written
     * to with the destination producer, and the link is deleted.
     *
     * @param quorum      quorum mode; when equal to {@code "kraft"} the test assumes the link is
     *                    not source-initiated
     * @param coordinator supplied by the {@code allCombinations} method source; not read here
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testDestReadOnly(String quorum, boolean coordinator) {
        if (quorum.equals("kraft")) {
            Assumptions.assumeFalse((boolean)this.useSourceInitiatedLink());
        }

        // Topic on the source side; trailing arguments are the Scala default values.
        ClusterLinkTestHarness source = this.sourceCluster();
        source.createTopic(this.topic(), this.numPartitions(), this.replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());

        // Cluster link with a shortened metadata refresh interval.
        String newLinkName = this.linkName();
        Tuple2[] overrideEntries = new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"metadata.max.age.ms"), (Object)"10000")};
        Map<String, String> linkOverrides = (Map<String, String>)((Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])overrideEntries)));
        Properties newLinkProps = this.destLinkProps(linkOverrides);
        Uuid linkId = this.createClusterLink(newLinkName, newLinkProps, this.createClusterLink$default$3(), this.createClusterLink$default$4(), this.createClusterLink$default$5());

        // Mirror the topic and wait for the first batch of records to arrive.
        ClusterLinkTestHarness mirrorOwner = this.destCluster();
        mirrorOwner.linkTopic(this.topic(), this.replicationFactor(), this.linkName(), mirrorOwner.linkTopic$default$4(), mirrorOwner.linkTopic$default$5());
        this.produceToSourceCluster(4);
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());

        // Direct writes to the mirror topic must be rejected.
        ClusterLinkTestHarness producerOwner = this.destCluster();
        KafkaProducer destProducer = producerOwner.createProducer(producerOwner.createProducer$default$1(), producerOwner.createProducer$default$2(), producerOwner.createProducer$default$3());
        ExecutionException e = (ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> destProducer.send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), Predef$.MODULE$.long2Long(0L), (Object)"key".getBytes(), (Object)"value".getBytes())).get(15L, TimeUnit.SECONDS));
        Assertions.assertTrue((boolean)(e.getCause() instanceof InvalidRequestException));
        String errorMessage = e.getMessage();
        Assertions.assertTrue((boolean)errorMessage.contains("Cannot append records to read-only mirror topic"), (String)("Unexpected error " + errorMessage));

        // Partition count changes must be rejected as well.
        Assertions.assertThrows(InvalidPartitionsException.class, () -> this.destCluster().createPartitions(this.topic(), 8));

        // Additional admin-client checks live in the synthetic helper invoked below.
        this.destCluster().withAdmin((Function1 & Serializable)adminClient -> {
            ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testDestReadOnly$3(this, adminClient);
            return BoxedUnit.UNIT;
        });

        // Mirroring keeps working after the rejected operations.
        this.produceToSourceCluster(4);
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        this.verifyBasicLinkMetrics(linkId, this.verifyBasicLinkMetrics$default$2(), this.verifyBasicLinkMetrics$default$3());

        // Unlink the topic; it should now accept writes.
        ClusterLinkTestHarness unlinkOwner = this.destCluster();
        unlinkOwner.unlinkTopic(this.topic(), this.linkName(), unlinkOwner.unlinkTopic$default$3(), unlinkOwner.unlinkTopic$default$4(), unlinkOwner.unlinkTopic$default$5(), unlinkOwner.unlinkTopic$default$6());
        this.destCluster().verifyTopicWritable(this.topic(), this.numPartitions());
        this.produceRecords(destProducer, this.topic(), 10, this.produceRecords$default$4(), this.produceRecords$default$5(), this.produceRecords$default$6());

        // Clean up the link.
        ClusterLinkTestHarness linkOwner = this.destCluster();
        linkOwner.deleteClusterLink(this.linkName(), linkOwner.deleteClusterLink$default$2(), linkOwner.deleteClusterLink$default$3());
    }

    /**
     * Runs the partition-throttling verification without shutting down a source broker.
     *
     * @param quorum      supplied by the {@code allCombinations} method source; not read here
     * @param coordinator supplied by the {@code allCombinations} method source; not read here
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testThrottlePartitions(String quorum, boolean coordinator) {
        final boolean shutdownSourceBroker = false;
        verifyThrottlePartitions(shutdownSourceBroker);
    }

    /**
     * Runs the partition-throttling verification with a source broker shutdown in the middle,
     * so that source partition leadership changes during the test.
     *
     * @param quorum      supplied by the {@code allCombinations} method source; not read here
     * @param coordinator supplied by the {@code allCombinations} method source; not read here
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testThrottlePartitionsWithSourceLeaderChanges(String quorum, boolean coordinator) {
        final boolean shutdownSourceBroker = true;
        verifyThrottlePartitions(shutdownSourceBroker);
    }

    /*
     * WARNING - void declaration
     */
    public void verifyThrottlePartitions(boolean shutdownSourceBroker) {
        Assertions.assertEquals((int)2, (int)this.sourceCluster().brokerCount());
        this.numPartitions_$eq(20);
        ClusterLinkTestHarness qual$1 = this.sourceCluster();
        String x$1 = this.topic();
        int x$2 = this.numPartitions();
        short x$3 = this.replicationFactor();
        Properties x$4 = qual$1.createTopic$default$4();
        ListenerName x$5 = qual$1.createTopic$default$5();
        Properties x$6 = qual$1.createTopic$default$6();
        qual$1.createTopic(x$1, x$2, x$3, x$4, x$5, x$6);
        Properties linkProps = this.destLinkProps(this.destLinkProps$default$1());
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.LinkFetcherMaxLaggingPartitionsProp(), "1");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.LinkFetcherEnforceMaxLaggingPartitionMsProp(), "100");
        linkProps.setProperty("metadata.max.age.ms", "100");
        linkProps.setProperty(KafkaConfig$.MODULE$.ReplicaFetchConnectionsModeProp(), FetchConnectionsMode.Combined$.MODULE$.value());
        Uuid linkId = this.createClusterLink(this.linkName(), linkProps, this.createClusterLink$default$3(), this.createClusterLink$default$4(), this.createClusterLink$default$5());
        ClusterLinkTestHarness qual$2 = this.destCluster();
        String x$7 = this.topic();
        short x$8 = this.replicationFactor();
        String x$9 = this.linkName();
        Map<String, String> x$10 = qual$2.linkTopic$default$4();
        String x$11 = qual$2.linkTopic$default$5();
        qual$2.linkTopic(x$7, x$8, x$9, x$10, x$11);
        long l = 100L;
        long waitUntilTrue_waitTimeMs = 15000L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$verifyThrottlePartitions$1(this, linkId)) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)"Unexpected number of linked partitions");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        scala.collection.immutable.Set linkedPartitions = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), this.numPartitions()).map((Function1 & Serializable)i -> ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$verifyThrottlePartitions$4(this, BoxesRunTime.unboxToInt((Object)i))).toSet();
        int shutdownSourceBrokerId = -1;
        if (shutdownSourceBroker) {
            void var20_23;
            Tuple2<Object, Object> tuple2 = this.sourceCluster().shutdownLeader(new TopicPartition(this.topic(), 0));
            if (tuple2 == null) {
                throw new MatchError(null);
            }
            int shutdownBrokerId = tuple2._1$mcI$sp();
            shutdownSourceBrokerId = var20_23;
            long l2 = 100L;
            long waitUntilTrue_waitTimeMs2 = 15000L;
            long waitUntilTrue_startTime2 = System.currentTimeMillis();
            while (!ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$verifyThrottlePartitions$5(this, linkId)) {
                void waitUntilTrue_pause;
                if (System.currentTimeMillis() > waitUntilTrue_startTime2 + waitUntilTrue_waitTimeMs2) {
                    Assertions.fail((String)"Each destination broker should have 1 fetcher");
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs2), (long)waitUntilTrue_pause));
            }
        }
        this.destCluster().brokers().foreach((Function1 & Serializable)broker -> {
            linkedPartitions.foreach((Function1 & Serializable)tp -> {
                ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$verifyThrottlePartitions$9(broker, linkId$3, tp);
                return BoxedUnit.UNIT;
            });
            return BoxedUnit.UNIT;
        });
        long l3 = 100L;
        long waitUntilTrue_waitTimeMs3 = 15000L;
        long waitUntilTrue_startTime3 = System.currentTimeMillis();
        while (!ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$verifyThrottlePartitions$10(this, linkId)) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime3 + waitUntilTrue_waitTimeMs3) {
                Assertions.fail((String)"Partitions are not throttled");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs3), (long)waitUntilTrue_pause));
        }
        int iteration = 0;
        ObjectRef unThrottledPartitions = ObjectRef.create(null);
        int numRecords = this.numPartitions() * 3;
        scala.collection.immutable.Seq brokerThrottledPartitions = ((IterableOnceOps)this.destCluster().brokers().map((Function1 & Serializable)broker -> (scala.collection.mutable.Set)TestUtils.fieldValue((Object)((ClusterLinkFetcherManager)broker.clusterLinkManager().fetcherManager(linkId).get()), ClusterLinkFetcherManager.class, (String)"throttledPartitions"))).toSeq();
        do {
            unThrottledPartitions.elem = (scala.collection.immutable.Set)linkedPartitions.filter((Function1 & Serializable)tp -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$verifyThrottlePartitions$14(brokerThrottledPartitions, tp)));
            if (iteration == 0) {
                if (shutdownSourceBrokerId >= 0) {
                    this.sourceCluster().startBroker(shutdownSourceBrokerId);
                    long l4 = 15000L;
                    TestUtils$ retry_this = TestUtils$.MODULE$;
                    long l5 = 1L;
                    long retry_startTime = System.currentTimeMillis();
                    while (true) {
                        try {
                            try {
                                this.sourceCluster().changeToPreferredLeader(this.topic(), (scala.collection.immutable.Set<TopicPartition>)((scala.collection.immutable.Set)unThrottledPartitions.elem));
                            }
                            catch (Throwable throwable) {
                                Assertions.fail((String)"Failed in changing to preferred leader");
                            }
                        }
                        catch (AssertionError retry_e) {
                            void retry_maxWaitMs;
                            if (System.currentTimeMillis() - retry_startTime > retry_maxWaitMs) {
                                throw retry_e;
                            }
                            if (retry_this.logger().underlying().isInfoEnabled()) {
                                String msgWithLogIdent_msg = new StringBuilder(49).append("Attempt failed, sleeping for ").append(l5).append(", and then retrying.").toString();
                                Object var49_39 = null;
                                retry_this.logger().underlying().info(Logging.msgWithLogIdent$((Logging)retry_this, (String)msgWithLogIdent_msg));
                            }
                            Thread.sleep(l5);
                            l5 += package$.MODULE$.min(l5, 1000L);
                            continue;
                        }
                        break;
                    }
                    Object var43_34 = null;
                    Object var48_37 = null;
                }
                this.produceToSourceCluster(numRecords);
            }
            this.destCluster().brokers().map((Function1 & Serializable)x$18 -> {
                ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$verifyThrottlePartitions$17(linkId, unThrottledPartitions, x$18);
                return BoxedUnit.UNIT;
            });
            Thread.sleep(200L);
        } while (((scala.collection.immutable.Set)unThrottledPartitions.elem).size() < this.numPartitions() && ++iteration < this.numPartitions());
        Assertions.assertEquals((int)this.numPartitions(), (int)((scala.collection.immutable.Set)unThrottledPartitions.elem).size());
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
    }

    /*
     * WARNING - void declaration
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testAutoTuneFetchers(String quorum, boolean coordinator) {
        this.numPartitions_$eq(10);
        ClusterLinkTestHarness qual$1 = this.sourceCluster();
        String x$1 = this.topic();
        int x$2 = this.numPartitions();
        short x$3 = this.replicationFactor();
        Properties x$4 = qual$1.createTopic$default$4();
        ListenerName x$5 = qual$1.createTopic$default$5();
        Properties x$6 = qual$1.createTopic$default$6();
        qual$1.createTopic(x$1, x$2, x$3, x$4, x$5, x$6);
        Properties linkProps = this.destLinkProps(this.destLinkProps$default$1());
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.LinkFetcherMaxLaggingPartitionsProp(), "-1");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.LinkFetcherEnforceMaxLaggingPartitionMsProp(), "100");
        linkProps.setProperty("metadata.max.age.ms", "100");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.LinkFetcherAutoTuneEnableProp(), "true");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.NumClusterLinkFetchersProp(), "5");
        linkProps.setProperty(KafkaConfig$.MODULE$.ReplicaFetchConnectionsModeProp(), FetchConnectionsMode.Combined$.MODULE$.value());
        Uuid linkId = this.createClusterLink(this.linkName(), linkProps, this.createClusterLink$default$3(), this.createClusterLink$default$4(), this.createClusterLink$default$5());
        ClusterLinkTestHarness qual$2 = this.destCluster();
        String x$7 = this.topic();
        short x$8 = this.replicationFactor();
        String x$9 = this.linkName();
        Map<String, String> x$10 = qual$2.linkTopic$default$4();
        String x$11 = qual$2.linkTopic$default$5();
        qual$2.linkTopic(x$7, x$8, x$9, x$10, x$11);
        long l = 100L;
        long waitUntilTrue_waitTimeMs = 15000L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testAutoTuneFetchers$1(this, linkId)) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)"Unexpected number of fetchers' partitions");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        HashMap brokerFetcherCount = (HashMap)HashMap$.MODULE$.apply((scala.collection.immutable.Seq)Nil$.MODULE$);
        this.destCluster().brokers().foreach((Function1 & Serializable)broker -> {
            double defaultPoolTargetFetcherCount = this.kafkaBrokerMetricValue((KafkaBroker)broker, "link-target-fetcher-count", "cluster-link-metrics", (Option<String>)new Some((Object)this.linkName()), (Map<String, String>)((Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"pool"), (Object)"Default")}))), this.kafkaBrokerMetricValue$default$6(), this.kafkaBrokerMetricValue$default$7());
            Assertions.assertEquals((double)1.0, (double)defaultPoolTargetFetcherCount, (String)new StringBuilder(69).append("broker ").append(broker.config().brokerId()).append(" Default pool target fetcher count initial value is unexpected").toString());
            double inSyncPoolTargetFetcherCount = this.kafkaBrokerMetricValue((KafkaBroker)broker, "link-target-fetcher-count", "cluster-link-metrics", (Option<String>)new Some((Object)this.linkName()), (Map<String, String>)((Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"pool"), (Object)"InSync")}))), this.kafkaBrokerMetricValue$default$6(), this.kafkaBrokerMetricValue$default$7());
            Assertions.assertEquals((double)1.0, (double)inSyncPoolTargetFetcherCount, (String)new StringBuilder(63).append("broker ").append(broker.config().brokerId()).append(" InSync target fetcher count initial value is unexpected").toString());
            int fetcherCount = ((ClusterLinkFetcherManager)broker.clusterLinkManager().fetcherManager(linkId).get()).fetcherCount();
            Assertions.assertTrue((fetcherCount > 0 ? 1 : 0) != 0, (String)new StringBuilder(21).append("broker ").append(broker.config().brokerId()).append(" has ").append(fetcherCount).append(" fetchers").toString());
            return (HashMap)brokerFetcherCount.$plus$eq((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)broker.config().brokerId())), (Object)BoxesRunTime.boxToInteger((int)fetcherCount)));
        });
        scala.collection.immutable.Set linkedPartitions = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), this.numPartitions()).map((Function1 & Serializable)i -> ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testAutoTuneFetchers$6(this, BoxesRunTime.unboxToInt((Object)i))).toSet();
        this.destCluster().brokers().foreach((Function1 & Serializable)broker -> {
            linkedPartitions.foreach((Function1 & Serializable)tp -> {
                ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testAutoTuneFetchers$8(broker, linkId$4, tp);
                return BoxedUnit.UNIT;
            });
            return BoxedUnit.UNIT;
        });
        long l2 = 100L;
        long waitUntilTrue_waitTimeMs2 = 15000L;
        long waitUntilTrue_startTime2 = System.currentTimeMillis();
        while (!ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testAutoTuneFetchers$9(this, linkId, brokerFetcherCount)) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime2 + waitUntilTrue_waitTimeMs2) {
                Assertions.fail((String)ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testAutoTuneFetchers$11(brokerFetcherCount));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs2), (long)waitUntilTrue_pause));
        }
        this.destCluster().brokers().foreach((Function1 & Serializable)broker -> {
            ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testAutoTuneFetchers$12(this, broker);
            return BoxedUnit.UNIT;
        });
        this.destCluster().brokers().foreach((Function1 & Serializable)x$19 -> {
            ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testAutoTuneFetchers$13(linkId, linkedPartitions, x$19);
            return BoxedUnit.UNIT;
        });
        long l3 = 100L;
        long waitUntilTrue_waitTimeMs3 = 15000L;
        long waitUntilTrue_startTime3 = System.currentTimeMillis();
        while (!ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testAutoTuneFetchers$14(this, linkId, brokerFetcherCount)) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime3 + waitUntilTrue_waitTimeMs3) {
                Assertions.fail((String)ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testAutoTuneFetchers$16(brokerFetcherCount));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs3), (long)waitUntilTrue_pause));
        }
        this.destCluster().brokers().foreach((Function1 & Serializable)broker -> {
            ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testAutoTuneFetchers$17(this, broker);
            return BoxedUnit.UNIT;
        });
        ClusterLinkTestHarness qual$3 = this.destCluster();
        String x$12 = this.linkName();
        Map x$13 = (Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.LinkFetcherAutoTuneEnableProp()), (Object)"false")}));
        Seq<KafkaBroker> x$14 = qual$3.alterClusterLink$default$3();
        scala.collection.immutable.Set<String> x$15 = qual$3.alterClusterLink$default$4();
        boolean x$16 = qual$3.alterClusterLink$default$5();
        qual$3.alterClusterLink(x$12, (Map<String, String>)x$13, x$14, x$15, x$16);
        long l4 = 100L;
        long waitUntilTrue_waitTimeMs4 = 15000L;
        long waitUntilTrue_startTime4 = System.currentTimeMillis();
        while (!ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testAutoTuneFetchers$18(this, linkId, brokerFetcherCount)) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime4 + waitUntilTrue_waitTimeMs4) {
                Assertions.fail((String)ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testAutoTuneFetchers$20(brokerFetcherCount));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs4), (long)waitUntilTrue_pause));
        }
        this.destCluster().brokers().foreach((Function1 & Serializable)broker -> {
            ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testAutoTuneFetchers$21(this, broker);
            return BoxedUnit.UNIT;
        });
        long l5 = 100L;
        long waitUntilTrue_waitTimeMs5 = 15000L;
        long waitUntilTrue_startTime5 = System.currentTimeMillis();
        while (!ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testAutoTuneFetchers$22(this)) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime5 + waitUntilTrue_waitTimeMs5) {
                Assertions.fail((String)"InSync target fetcher count doesn't get reset to 5 when auto tuning is disabled");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs5), (long)waitUntilTrue_pause));
        }
        ClusterLinkTestHarness qual$4 = this.destCluster();
        String x$17 = this.linkName();
        Map x$18 = (Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.LinkFetcherAutoTuneEnableProp()), (Object)"true")}));
        Seq<KafkaBroker> x$192 = qual$4.alterClusterLink$default$3();
        scala.collection.immutable.Set<String> x$20 = qual$4.alterClusterLink$default$4();
        boolean x$21 = qual$4.alterClusterLink$default$5();
        qual$4.alterClusterLink(x$17, (Map<String, String>)x$18, x$192, x$20, x$21);
        long l6 = 100L;
        long waitUntilTrue_waitTimeMs6 = 15000L;
        long waitUntilTrue_startTime6 = System.currentTimeMillis();
        while (!ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testAutoTuneFetchers$25(this)) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime6 + waitUntilTrue_waitTimeMs6) {
                Assertions.fail((String)"InSync pool target fetcher count does not reset to 1 after enabling auto tuning");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs6), (long)waitUntilTrue_pause));
        }
    }

    /**
     * Collects one offset per partition of {@code topic}, for partitions
     * {@code 0 .. numPartitions - 1}.
     *
     * @param cluster       cluster handed to the per-partition offset helper
     * @param topic         topic whose partitions are enumerated
     * @param logOffset     function handed to the per-partition offset helper
     * @param numPartitions number of partitions to enumerate, starting at 0
     * @return the per-partition values, in partition order
     */
    private Seq<Object> logOffsets(ClusterLinkTestHarness cluster, String topic, Function1<AbstractLog, Object> logOffset, int numPartitions) {
        // Step 1: partition indices -> partition objects built by the first helper.
        Seq topicPartitions = (Seq)RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), numPartitions).map((Function1 & Serializable)index -> ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$logOffsets$1(topic, BoxesRunTime.unboxToInt((Object)index)));
        // Step 2: partition objects -> boxed long offsets from the second helper.
        return (Seq)topicPartitions.map((Function1 & Serializable)partition -> BoxesRunTime.boxToLong((long)ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$logOffsets$2(cluster, logOffset, partition)));
    }

    /**
     * Default value for the fourth parameter ({@code numPartitions}) of {@code logOffsets}.
     *
     * @return the current value of {@code numPartitions()}
     */
    private int logOffsets$default$4() {
        final int defaultNumPartitions = numPartitions();
        return defaultNumPartitions;
    }

    /**
     * Applies the synthetic {@code $anonfun$shutdownClearMirrorStartOffsets$1} helper to every
     * alive broker of the destination cluster.
     *
     * @param linkId id of the cluster link passed to the helper for each broker
     */
    private void shutdownClearMirrorStartOffsets(Uuid linkId) {
        destCluster().aliveBrokers().foreach((Function1 & Serializable)aliveBroker -> {
            ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$shutdownClearMirrorStartOffsets$1(linkId, aliveBroker);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Builds the {@link TopicPartition} for partition {@code i} of the test's topic.
     *
     * @param $this test instance providing the topic name
     * @param i     partition index
     * @return a new {@code TopicPartition} for the test topic and the given index
     */
    public static final /* synthetic */ TopicPartition $anonfun$testFetcherThreads$4(ClusterLinkDataPlaneMirroringIntegrationTest $this, int i) {
        String topicName = $this.topic();
        return new TopicPartition(topicName, i);
    }

    /**
     * Tells whether a fetcher map entry belongs to the {@code Default} fetcher pool.
     *
     * @param x0$1 entry whose first element is a {@link FetcherTag}
     * @return {@code true} when the tag's pool is non-null and equals
     *         {@code FetcherPool.Default$.MODULE$}
     * @throws MatchError if {@code x0$1} is {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$testFetcherThreads$8(Tuple2 x0$1) {
        if (x0$1 == null) {
            throw new MatchError(null);
        }
        FetcherTag tag = (FetcherTag)x0$1._1();
        FetcherPool pool = tag.fetcherPool();
        FetcherPool.Default$ defaultPool = FetcherPool.Default$.MODULE$;
        if (pool == null) {
            return false;
        }
        return pool.equals(defaultPool);
    }

    /**
     * Extracts the partition count of the fetcher stored in a fetcher map entry.
     *
     * @param x0$2 entry whose second element is a {@link ClusterLinkFetcher}
     * @return that fetcher's {@code partitionCount()}
     * @throws MatchError if {@code x0$2} is {@code null}
     */
    public static final /* synthetic */ int $anonfun$testFetcherThreads$9(Tuple2 x0$2) {
        if (x0$2 == null) {
            throw new MatchError(null);
        }
        ClusterLinkFetcher fetcher = (ClusterLinkFetcher)x0$2._2();
        return fetcher.partitionCount();
    }

    /**
     * Records, for one broker, how many fetchers and partitions are still in the
     * {@code Default} pool of the given link, and reports whether that pool is empty.
     *
     * <p>The partition count is obtained by reflectively reading the fetcher manager's
     * {@code fetcherThreadMap} field, keeping the entries tagged with the {@code Default} pool
     * and summing their partition counts.
     *
     * @param linkId$1                                 id of the cluster link to inspect
     * @param brokerDefaultFetcherAndPartitionCount$1  map updated with
     *        {@code brokerId -> (fetcherCount, partitionCount)}
     * @param broker                                   broker to inspect
     * @return {@code true} when the broker has no fetchers in the {@code Default} pool
     */
    public static final /* synthetic */ boolean $anonfun$testFetcherThreads$7(Uuid linkId$1, scala.collection.mutable.Map brokerDefaultFetcherAndPartitionCount$1, KafkaBroker broker) {
        ClusterLinkFetcherManager manager = (ClusterLinkFetcherManager)broker.clusterLinkManager().fetcherManager(linkId$1).get();
        int defaultPoolFetchers = manager.fetcherCountInPool((FetcherPool)FetcherPool.Default$.MODULE$);

        // Reflective view of the manager's fetcher threads, narrowed to the Default pool.
        HashMap fetcherThreads = (HashMap)TestUtils.fieldValue((Object)manager, AbstractFetcherManager.class, (String)"fetcherThreadMap");
        StrictOptimizedIterableOps defaultPoolEntries = (StrictOptimizedIterableOps)fetcherThreads.filter((Function1 & Serializable)entry -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testFetcherThreads$8(entry)));
        IterableOnceOps partitionCounts = (IterableOnceOps)defaultPoolEntries.map((Function1 & Serializable)entry -> BoxesRunTime.boxToInteger((int)ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testFetcherThreads$9(entry)));
        int defaultPoolPartitions = BoxesRunTime.unboxToInt((Object)partitionCounts.sum((Numeric)Numeric.IntIsIntegral$.MODULE$));

        brokerDefaultFetcherAndPartitionCount$1.put((Object)BoxesRunTime.boxToInteger((int)broker.config().brokerId()), (Object)new Tuple2.mcII.sp(defaultPoolFetchers, defaultPoolPartitions));
        return defaultPoolFetchers == 0;
    }

    /**
     * Polling condition: checks whether every destination broker has emptied its
     * {@code Default} fetcher pool for the link. When that is not yet the case, more records
     * are produced to the source cluster and the mirror is awaited before returning.
     *
     * @param $this                                    test instance
     * @param linkId$1                                 id of the cluster link to inspect
     * @param brokerDefaultFetcherAndPartitionCount$1  map updated with per-broker counts
     * @param numRecords$1                             records to produce when not yet migrated
     * @return {@code true} when no destination broker has fetchers in the {@code Default} pool
     */
    public static final /* synthetic */ boolean $anonfun$testFetcherThreads$6(ClusterLinkDataPlaneMirroringIntegrationTest $this, Uuid linkId$1, scala.collection.mutable.Map brokerDefaultFetcherAndPartitionCount$1, int numRecords$1) {
        boolean allMigrated = $this.destCluster().brokers().forall((Function1 & Serializable)destBroker -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testFetcherThreads$7(linkId$1, brokerDefaultFetcherAndPartitionCount$1, destBroker)));
        if (allMigrated) {
            return true;
        }
        // Not there yet: push more traffic through the link before the next poll.
        $this.produceToSourceCluster(numRecords$1);
        $this.waitForMirror($this.waitForMirror$default$1(), $this.waitForMirror$default$2());
        return false;
    }

    /**
     * Builds the failure message used when fetchers have not left the {@code Default} pool.
     *
     * @param brokerDefaultFetcherAndPartitionCount$1 per-broker counts appended to the message
     * @return the message prefix followed by the map's string form
     */
    public static final /* synthetic */ String $anonfun$testFetcherThreads$10(scala.collection.mutable.Map brokerDefaultFetcherAndPartitionCount$1) {
        final String prefix = "Fetchers are still in Default pool ";
        return prefix + brokerDefaultFetcherAndPartitionCount$1;
    }

    /**
     * Tells whether a broker has at least one fetcher in the {@code InSync} pool for the link.
     *
     * @param linkId$1 id of the cluster link to inspect
     * @param broker   broker to inspect
     * @return {@code true} when the {@code InSync} pool fetcher count is greater than zero
     */
    public static final /* synthetic */ boolean $anonfun$testFetcherThreads$12(Uuid linkId$1, KafkaBroker broker) {
        ClusterLinkFetcherManager manager = (ClusterLinkFetcherManager)broker.clusterLinkManager().fetcherManager(linkId$1).get();
        int inSyncFetchers = manager.fetcherCountInPool((FetcherPool)FetcherPool.InSync$.MODULE$);
        return inSyncFetchers > 0;
    }

    /**
     * Polling condition: checks that every destination broker satisfies the per-broker
     * {@code InSync} pool check for the link.
     *
     * @param $this    test instance providing the destination cluster
     * @param linkId$1 id of the cluster link to inspect
     * @return {@code true} when the check holds for all destination brokers
     */
    public static final /* synthetic */ boolean $anonfun$testFetcherThreads$11(ClusterLinkDataPlaneMirroringIntegrationTest $this, Uuid linkId$1) {
        boolean everyBrokerHasInSyncFetcher = $this.destCluster().brokers().forall((Function1 & Serializable)destBroker -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testFetcherThreads$12(linkId$1, destBroker)));
        return everyBrokerHasInSyncFetcher;
    }

    /**
     * Supplies the failure message used when no fetcher is found in the {@code InSync} pool.
     *
     * @return the fixed failure message
     */
    public static final /* synthetic */ String $anonfun$testFetcherThreads$13() {
        final String failureMessage = "No fetcher in InSync pool";
        return failureMessage;
    }

    /**
     * Delegates to {@code verifyFetcherThreads} for the given link and mode, with a fixed
     * third argument of {@code 2}.
     *
     * @param $this               test instance performing the verification
     * @param linkId$1            id of the cluster link to verify
     * @param newMode$1           mode to verify; must be a {@link FetcherThreadPoolMode}
     * @param maxFetcherThreads$1 value forwarded as the last argument
     */
    public static final /* synthetic */ void $anonfun$testFetcherThreads$15(ClusterLinkDataPlaneMirroringIntegrationTest $this, Uuid linkId$1, Product newMode$1, int maxFetcherThreads$1) {
        FetcherThreadPoolMode expectedMode = (FetcherThreadPoolMode)newMode$1;
        $this.verifyFetcherThreads(linkId$1, expectedMode, 2, maxFetcherThreads$1);
    }

    /**
     * Asserts that the broker's link config reports {@code useSharedFetcherThread()} as
     * {@code true} exactly when {@code newMode$1} equals {@code FetcherThreadPoolMode.Link$}.
     *
     * @param newMode$1 mode that determines the expected value
     * @param linkId$1  id of the cluster link whose config is read
     * @param broker    broker whose link config is checked
     */
    public static final /* synthetic */ void $anonfun$testFetcherThreads$16(Product newMode$1, Uuid linkId$1, KafkaBroker broker) {
        FetcherThreadPoolMode.Link$ linkMode = FetcherThreadPoolMode.Link$.MODULE$;
        boolean expectShared = newMode$1 != null && newMode$1.equals(linkMode);
        ClusterLinkConfig linkConfig = (ClusterLinkConfig)broker.clusterLinkManager().linkConfig(linkId$1).get();
        boolean actualShared = linkConfig.useSharedFetcherThread();
        Assertions.assertEquals((Object)BoxesRunTime.boxToBoolean(expectShared), (Object)BoxesRunTime.boxToBoolean(actualShared));
    }

    /*
     * WARNING - void declaration
     */
    private final Seq verifyMirrorWithStartOffsetSpec$1(String topic, Option offsetSpec, boolean olderRecordsSkipped, boolean verifyStartOffsetsCleared, boolean stopMirror, Consumer consumer$1) {
        void logOffsets_logOffset;
        void logOffsets_numPartitions;
        void logOffsets_logOffset2;
        void logOffsets_numPartitions2;
        void logOffsets_logOffset3;
        void logOffsets_numPartitions3;
        ClusterLinkTestHarness qual$1 = this.sourceCluster();
        ByteArraySerializer x$12 = qual$1.createProducer$default$1();
        ByteArraySerializer x$22 = qual$1.createProducer$default$2();
        Properties x$32 = qual$1.createProducer$default$3();
        KafkaProducer producer = qual$1.createProducer(x$12, x$22, x$32);
        String x$52 = this.partitions$default$1();
        int x$6 = this.partitions$default$3();
        Seq<TopicPartition> mirrorPartitions = this.partitions(x$52, topic, x$6);
        ClusterLinkTestHarness qual$2 = this.sourceCluster();
        int x$8 = this.numPartitions();
        short x$9 = this.replicationFactor();
        Properties x$10 = qual$2.createTopic$default$4();
        ListenerName x$11 = qual$2.createTopic$default$5();
        Properties x$122 = qual$2.createTopic$default$6();
        qual$2.createTopic(topic, x$8, x$9, x$10, x$11, x$122);
        this.produceRecords(producer, topic, 25, this.produceRecords$default$4(), this.produceRecords$default$5(), this.produceRecords$default$6());
        int n = this.numPartitions();
        Function1 & Serializable intersect = (Function1 & Serializable)x$1 -> BoxesRunTime.boxToLong((long)x$1.logEndOffset());
        ClusterLinkTestHarness logOffsets_cluster = this.sourceCluster();
        IndexedSeq logOffsets_partitions = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), (int)logOffsets_numPartitions3).map((Function1 & Serializable)i -> ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$logOffsets$1(topic, BoxesRunTime.unboxToInt((Object)i)));
        Object var28_23 = null;
        intersect = null;
        Object var31_24 = null;
        Seq offsetsBeforeMirrorCreate = (Seq)logOffsets_partitions.map(arg_0 -> ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$logOffsets$2$adapted(logOffsets_cluster, (Function1)logOffsets_logOffset3, arg_0));
        String mirrorTopic = new StringBuilder(0).append(this.clusterLinkPrefix()).append(topic).toString();
        this.destCluster().linkTopic(mirrorTopic, topic, this.replicationFactor(), this.linkName(), (Map<String, String>)((Map)scala.collection.Map$.MODULE$.empty()), (Option<OffsetSpec>)offsetSpec);
        this.waitForMirrorPartitions(mirrorPartitions, this.waitForMirrorPartitions$default$2(), this.waitForMirrorPartitions$default$3(), this.waitForMirrorPartitions$default$4(), this.waitForMirrorPartitions$default$5());
        this.produceRecords(producer, topic, 25, this.produceRecords$default$4(), this.produceRecords$default$5(), this.produceRecords$default$6());
        this.waitForMirrorPartitions(mirrorPartitions, this.waitForMirrorPartitions$default$2(), this.waitForMirrorPartitions$default$3(), this.waitForMirrorPartitions$default$4(), this.waitForMirrorPartitions$default$5());
        int n2 = this.numPartitions();
        Function1 & Serializable intersect2 = (Function1 & Serializable)x$2 -> BoxesRunTime.boxToLong((long)x$2.logEndOffset());
        ClusterLinkTestHarness logOffsets_cluster2 = this.sourceCluster();
        Object var32_29 = null;
        intersect2 = null;
        int n3 = this.numPartitions();
        Function1 & Serializable intersect3 = (Function1 & Serializable)x$3 -> BoxesRunTime.boxToLong((long)x$3.logEndOffset());
        ClusterLinkTestHarness logOffsets_cluster3 = this.destCluster();
        Object var35_32 = null;
        intersect3 = null;
        Assertions.assertEquals((Object)((Seq)RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), (int)logOffsets_numPartitions2).map((Function1 & Serializable)i -> ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$logOffsets$1(topic, BoxesRunTime.unboxToInt((Object)i))).map(arg_0 -> ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$logOffsets$2$adapted(logOffsets_cluster2, (Function1)logOffsets_logOffset2, arg_0))), (Object)((Seq)RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), (int)logOffsets_numPartitions).map((Function1 & Serializable)i -> ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$logOffsets$1(topic, BoxesRunTime.unboxToInt((Object)i))).map(arg_0 -> ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$logOffsets$2$adapted(logOffsets_cluster3, (Function1)logOffsets_logOffset, arg_0))));
        if (olderRecordsSkipped) {
            void logOffsets_logOffset4;
            void logOffsets_numPartitions4;
            int n4 = this.numPartitions();
            Function1 & Serializable intersect4 = (Function1 & Serializable)x$4 -> BoxesRunTime.boxToLong((long)x$4.logStartOffset());
            ClusterLinkTestHarness logOffsets_cluster4 = this.destCluster();
            Object var38_35 = null;
            intersect4 = null;
            Assertions.assertEquals((Object)offsetsBeforeMirrorCreate, (Object)((Seq)RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), (int)logOffsets_numPartitions4).map((Function1 & Serializable)i -> ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$logOffsets$1(topic, BoxesRunTime.unboxToInt((Object)i))).map(arg_0 -> ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$logOffsets$2$adapted(logOffsets_cluster4, (Function1)logOffsets_logOffset4, arg_0))));
            this.producedRecords().remove(0, 25);
        } else {
            void logOffsets_logOffset5;
            void logOffsets_numPartitions5;
            int n5 = this.numPartitions();
            Function1 & Serializable intersect5 = (Function1 & Serializable)x$5 -> BoxesRunTime.boxToLong((long)x$5.logStartOffset());
            ClusterLinkTestHarness logOffsets_cluster5 = this.destCluster();
            Object var41_38 = null;
            intersect5 = null;
            ((Seq)RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), (int)logOffsets_numPartitions5).map((Function1 & Serializable)i -> ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$logOffsets$1(topic, BoxesRunTime.unboxToInt((Object)i))).map(arg_0 -> ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$logOffsets$2$adapted(logOffsets_cluster5, (Function1)logOffsets_logOffset5, arg_0))).foreach((Function1)(JFunction1.mcVJ.sp & Serializable)offset -> Assertions.assertEquals((long)0L, (long)offset));
        }
        consumer$1.assign((Collection)CollectionConverters$.MODULE$.SeqHasAsJava(this.partitions(this.clusterLinkPrefix(), topic, this.partitions$default$3())).asJava());
        this.consumeRecords((Consumer<byte[], byte[]>)consumer$1, this.clusterLinkPrefix(), topic);
        if (verifyStartOffsetsCleared) {
            this.waitUntilMirrorStartOffsetsAreCleared(mirrorTopic, this.waitUntilMirrorStartOffsetsAreCleared$default$2());
        }
        if (stopMirror) {
            ClusterLinkTestHarness qual$3 = this.destCluster();
            String x$14 = this.linkName();
            boolean x$16 = qual$3.unlinkTopic$default$3();
            boolean x$17 = qual$3.unlinkTopic$default$5();
            int x$18 = qual$3.unlinkTopic$default$6();
            qual$3.unlinkTopic(mirrorTopic, x$14, x$16, verifyStartOffsetsCleared, x$17, x$18);
        }
        this.producedRecords().clear();
        producer.close();
        return offsetsBeforeMirrorCreate;
    }

    /**
     * Constant supplier; judging by its generated name it is the default for
     * the fourth argument of a local {@code verifyMirrorWithStartOffsetSpec}
     * helper (confirm against the original Scala source).
     *
     * @return always {@code false}
     */
    private static final boolean verifyMirrorWithStartOffsetSpec$default$4$1() {
        final boolean defaultValue = false;
        return defaultValue;
    }

    /**
     * Constant supplier; judging by its generated name it is the default for
     * the fifth argument of a local {@code verifyMirrorWithStartOffsetSpec}
     * helper (confirm against the original Scala source).
     *
     * @return always {@code true}
     */
    private static final boolean verifyMirrorWithStartOffsetSpec$default$5$1() {
        final boolean defaultValue = true;
        return defaultValue;
    }

    /**
     * Advances the shared topic counter by one and builds a topic name from
     * the new counter value.
     *
     * @param topicIndex$1 mutable counter holder; its {@code elem} is incremented in place
     * @return {@code "topic"} followed by the incremented counter value
     */
    private static final String nextTopic$1(IntRef topicIndex$1) {
        final int next = topicIndex$1.elem + 1;
        topicIndex$1.elem = next;
        return "topic" + next;
    }

    /**
     * Checks whether a broker's metadata cache reports the expected partition
     * count for the mirror topic.
     *
     * @param mirrorTopic$1 name of the topic to look up
     * @param newNumPartitions$1 partition count that must be reported
     * @param x$8 broker whose metadata cache is queried
     * @return {@code true} when the cached partition count equals {@code newNumPartitions$1}
     */
    public static final /* synthetic */ boolean $anonfun$testMirrorStartTimestamp$5(String mirrorTopic$1, int newNumPartitions$1, KafkaBroker x$8) {
        Option<?> cachedPartitionCount = x$8.metadataCache().numPartitions(mirrorTopic$1);
        Object expectedCount = (Object)BoxesRunTime.boxToInteger((int)newNumPartitions$1);
        return cachedPartitionCount.contains(expectedCount);
    }

    /**
     * Reports whether every destination-cluster broker already sees the
     * expected partition count for the mirror topic.
     *
     * @param $this enclosing test instance providing the destination cluster
     * @param mirrorTopic$1 name of the mirror topic
     * @param newNumPartitions$1 partition count each broker must report
     * @return {@code true} only if the per-broker check passes for all brokers
     */
    public static final /* synthetic */ boolean $anonfun$testMirrorStartTimestamp$4(ClusterLinkDataPlaneMirroringIntegrationTest $this, String mirrorTopic$1, int newNumPartitions$1) {
        boolean allBrokersUpdated = $this.destCluster().brokers().forall((Function1 & Serializable)broker -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testMirrorStartTimestamp$5(mirrorTopic$1, newNumPartitions$1, broker)));
        return allBrokersUpdated;
    }

    /**
     * Supplies the failure text reported when the mirror topic never picks up
     * the added partitions.
     *
     * @return the fixed failure message
     */
    public static final /* synthetic */ String $anonfun$testMirrorStartTimestamp$6() {
        final String failureMessage = "Partitions not added to mirror topic";
        return failureMessage;
    }

    /**
     * Exercises a mirror topic created with a given start offset spec: produces
     * timestamped batches to a new source topic, links a mirror topic, compares
     * source and destination log offsets, adds a partition on the source, and
     * finally consumes from the mirror and unlinks it.
     *
     * <p>WARNING - void declaration: the decompiler emitted several locals as
     * {@code void} (the {@code logOffsets_*} and {@code waitUntilTrue_pause}
     * variables) that are read but never assigned in this listing.
     * NOTE(review): these look like remnants of inlined {@code logOffsets} and
     * {@code waitUntilTrue} helpers; this method is not valid Java as written.
     *
     * @param topic source topic name; the mirror topic name is the cluster link prefix plus this name
     * @param offsetSpec start offset spec passed to {@code linkTopic}
     * @param batchAgeMs ages (ms) subtracted from {@code currentTimeMs$1} to timestamp each pre-link batch
     * @param batchesSkipped number of leading batches removed from the expected records
     * @param batchSize$1 number of records produced per batch
     * @param currentTimeMs$1 reference timestamp in milliseconds
     * @param recordsPerPartitionPerBatch$1 multiplier used to compute the expected start offset of the original partitions
     * @param consumer$2 consumer assigned to the mirror partitions for the final verification
     */
    private final void verifyMirrorWithStartOffsetSpec$2(String topic, Option offsetSpec, Seq batchAgeMs, int batchesSkipped, int batchSize$1, long currentTimeMs$1, int recordsPerPartitionPerBatch$1, Consumer consumer$2) {
        // Decompiler artifacts: declared here, read below, never assigned.
        void logOffsets_logOffset;
        void logOffsets_logOffset2;
        void logOffsets_numPartitions;
        void logOffsets_logOffset3;
        void logOffsets_numPartitions2;
        // Producer on the source cluster, built with the harness defaults.
        ClusterLinkTestHarness qual$1 = this.sourceCluster();
        ByteArraySerializer x$1 = qual$1.createProducer$default$1();
        ByteArraySerializer x$2 = qual$1.createProducer$default$2();
        Properties x$3 = qual$1.createProducer$default$3();
        KafkaProducer producer = qual$1.createProducer(x$1, x$2, x$3);
        // Create the source topic with the test's partition count and replication factor.
        ClusterLinkTestHarness qual$2 = this.sourceCluster();
        int x$5 = this.numPartitions();
        short x$62 = this.replicationFactor();
        Properties x$72 = qual$2.createTopic$default$4();
        ListenerName x$8 = qual$2.createTopic$default$5();
        Properties x$92 = qual$2.createTopic$default$6();
        qual$2.createTopic(topic, x$5, x$62, x$72, x$8, x$92);
        // One batch per entry of batchAgeMs, passing (currentTimeMs - age) as the last produceRecords argument.
        batchAgeMs.foreach((Function1)(JFunction1.mcVI.sp & Serializable)age -> {
            Some x$13 = new Some((Object)BoxesRunTime.boxToLong((long)(currentTimeMs$1 - (long)age)));
            Function1<Object, String> x$14 = this.produceRecords$default$4();
            Option<byte[]> x$15 = this.produceRecords$default$5();
            this.produceRecords((KafkaProducer<byte[], byte[]>)producer, topic, batchSize$1, x$14, x$15, (Option<Object>)x$13);
        });
        // Link the mirror topic with the requested offset spec and wait for its partitions.
        String mirrorTopic = new StringBuilder(0).append(this.clusterLinkPrefix()).append(topic).toString();
        String x$17 = this.partitions$default$1();
        int x$18 = this.partitions$default$3();
        Seq<TopicPartition> mirrorPartitions = this.partitions(x$17, topic, x$18);
        this.destCluster().linkTopic(mirrorTopic, topic, this.replicationFactor(), this.linkName(), (Map<String, String>)((Map)scala.collection.Map$.MODULE$.empty()), (Option<OffsetSpec>)offsetSpec);
        this.waitForMirrorPartitions(mirrorPartitions, this.waitForMirrorPartitions$default$2(), this.waitForMirrorPartitions$default$3(), this.waitForMirrorPartitions$default$4(), this.waitForMirrorPartitions$default$5());
        // Produce one more batch after linking and wait for the mirror again.
        this.produceRecords(producer, topic, batchSize$1, this.produceRecords$default$4(), this.produceRecords$default$5(), this.produceRecords$default$6());
        this.waitForMirrorPartitions(mirrorPartitions, this.waitForMirrorPartitions$default$2(), this.waitForMirrorPartitions$default$3(), this.waitForMirrorPartitions$default$4(), this.waitForMirrorPartitions$default$5());
        // NOTE(review): n/intersect and n2/intersect2 are presumably the numPartitions and
        // logEndOffset arguments of two inlined logOffsets calls (source, then destination);
        // the decompiler nulls them and the assertion reads the unassigned void locals instead.
        int n = this.numPartitions();
        Function1 & Serializable intersect = (Function1 & Serializable)x$6 -> BoxesRunTime.boxToLong((long)x$6.logEndOffset());
        ClusterLinkTestHarness logOffsets_cluster = this.sourceCluster();
        Object var35_26 = null;
        intersect = null;
        int n2 = this.numPartitions();
        Function1 & Serializable intersect2 = (Function1 & Serializable)x$7 -> BoxesRunTime.boxToLong((long)x$7.logEndOffset());
        ClusterLinkTestHarness logOffsets_cluster2 = this.destCluster();
        Object var38_29 = null;
        intersect2 = null;
        // Per-partition offsets computed against the source cluster must equal those computed against the destination.
        Assertions.assertEquals((Object)((Seq)RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), (int)logOffsets_numPartitions2).map((Function1 & Serializable)i -> ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$logOffsets$1(topic, BoxesRunTime.unboxToInt((Object)i))).map(arg_0 -> ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$logOffsets$2$adapted(logOffsets_cluster, (Function1)logOffsets_logOffset3, arg_0))), (Object)((Seq)RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), (int)logOffsets_numPartitions).map((Function1 & Serializable)i -> ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$logOffsets$1(topic, BoxesRunTime.unboxToInt((Object)i))).map(arg_0 -> ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$logOffsets$2$adapted(logOffsets_cluster2, (Function1)logOffsets_logOffset2, arg_0))));
        // Drop the skipped batches from the front of the expected records.
        if (batchesSkipped > 0) {
            this.producedRecords().remove(0, batchesSkipped * batchSize$1);
        }
        // Add one partition on the source and poll until every destination broker reports the new count.
        int newNumPartitions = this.numPartitions() + 1;
        this.sourceCluster().createPartitions(topic, newNumPartitions);
        // NOTE(review): `l` (100 ms) is presumably the intended poll pause; the sleep below
        // reads the unassigned waitUntilTrue_pause instead — confirm against the Scala source.
        long l = 100L;
        long waitUntilTrue_waitTimeMs = 15000L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testMirrorStartTimestamp$4(this, mirrorTopic, newNumPartitions)) {
            void waitUntilTrue_pause;
            // Fail once more than waitUntilTrue_waitTimeMs has elapsed since the loop started.
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)"Partitions not added to mirror topic");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        // Produce a batch with currentTimeMs as the last produceRecords argument, then wait for the mirror.
        Some x$22 = new Some((Object)BoxesRunTime.boxToLong((long)currentTimeMs$1));
        Function1<Object, String> x$23 = this.produceRecords$default$4();
        Option<byte[]> x$24 = this.produceRecords$default$5();
        this.produceRecords(producer, topic, batchSize$1, x$23, x$24, (Option<Object>)x$22);
        this.waitForMirrorPartitions(mirrorPartitions, this.waitForMirrorPartitions$default$2(), this.waitForMirrorPartitions$default$3(), this.waitForMirrorPartitions$default$4(), this.waitForMirrorPartitions$default$5());
        // NOTE(review): intersect3 (logStartOffset) is nulled and the unassigned logOffsets_logOffset is used below.
        Function1 & Serializable intersect3 = (Function1 & Serializable)x$10 -> BoxesRunTime.boxToLong((long)x$10.logStartOffset());
        ClusterLinkTestHarness logOffsets_cluster3 = this.destCluster();
        Object var47_38 = null;
        intersect3 = null;
        // Expected: batchesSkipped * recordsPerPartitionPerBatch for each original partition, and 0 for the added partition.
        Assertions.assertEquals((Object)((IndexedSeq)RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), this.numPartitions()).map((Function1)(JFunction1.mcII.sp & Serializable)x$9 -> batchesSkipped * recordsPerPartitionPerBatch$1).$plus$plus((IterableOnce)scala.collection.Seq$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{0})))), (Object)((Seq)RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), newNumPartitions).map((Function1 & Serializable)i -> ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$logOffsets$1(topic, BoxesRunTime.unboxToInt((Object)i))).map(arg_0 -> ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$logOffsets$2$adapted(logOffsets_cluster3, (Function1)logOffsets_logOffset, arg_0))));
        // Consume from all mirror partitions, including the newly added one.
        consumer$2.assign((Collection)CollectionConverters$.MODULE$.SeqHasAsJava(this.partitions(this.clusterLinkPrefix(), topic, newNumPartitions)).asJava());
        this.consumeRecords((Consumer<byte[], byte[]>)consumer$2, this.clusterLinkPrefix(), topic);
        this.waitUntilMirrorStartOffsetsAreCleared(mirrorTopic, this.waitUntilMirrorStartOffsetsAreCleared$default$2());
        // Unlink the mirror topic using the harness defaults for the remaining arguments.
        ClusterLinkTestHarness qual$3 = this.destCluster();
        String x$26 = this.linkName();
        boolean x$27 = qual$3.unlinkTopic$default$3();
        boolean x$28 = qual$3.unlinkTopic$default$4();
        boolean x$29 = qual$3.unlinkTopic$default$5();
        int x$30 = qual$3.unlinkTopic$default$6();
        qual$3.unlinkTopic(mirrorTopic, x$26, x$27, x$28, x$29, x$30);
        // Reset the recorded state and release the producer.
        this.producedRecords().clear();
        producer.close();
    }

    /**
     * Advances the shared topic counter by one and builds a topic name from
     * the new counter value.
     *
     * @param topicIndex$2 mutable counter holder; its {@code elem} is incremented in place
     * @return {@code "topic"} followed by the incremented counter value
     */
    private static final String nextTopic$2(IntRef topicIndex$2) {
        final int next = topicIndex$2.elem + 1;
        topicIndex$2.elem = next;
        return "topic" + next;
    }

    /**
     * Reads the {@code useIndependentRetention} flag from the current config
     * of the broker's fetcher manager for the given link.
     *
     * @param broker broker whose cluster link manager is queried
     * @param linkId$2 id of the cluster link
     * @return the value of {@code useIndependentRetention()} on the fetcher manager's current config
     */
    private static final boolean usesIndependentRetention$1(KafkaBroker broker, Uuid linkId$2) {
        ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager)broker.clusterLinkManager().fetcherManager(linkId$2).get();
        return fetcherManager.currentConfig().useIndependentRetention();
    }

    /**
     * Predicate matching the value 999.
     *
     * @param x$11 value to test
     * @return {@code true} when {@code x$11} equals 999
     */
    public static final /* synthetic */ boolean $anonfun$testMirrorWithDifferentRetention$3(int x$11) {
        final int expected = 999;
        return expected == x$11;
    }

    /**
     * Predicate matching the value 30000000.
     *
     * @param x$12 value to test
     * @return {@code true} when {@code x$12} equals 30000000
     */
    public static final /* synthetic */ boolean $anonfun$testMirrorWithDifferentRetention$6(long x$12) {
        final long expected = 30000000L;
        return expected == x$12;
    }

    /**
     * Reports whether no destination broker has independent retention enabled
     * for the given link any more.
     *
     * @param $this enclosing test instance providing the destination cluster
     * @param linkId$2 id of the cluster link
     * @return {@code true} when {@code usesIndependentRetention$1} is false for every destination broker
     */
    public static final /* synthetic */ boolean $anonfun$testMirrorWithDifferentRetention$7(ClusterLinkDataPlaneMirroringIntegrationTest $this, Uuid linkId$2) {
        boolean anyBrokerStillIndependent = $this.destCluster().brokers().exists((Function1 & Serializable)candidate -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkDataPlaneMirroringIntegrationTest.usesIndependentRetention$1(candidate, linkId$2)));
        return !anyBrokerStillIndependent;
    }

    /**
     * Supplies the failure text reported when the retention config update is
     * not observed.
     *
     * @return the fixed failure message
     */
    public static final /* synthetic */ String $anonfun$testMirrorWithDifferentRetention$9() {
        final String failureMessage = "Retention config update not applied";
        return failureMessage;
    }

    /**
     * Reports whether no online partition on any destination broker is still
     * flagged {@code linkedUpdatesOnly} for the test's default partitions.
     *
     * @param $this enclosing test instance providing partitions and the destination cluster
     * @return {@code true} when the collection of still-linked partitions is empty
     */
    public static final /* synthetic */ boolean $anonfun$testTransactionsWithMirrorTopic$4(ClusterLinkDataPlaneMirroringIntegrationTest $this) {
        Seq<TopicPartition> allPartitions = $this.partitions($this.partitions$default$1(), $this.partitions$default$2(), $this.partitions$default$3());
        // For each partition, gather its online replicas across destination brokers and keep those still accepting linked updates only.
        Object stillLinked = allPartitions.flatMap((Function1 & Serializable)tp -> (Buffer)((IterableOps)$this.destCluster().brokers().flatMap((Function1 & Serializable)x$13 -> x$13.replicaManager().onlinePartition(tp))).filter((Function1 & Serializable)x$14 -> BoxesRunTime.boxToBoolean((boolean)x$14.linkedUpdatesOnly())));
        return ((SeqOps)stillLinked).isEmpty();
    }

    /**
     * Supplies the failure text reported when the mirror has not stopped.
     *
     * @return the fixed failure message
     */
    public static final /* synthetic */ String $anonfun$testTransactionsWithMirrorTopic$8() {
        final String failureMessage = "Mirror not stopped";
        return failureMessage;
    }

    /**
     * Asserts that the description of the test topic on the destination
     * cluster lists no mirror state transition errors.
     *
     * @param $this enclosing test instance providing the destination cluster and topic name
     */
    public static final /* synthetic */ void $anonfun$testMaxMessageSize$1(ClusterLinkDataPlaneMirroringIntegrationTest $this) {
        MirrorTopicDescription description = $this.destCluster().describeMirrorTopic($this.topic(), true);
        boolean noTransitionErrors = CollectionConverters$.MODULE$.ListHasAsScala(description.mirrorStateTransitionErrors()).asScala().isEmpty();
        Assertions.assertTrue((boolean)noTransitionErrors);
    }

    /**
     * Applies one incremental config change to the given resource and checks
     * the outcome: only {@code unclean.leader.election.enable} is expected to
     * succeed; any other name must fail with an
     * {@code InvalidConfigurationException} cause.
     *
     * @param admin$1 admin client used to issue {@code incrementalAlterConfigs}
     * @param resource$1 config resource being altered
     * @param x0$1 pair of (config name, optional value); {@code Some} means SET, {@code None} means DELETE
     * @throws MatchError if {@code x0$1} is {@code null} or its value is neither {@code Some} nor {@code None}
     */
    public static final /* synthetic */ void $anonfun$testDestReadOnly$4(ConfluentAdmin admin$1, ConfigResource resource$1, Tuple2 x0$1) {
        if (x0$1 == null) {
            throw new MatchError(null);
        }
        String name = (String)x0$1._1();
        Option value = (Option)x0$1._2();
        boolean expectSuccess = "unclean.leader.election.enable".equals(name);

        // Translate the optional value into a SET or DELETE operation.
        AlterConfigOp alterConfigOp;
        if (value instanceof Some) {
            String newValue = (String)((Some)value).value();
            alterConfigOp = new AlterConfigOp(new ConfigEntry(name, newValue), AlterConfigOp.OpType.SET);
        } else if (None$.MODULE$.equals(value)) {
            alterConfigOp = new AlterConfigOp(new ConfigEntry(name, null), AlterConfigOp.OpType.DELETE);
        } else {
            throw new MatchError((Object)value);
        }

        try {
            java.util.Set<AlterConfigOp> ops = Collections.singleton(alterConfigOp);
            admin$1.incrementalAlterConfigs(CollectionConverters$.MODULE$.MapHasAsJava((Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)resource$1), ops)}))).asJava()).all().get();
            // Reaching this point means the request succeeded.
            Assertions.assertTrue((boolean)expectSuccess);
        }
        catch (ExecutionException executionException) {
            Assertions.assertTrue((boolean)(executionException.getCause() instanceof InvalidConfigurationException));
            Assertions.assertFalse((boolean)expectSuccess);
        }
    }

    /**
     * Verifies config-alteration behaviour for the test topic through the
     * given admin client: a legacy {@code alterConfigs} call must fail with an
     * {@code InvalidRequestException} cause, after which four incremental
     * changes are each checked by {@code $anonfun$testDestReadOnly$4}.
     *
     * @param $this enclosing test instance providing the topic name
     * @param admin admin client used for all requests
     */
    public static final /* synthetic */ void $anonfun$testDestReadOnly$3(ClusterLinkDataPlaneMirroringIntegrationTest $this, ConfluentAdmin admin) {
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, $this.topic());
        try {
            // alterConfigs with an empty Config; waits up to 20 seconds for the result.
            admin.alterConfigs(CollectionConverters$.MODULE$.MapHasAsJava((Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)resource), (Object)new Config(CollectionConverters$.MODULE$.IterableHasAsJava((Iterable)scala.package$.MODULE$.List().empty()).asJavaCollection()))}))).asJava()).all().get(20L, TimeUnit.SECONDS);
            Assertions.fail((String)"alterConfigs() on a mirror topic should fail");
        }
        catch (ExecutionException executionException) {
            Assertions.assertTrue((boolean)(executionException.getCause() instanceof InvalidRequestException));
        }
        // Four (name, optional value) cases, in order: set and delete unclean.leader.election.enable,
        // then set and delete cleanup.policy.
        // NOTE(review): `new .colon.colon(...)` is decompiler output for a Scala list cons cell and is not valid Java.
        new .colon.colon((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"unclean.leader.election.enable"), (Object)new Some((Object)"true")), (List)new .colon.colon((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"unclean.leader.election.enable"), (Object)None$.MODULE$), (List)new .colon.colon((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"cleanup.policy"), (Object)new Some((Object)"compact")), (List)new .colon.colon((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"cleanup.policy"), (Object)None$.MODULE$), (List)Nil$.MODULE$)))).foreach((Function1 & Serializable)x0$1 -> {
            ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testDestReadOnly$4(admin, resource, x0$1);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Fold step that adds one broker's mirror partition count for the given
     * link to a running total.
     *
     * @param linkId$3 id of the cluster link
     * @param curLinkedPartitionCount total accumulated so far
     * @param broker broker whose fetcher manager is queried
     * @return the running total plus this broker's {@code mirrorPartitionCount()}
     */
    public static final /* synthetic */ long $anonfun$verifyThrottlePartitions$2(Uuid linkId$3, long curLinkedPartitionCount, KafkaBroker broker) {
        ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager)broker.clusterLinkManager().fetcherManager(linkId$3).get();
        long partitionsOnBroker = (long)fetcherManager.mirrorPartitionCount();
        return curLinkedPartitionCount + partitionsOnBroker;
    }

    /**
     * Reports whether the mirror partition counts summed over all destination
     * brokers equal the test's partition count.
     *
     * @param $this enclosing test instance providing the partition count and destination cluster
     * @param linkId$3 id of the cluster link
     * @return {@code true} when the summed count equals {@code numPartitions()}
     */
    public static final /* synthetic */ boolean $anonfun$verifyThrottlePartitions$1(ClusterLinkDataPlaneMirroringIntegrationTest $this, Uuid linkId$3) {
        long expectedTotal = (long)$this.numPartitions();
        long linkedTotal = BoxesRunTime.unboxToLong((Object)$this.destCluster().brokers().foldLeft((Object)BoxesRunTime.boxToLong((long)0L), (Function2 & Serializable)(runningTotal, broker) -> BoxesRunTime.boxToLong((long)ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$verifyThrottlePartitions$2(linkId$3, BoxesRunTime.unboxToLong((Object)runningTotal), broker))));
        return expectedTotal == linkedTotal;
    }

    /**
     * Supplies the failure text reported when the linked partition count is
     * not the expected one.
     *
     * @return the fixed failure message
     */
    public static final /* synthetic */ String $anonfun$verifyThrottlePartitions$3() {
        final String failureMessage = "Unexpected number of linked partitions";
        return failureMessage;
    }

    /**
     * Builds the {@code TopicPartition} of the test topic for a partition index.
     *
     * @param $this enclosing test instance providing the topic name
     * @param i partition index
     * @return a new {@code TopicPartition} for the test topic and index {@code i}
     */
    public static final /* synthetic */ TopicPartition $anonfun$verifyThrottlePartitions$4(ClusterLinkDataPlaneMirroringIntegrationTest $this, int i) {
        String topicName = $this.topic();
        return new TopicPartition(topicName, i);
    }

    /**
     * Checks whether a broker's fetcher manager for the given link reports
     * exactly one fetcher.
     *
     * @param linkId$3 id of the cluster link
     * @param x$15 broker whose fetcher manager is queried
     * @return {@code true} when {@code fetcherCount()} is 1
     */
    public static final /* synthetic */ boolean $anonfun$verifyThrottlePartitions$6(Uuid linkId$3, KafkaBroker x$15) {
        ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager)x$15.clusterLinkManager().fetcherManager(linkId$3).get();
        int fetchers = fetcherManager.fetcherCount();
        return fetchers == 1;
    }

    /**
     * Reports whether every destination broker has exactly one fetcher for
     * the given link.
     *
     * @param $this enclosing test instance providing the destination cluster
     * @param linkId$3 id of the cluster link
     * @return {@code true} only if the single-fetcher check passes for all brokers
     */
    public static final /* synthetic */ boolean $anonfun$verifyThrottlePartitions$5(ClusterLinkDataPlaneMirroringIntegrationTest $this, Uuid linkId$3) {
        boolean everyBrokerHasOneFetcher = $this.destCluster().brokers().forall((Function1 & Serializable)broker -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$verifyThrottlePartitions$6(linkId$3, broker)));
        return everyBrokerHasOneFetcher;
    }

    /**
     * Supplies the failure text reported when a destination broker does not
     * have exactly one fetcher.
     *
     * @return the fixed failure message
     */
    public static final /* synthetic */ String $anonfun$verifyThrottlePartitions$7() {
        final String failureMessage = "Each destination broker should have 1 fetcher";
        return failureMessage;
    }

    /**
     * Registers a partition as a test lagging partition on the broker's
     * fetcher manager, passing the partition index plus one as the second
     * argument.
     *
     * @param broker$1 broker whose fetcher manager is updated
     * @param linkId$3 id of the cluster link
     * @param tp partition to register
     */
    public static final /* synthetic */ void $anonfun$verifyThrottlePartitions$9(KafkaBroker broker$1, Uuid linkId$3, TopicPartition tp) {
        ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager)broker$1.clusterLinkManager().fetcherManager(linkId$3).get();
        fetcherManager.addTestLaggingPartition(tp, tp.partition() + 1);
    }

    /**
     * Checks whether a broker's fetcher manager for the given link reports at
     * least one throttled partition.
     *
     * @param linkId$3 id of the cluster link
     * @param x$16 broker whose fetcher manager is queried
     * @return {@code true} when {@code throttledPartitionCount()} is greater than zero
     */
    public static final /* synthetic */ boolean $anonfun$verifyThrottlePartitions$11(Uuid linkId$3, KafkaBroker x$16) {
        ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager)x$16.clusterLinkManager().fetcherManager(linkId$3).get();
        return fetcherManager.throttledPartitionCount() > 0;
    }

    /**
     * Reports whether every destination broker has at least one throttled
     * partition for the given link.
     *
     * @param $this enclosing test instance providing the destination cluster
     * @param linkId$3 id of the cluster link
     * @return {@code true} only if the throttled-partition check passes for all brokers
     */
    public static final /* synthetic */ boolean $anonfun$verifyThrottlePartitions$10(ClusterLinkDataPlaneMirroringIntegrationTest $this, Uuid linkId$3) {
        boolean everyBrokerThrottles = $this.destCluster().brokers().forall((Function1 & Serializable)broker -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$verifyThrottlePartitions$11(linkId$3, broker)));
        return everyBrokerThrottles;
    }

    /**
     * Supplies the failure text reported when partitions are not throttled.
     *
     * @return the fixed failure message
     */
    public static final /* synthetic */ String $anonfun$verifyThrottlePartitions$12() {
        final String failureMessage = "Partitions are not throttled";
        return failureMessage;
    }

    /**
     * Reports whether a partition is absent from every collection in
     * {@code brokerThrottledPartitions$1}.
     *
     * @param brokerThrottledPartitions$1 sequence of collections to search
     * @param tp partition to look for
     * @return {@code true} when no element of the sequence contains {@code tp}
     */
    public static final /* synthetic */ boolean $anonfun$verifyThrottlePartitions$14(scala.collection.immutable.Seq brokerThrottledPartitions$1, TopicPartition tp) {
        boolean throttledSomewhere = brokerThrottledPartitions$1.exists((Function1 & Serializable)throttled -> BoxesRunTime.boxToBoolean((boolean)throttled.contains((Object)tp)));
        return !throttledSomewhere;
    }

    /**
     * Moves leadership of the un-throttled partitions of the test topic to
     * their preferred leaders on the source cluster, failing the test if the
     * change throws.
     *
     * <p>The caught throwable is attached as the cause of the test failure so
     * the underlying error is visible in the test report instead of being
     * discarded.
     *
     * @param $this enclosing test instance providing the source cluster and topic name
     * @param unThrottledPartitions$1 holder whose {@code elem} is the set of partitions to change
     */
    public static final /* synthetic */ void $anonfun$verifyThrottlePartitions$16(ClusterLinkDataPlaneMirroringIntegrationTest $this, ObjectRef unThrottledPartitions$1) {
        try {
            $this.sourceCluster().changeToPreferredLeader($this.topic(), (scala.collection.immutable.Set<TopicPartition>)((scala.collection.immutable.Set)unThrottledPartitions$1.elem));
        }
        catch (Throwable throwable) {
            // Same failure message as before, now carrying the original cause.
            Assertions.fail((String)"Failed in changing to preferred leader", (Throwable)throwable);
        }
    }

    /**
     * Removes the un-throttled partitions from the test lagging partitions of
     * a broker's fetcher manager for the given link.
     *
     * @param linkId$3 id of the cluster link
     * @param unThrottledPartitions$1 holder whose {@code elem} is the set of partitions to remove
     * @param x$18 broker whose fetcher manager is updated
     */
    public static final /* synthetic */ void $anonfun$verifyThrottlePartitions$17(Uuid linkId$3, ObjectRef unThrottledPartitions$1, KafkaBroker x$18) {
        ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager)x$18.clusterLinkManager().fetcherManager(linkId$3).get();
        fetcherManager.removeTestLaggingPartitions((Set)((scala.collection.immutable.Set)unThrottledPartitions$1.elem));
    }

    /**
     * Extracts the partition count of the fetcher stored as the second
     * element of a map entry.
     *
     * @param x0$1 pair whose second element is a {@code ClusterLinkFetcher}
     * @return that fetcher's {@code partitionCount()}
     * @throws MatchError if {@code x0$1} is {@code null}
     */
    public static final /* synthetic */ int $anonfun$testAutoTuneFetchers$3(Tuple2 x0$1) {
        if (x0$1 == null) {
            throw new MatchError(null);
        }
        ClusterLinkFetcher fetcher = (ClusterLinkFetcher)x0$1._2();
        return fetcher.partitionCount();
    }

    /**
     * Fold step that adds the partition counts of all fetchers on one broker
     * (for the given link) to a running total.
     *
     * @param linkId$4 id of the cluster link
     * @param curLinkedPartitionCount total accumulated so far
     * @param broker broker whose fetcher manager is inspected
     * @return the running total plus the sum of this broker's per-fetcher partition counts
     */
    public static final /* synthetic */ long $anonfun$testAutoTuneFetchers$2(Uuid linkId$4, long curLinkedPartitionCount, KafkaBroker broker) {
        ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager)broker.clusterLinkManager().fetcherManager(linkId$4).get();
        // Reads the "fetcherThreadMap" field declared on AbstractFetcherManager via the test utility.
        HashMap fetcherThreads = (HashMap)TestUtils.fieldValue((Object)fetcherManager, AbstractFetcherManager.class, (String)"fetcherThreadMap");
        IterableOnceOps perFetcherCounts = (IterableOnceOps)fetcherThreads.map((Function1 & Serializable)entry -> BoxesRunTime.boxToInteger((int)ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testAutoTuneFetchers$3(entry)));
        int partitionsOnBroker = BoxesRunTime.unboxToInt((Object)perFetcherCounts.sum((Numeric)Numeric.IntIsIntegral$.MODULE$));
        return curLinkedPartitionCount + (long)partitionsOnBroker;
    }

    /**
     * Reports whether the fetcher partition counts summed over all
     * destination brokers equal the test's partition count.
     *
     * @param $this enclosing test instance providing the partition count and destination cluster
     * @param linkId$4 id of the cluster link
     * @return {@code true} when the summed count equals {@code numPartitions()}
     */
    public static final /* synthetic */ boolean $anonfun$testAutoTuneFetchers$1(ClusterLinkDataPlaneMirroringIntegrationTest $this, Uuid linkId$4) {
        long expectedTotal = (long)$this.numPartitions();
        long fetcherPartitionTotal = BoxesRunTime.unboxToLong((Object)$this.destCluster().brokers().foldLeft((Object)BoxesRunTime.boxToLong((long)0L), (Function2 & Serializable)(runningTotal, broker) -> BoxesRunTime.boxToLong((long)ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testAutoTuneFetchers$2(linkId$4, BoxesRunTime.unboxToLong((Object)runningTotal), broker))));
        return expectedTotal == fetcherPartitionTotal;
    }

    /**
     * Supplies the failure text reported when the fetchers' partition count
     * is not the expected one.
     *
     * @return the fixed failure message
     */
    public static final /* synthetic */ String $anonfun$testAutoTuneFetchers$4() {
        final String failureMessage = "Unexpected number of fetchers' partitions";
        return failureMessage;
    }

    /**
     * Builds the {@code TopicPartition} of the test topic for a partition index.
     *
     * @param $this enclosing test instance providing the topic name
     * @param i partition index
     * @return a new {@code TopicPartition} for the test topic and index {@code i}
     */
    public static final /* synthetic */ TopicPartition $anonfun$testAutoTuneFetchers$6(ClusterLinkDataPlaneMirroringIntegrationTest $this, int i) {
        String topicName = $this.topic();
        return new TopicPartition(topicName, i);
    }

    /**
     * Registers a partition as a test lagging partition on the broker's
     * fetcher manager, passing the partition index plus one as the second
     * argument.
     *
     * @param broker$2 broker whose fetcher manager is updated
     * @param linkId$4 id of the cluster link
     * @param tp partition to register
     */
    public static final /* synthetic */ void $anonfun$testAutoTuneFetchers$8(KafkaBroker broker$2, Uuid linkId$4, TopicPartition tp) {
        ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager)broker$2.clusterLinkManager().fetcherManager(linkId$4).get();
        fetcherManager.addTestLaggingPartition(tp, tp.partition() + 1);
    }

    /**
     * Checks whether a broker's current fetcher count for the given link
     * exceeds the count recorded for that broker in the baseline map.
     *
     * @param linkId$4 id of the cluster link
     * @param brokerFetcherCount$1 baseline fetcher count keyed by broker id
     * @param broker broker being checked; must have an entry in the baseline map
     * @return {@code true} when the current fetcher count is greater than the baseline
     */
    public static final /* synthetic */ boolean $anonfun$testAutoTuneFetchers$10(Uuid linkId$4, HashMap brokerFetcherCount$1, KafkaBroker broker) {
        ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager)broker.clusterLinkManager().fetcherManager(linkId$4).get();
        int currentCount = fetcherManager.fetcherCount();
        Object brokerKey = (Object)BoxesRunTime.boxToInteger((int)broker.config().brokerId());
        // The baseline must contain this broker before its value is read.
        Assertions.assertTrue((boolean)brokerFetcherCount$1.contains(brokerKey), (String)("broker " + broker.config().brokerId() + " is not found"));
        int baselineCount = BoxesRunTime.unboxToInt((Object)brokerFetcherCount$1.get(brokerKey).get());
        return baselineCount < currentCount;
    }

    /**
     * Reports whether the fetcher count has grown beyond the baseline on
     * every destination broker.
     *
     * @param $this enclosing test instance providing the destination cluster
     * @param linkId$4 id of the cluster link
     * @param brokerFetcherCount$1 baseline fetcher count keyed by broker id
     * @return {@code true} only if the growth check passes for all brokers
     */
    public static final /* synthetic */ boolean $anonfun$testAutoTuneFetchers$9(ClusterLinkDataPlaneMirroringIntegrationTest $this, Uuid linkId$4, HashMap brokerFetcherCount$1) {
        boolean grewEverywhere = $this.destCluster().brokers().forall((Function1 & Serializable)candidate -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testAutoTuneFetchers$10(linkId$4, brokerFetcherCount$1, candidate)));
        return grewEverywhere;
    }

    /**
     * Builds the failure text reported when fetcher counts did not increase
     * from the baseline.
     *
     * @param brokerFetcherCount$1 baseline map whose string form is embedded in the message
     * @return the failure message including the baseline map
     */
    public static final /* synthetic */ String $anonfun$testAutoTuneFetchers$11(HashMap brokerFetcherCount$1) {
        return "fetcher count doesn't increase from " + brokerFetcherCount$1 + " when there are lagging partitions";
    }

    /**
     * Asserts the {@code link-target-fetcher-count} metric on a broker after
     * auto tuning: the {@code Default} pool value must be greater than 1.0 and
     * the {@code InSync} pool value must equal 1.0.
     *
     * @param $this enclosing test instance providing the link name and metric lookup
     * @param broker broker whose metrics are read
     */
    public static final /* synthetic */ void $anonfun$testAutoTuneFetchers$12(ClusterLinkDataPlaneMirroringIntegrationTest $this, KafkaBroker broker) {
        final String metricName = "link-target-fetcher-count";
        final String metricGroup = "cluster-link-metrics";

        // Default pool: target must exceed one fetcher.
        Option<String> defaultPoolLink = (Option<String>)new Some((Object)$this.linkName());
        Map<String, String> defaultPoolTags = (Map<String, String>)((Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"pool"), (Object)"Default")})));
        double defaultPoolTargetFetcherCount = $this.kafkaBrokerMetricValue(broker, metricName, metricGroup, defaultPoolLink, defaultPoolTags, $this.kafkaBrokerMetricValue$default$6(), $this.kafkaBrokerMetricValue$default$7());
        Assertions.assertTrue(defaultPoolTargetFetcherCount > 1.0, (String)("broker " + broker.config().brokerId() + " has " + defaultPoolTargetFetcherCount + " default pool target, which doesn't increase after auto tuning"));

        // InSync pool: target must stay at one fetcher.
        Option<String> inSyncPoolLink = (Option<String>)new Some((Object)$this.linkName());
        Map<String, String> inSyncPoolTags = (Map<String, String>)((Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"pool"), (Object)"InSync")})));
        double inSyncPoolTargetFetcherCount = $this.kafkaBrokerMetricValue(broker, metricName, metricGroup, inSyncPoolLink, inSyncPoolTags, $this.kafkaBrokerMetricValue$default$6(), $this.kafkaBrokerMetricValue$default$7());
        Assertions.assertEquals((double)1.0, (double)inSyncPoolTargetFetcherCount, (String)("broker " + broker.config().brokerId() + " unexpected InSync target fetcher count after auto tuning"));
    }

    /**
     * Removes the given partitions from the test lagging partitions of a
     * broker's fetcher manager for the given link.
     *
     * @param linkId$4 id of the cluster link
     * @param linkedPartitions$2 partitions to remove
     * @param x$19 broker whose fetcher manager is updated
     */
    public static final /* synthetic */ void $anonfun$testAutoTuneFetchers$13(Uuid linkId$4, scala.collection.immutable.Set linkedPartitions$2, KafkaBroker x$19) {
        ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager)x$19.clusterLinkManager().fetcherManager(linkId$4).get();
        fetcherManager.removeTestLaggingPartitions((Set)linkedPartitions$2);
    }

    /**
     * Checks whether a broker's current fetcher count for the given link
     * equals the count recorded for that broker in the baseline map.
     *
     * @param linkId$4 id of the cluster link
     * @param brokerFetcherCount$1 baseline fetcher count keyed by broker id
     * @param broker broker being checked; must have an entry in the baseline map
     * @return {@code true} when the current fetcher count equals the baseline
     */
    public static final /* synthetic */ boolean $anonfun$testAutoTuneFetchers$15(Uuid linkId$4, HashMap brokerFetcherCount$1, KafkaBroker broker) {
        ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager)broker.clusterLinkManager().fetcherManager(linkId$4).get();
        int currentCount = fetcherManager.fetcherCount();
        Object brokerKey = (Object)BoxesRunTime.boxToInteger((int)broker.config().brokerId());
        // The baseline must contain this broker before its value is read.
        Assertions.assertTrue((boolean)brokerFetcherCount$1.contains(brokerKey), (String)("broker " + broker.config().brokerId() + " is not found"));
        int baselineCount = BoxesRunTime.unboxToInt((Object)brokerFetcherCount$1.get(brokerKey).get());
        return baselineCount == currentCount;
    }

    /**
     * Reports whether the fetcher count is back at the baseline on every
     * destination broker.
     *
     * @param $this enclosing test instance providing the destination cluster
     * @param linkId$4 id of the cluster link
     * @param brokerFetcherCount$1 baseline fetcher count keyed by broker id
     * @return {@code true} only if the baseline check passes for all brokers
     */
    public static final /* synthetic */ boolean $anonfun$testAutoTuneFetchers$14(ClusterLinkDataPlaneMirroringIntegrationTest $this, Uuid linkId$4, HashMap brokerFetcherCount$1) {
        boolean backToBaselineEverywhere = $this.destCluster().brokers().forall((Function1 & Serializable)candidate -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testAutoTuneFetchers$15(linkId$4, brokerFetcherCount$1, candidate)));
        return backToBaselineEverywhere;
    }

    /**
     * Builds the failure text reported when fetcher counts did not return to
     * the baseline.
     *
     * @param brokerFetcherCount$1 baseline map whose string form is embedded in the message
     * @return the failure message including the baseline map
     */
    public static final /* synthetic */ String $anonfun$testAutoTuneFetchers$16(HashMap brokerFetcherCount$1) {
        return "fetcher count doesn't return to " + brokerFetcherCount$1 + " when there is no lagging partition";
    }

    /**
     * Asserts the {@code link-target-fetcher-count} metric on a broker once
     * no partitions are lagging: both the {@code Default} and {@code InSync}
     * pool values must equal 1.0.
     *
     * @param $this enclosing test instance providing the link name and metric lookup
     * @param broker broker whose metrics are read
     */
    public static final /* synthetic */ void $anonfun$testAutoTuneFetchers$17(ClusterLinkDataPlaneMirroringIntegrationTest $this, KafkaBroker broker) {
        final String metricName = "link-target-fetcher-count";
        final String metricGroup = "cluster-link-metrics";

        // Default pool: target must be back at one fetcher.
        Option<String> defaultPoolLink = (Option<String>)new Some((Object)$this.linkName());
        Map<String, String> defaultPoolTags = (Map<String, String>)((Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"pool"), (Object)"Default")})));
        double defaultPoolTargetFetcherCount = $this.kafkaBrokerMetricValue(broker, metricName, metricGroup, defaultPoolLink, defaultPoolTags, $this.kafkaBrokerMetricValue$default$6(), $this.kafkaBrokerMetricValue$default$7());
        Assertions.assertEquals((double)1.0, (double)defaultPoolTargetFetcherCount, (String)("broker " + broker.config().brokerId() + " has unexpected default pool target fetchers when no lagging partitions"));

        // InSync pool: target must be one fetcher.
        Option<String> inSyncPoolLink = (Option<String>)new Some((Object)$this.linkName());
        Map<String, String> inSyncPoolTags = (Map<String, String>)((Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"pool"), (Object)"InSync")})));
        double inSyncPoolTargetFetcherCount = $this.kafkaBrokerMetricValue(broker, metricName, metricGroup, inSyncPoolLink, inSyncPoolTags, $this.kafkaBrokerMetricValue$default$6(), $this.kafkaBrokerMetricValue$default$7());
        Assertions.assertEquals((double)1.0, (double)inSyncPoolTargetFetcherCount, (String)("broker " + broker.config().brokerId() + " unexpected InSync target fetcher count when no lagging partition"));
    }

    /**
     * Synthetic lambda body: compares a broker's current fetcher count for the
     * given link with the value stored for that broker id in
     * {@code brokerFetcherCount$1}. Asserts that the map has an entry for the
     * broker id before reading it.
     *
     * @param linkId$4             link whose fetcher manager is queried
     * @param brokerFetcherCount$1 map from broker id to a previously stored fetcher count
     * @param broker               the broker being checked
     * @return {@code true} if the current count is strictly greater than the stored one
     */
    public static final /* synthetic */ boolean $anonfun$testAutoTuneFetchers$19(Uuid linkId$4, HashMap brokerFetcherCount$1, KafkaBroker broker) {
        ClusterLinkFetcherManager linkFetcherManager = (ClusterLinkFetcherManager)broker.clusterLinkManager().fetcherManager(linkId$4).get();
        int currentCount = linkFetcherManager.fetcherCount();

        boolean hasEntry = brokerFetcherCount$1.contains((Object)BoxesRunTime.boxToInteger((int)broker.config().brokerId()));
        String missingEntryMessage = "broker " + broker.config().brokerId() + " is not found";
        Assertions.assertTrue(hasEntry, missingEntryMessage);

        Object storedCount = brokerFetcherCount$1.get((Object)BoxesRunTime.boxToInteger((int)broker.config().brokerId())).get();
        int countBeforeAutoTune = BoxesRunTime.unboxToInt(storedCount);
        return countBeforeAutoTune < currentCount;
    }

    /**
     * Synthetic lambda body: reports whether every broker of the destination
     * cluster passes the per-broker check in {@code $anonfun$testAutoTuneFetchers$19}.
     *
     * @param $this                the enclosing test instance
     * @param linkId$4             link id forwarded to the per-broker check
     * @param brokerFetcherCount$1 map forwarded to the per-broker check
     * @return {@code true} only if the per-broker check holds for all brokers
     */
    public static final /* synthetic */ boolean $anonfun$testAutoTuneFetchers$18(ClusterLinkDataPlaneMirroringIntegrationTest $this, Uuid linkId$4, HashMap brokerFetcherCount$1) {
        Function1 perBrokerCheck = (Function1 & Serializable)destBroker -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testAutoTuneFetchers$19(linkId$4, brokerFetcherCount$1, destBroker));
        return $this.destCluster().brokers().forall(perBrokerCheck);
    }

    /**
     * Synthetic lambda body: builds the failure message that embeds the
     * string form of {@code brokerFetcherCount$1}.
     *
     * @param brokerFetcherCount$1 map whose string form is placed in the message
     * @return the assembled message text
     */
    public static final /* synthetic */ String $anonfun$testAutoTuneFetchers$20(HashMap brokerFetcherCount$1) {
        String previousCounts = String.valueOf(brokerFetcherCount$1);
        return "fetcher count does not increase from " + previousCounts + " after reconfigure";
    }

    /**
     * Synthetic lambda body: for one broker, looks up the
     * {@code "link-target-fetcher-count"} value under {@code "cluster-link-metrics"}
     * for this test's link with tag {@code pool -> Default} and asserts that it
     * equals {@code 5.0}.
     *
     * @param $this  the enclosing test instance (supplies the link name and the metric lookup)
     * @param broker the broker whose metric value is checked
     */
    public static final /* synthetic */ void $anonfun$testAutoTuneFetchers$21(ClusterLinkDataPlaneMirroringIntegrationTest $this, KafkaBroker broker) {
        Option<String> linkNameOpt = (Option<String>)new Some((Object)$this.linkName());
        Tuple2[] tagPairs = new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"pool"), (Object)"Default")};
        Map<String, String> poolTags = (Map<String, String>)((Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])tagPairs)));
        double defaultTarget = $this.kafkaBrokerMetricValue(broker, "link-target-fetcher-count", "cluster-link-metrics", linkNameOpt, poolTags, $this.kafkaBrokerMetricValue$default$6(), $this.kafkaBrokerMetricValue$default$7());
        String failureMessage = "broker " + broker.config().brokerId() + " has unexpected default pool target fetcher count when auto tuning is disabled";
        Assertions.assertEquals(5.0, defaultTarget, failureMessage);
    }

    /**
     * Synthetic lambda body: for one broker, looks up the
     * {@code "link-target-fetcher-count"} value under {@code "cluster-link-metrics"}
     * for this test's link with tag {@code pool -> InSync} and compares it to 5.
     *
     * @param $this  the enclosing test instance (supplies the link name and the metric lookup)
     * @param broker the broker whose metric value is read
     * @return {@code true} if the value read is exactly 5
     */
    public static final /* synthetic */ boolean $anonfun$testAutoTuneFetchers$23(ClusterLinkDataPlaneMirroringIntegrationTest $this, KafkaBroker broker) {
        Option<String> linkNameOpt = (Option<String>)new Some((Object)$this.linkName());
        Tuple2[] tagPairs = new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"pool"), (Object)"InSync")};
        Map<String, String> poolTags = (Map<String, String>)((Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])tagPairs)));
        double inSyncTarget = $this.kafkaBrokerMetricValue(broker, "link-target-fetcher-count", "cluster-link-metrics", linkNameOpt, poolTags, $this.kafkaBrokerMetricValue$default$6(), $this.kafkaBrokerMetricValue$default$7());
        return inSyncTarget == 5.0;
    }

    /**
     * Synthetic lambda body: reports whether every broker of the destination
     * cluster passes the per-broker check in {@code $anonfun$testAutoTuneFetchers$23}.
     *
     * @param $this the enclosing test instance
     * @return {@code true} only if the per-broker check holds for all brokers
     */
    public static final /* synthetic */ boolean $anonfun$testAutoTuneFetchers$22(ClusterLinkDataPlaneMirroringIntegrationTest $this) {
        Function1 perBrokerCheck = (Function1 & Serializable)destBroker -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testAutoTuneFetchers$23($this, destBroker));
        return $this.destCluster().brokers().forall(perBrokerCheck);
    }

    /**
     * Synthetic lambda body: supplies a fixed failure message about the InSync
     * target fetcher count not being reset to 5.
     *
     * @return the constant message text
     */
    public static final /* synthetic */ String $anonfun$testAutoTuneFetchers$24() {
        final String failureMessage = "InSync target fetcher count doesn't get reset to 5 when auto tuning is disabled";
        return failureMessage;
    }

    /**
     * Synthetic lambda body: for one broker, looks up the
     * {@code "link-target-fetcher-count"} value under {@code "cluster-link-metrics"}
     * for this test's link with tag {@code pool -> InSync} and compares it to 1.0.
     *
     * @param $this  the enclosing test instance (supplies the link name and the metric lookup)
     * @param broker the broker whose metric value is read
     * @return {@code true} if the value read is exactly 1.0
     */
    public static final /* synthetic */ boolean $anonfun$testAutoTuneFetchers$26(ClusterLinkDataPlaneMirroringIntegrationTest $this, KafkaBroker broker) {
        Option<String> linkNameOpt = (Option<String>)new Some((Object)$this.linkName());
        Tuple2[] tagPairs = new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"pool"), (Object)"InSync")};
        Map<String, String> poolTags = (Map<String, String>)((Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])tagPairs)));
        double inSyncTarget = $this.kafkaBrokerMetricValue(broker, "link-target-fetcher-count", "cluster-link-metrics", linkNameOpt, poolTags, $this.kafkaBrokerMetricValue$default$6(), $this.kafkaBrokerMetricValue$default$7());
        return inSyncTarget == 1.0;
    }

    /**
     * Synthetic lambda body: reports whether every broker of the destination
     * cluster passes the per-broker check in {@code $anonfun$testAutoTuneFetchers$26}.
     *
     * @param $this the enclosing test instance
     * @return {@code true} only if the per-broker check holds for all brokers
     */
    public static final /* synthetic */ boolean $anonfun$testAutoTuneFetchers$25(ClusterLinkDataPlaneMirroringIntegrationTest $this) {
        Function1 perBrokerCheck = (Function1 & Serializable)destBroker -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testAutoTuneFetchers$26($this, destBroker));
        return $this.destCluster().brokers().forall(perBrokerCheck);
    }

    /**
     * Synthetic lambda body: supplies a fixed failure message about the InSync
     * pool target fetcher count not resetting to 1.
     *
     * @return the constant message text
     */
    public static final /* synthetic */ String $anonfun$testAutoTuneFetchers$27() {
        final String failureMessage = "InSync pool target fetcher count does not reset to 1 after enabling auto tuning";
        return failureMessage;
    }

    /**
     * Synthetic lambda body: pairs a topic name with a partition index.
     *
     * @param topic$2 the topic name
     * @param i       the partition index
     * @return a new {@code TopicPartition} built from the two arguments
     */
    public static final /* synthetic */ TopicPartition $anonfun$logOffsets$1(String topic$2, int i) {
        TopicPartition topicPartition = new TopicPartition(topic$2, i);
        return topicPartition;
    }

    /**
     * Synthetic lambda body: resolves the partition leader for {@code tp} in the
     * given cluster, takes that broker's online partition for {@code tp}, maps it
     * to its local leader log, applies {@code logOffset$1} to that log and
     * unboxes the result as a {@code long}.
     *
     * <p>The final {@code get()} is unconditional, so this fails if the online
     * partition or the local leader log is absent.
     *
     * @param cluster$1   the cluster harness used to find the partition leader
     * @param logOffset$1 function applied to the leader log to obtain an offset
     * @param tp          the topic partition to inspect
     * @return the offset produced by {@code logOffset$1}
     */
    public static final /* synthetic */ long $anonfun$logOffsets$2(ClusterLinkTestHarness cluster$1, Function1 logOffset$1, TopicPartition tp) {
        Option leaderLog = cluster$1.partitionLeader(tp).replicaManager().onlinePartition(tp).flatMap((Function1 & Serializable)partition -> partition.leaderLogIfLocal());
        Object boxedOffset = leaderLog.map(logOffset$1).get();
        return BoxesRunTime.unboxToLong(boxedOffset);
    }

    /**
     * Synthetic lambda body: on one broker, obtains the client manager for the
     * given link, casts it to {@code ClusterLinkDestClientManager}, and calls
     * {@code shutdown()} on its task manager's
     * {@code clusterLinkClearStartOffsetsForMirrors()} object.
     *
     * @param linkId$5 the link whose client manager is looked up
     * @param broker   the broker on which the shutdown is invoked
     */
    public static final /* synthetic */ void $anonfun$shutdownClearMirrorStartOffsets$1(Uuid linkId$5, KafkaBroker broker) {
        ClusterLinkDestClientManager destClientManager = (ClusterLinkDestClientManager)broker.clusterLinkManager().clientManager(linkId$5).get();
        destClientManager.taskManager().clusterLinkClearStartOffsetsForMirrors().shutdown();
    }

    /**
     * Adapter around {@code $anonfun$testFetcherThreads$16}: invokes it with the
     * same arguments and returns {@code BoxedUnit.UNIT} so the call can be used
     * where an {@code Object} result is required.
     *
     * @param newMode$1 forwarded unchanged to the wrapped method
     * @param linkId$1  forwarded unchanged to the wrapped method
     * @param broker    forwarded unchanged to the wrapped method
     * @return always {@code BoxedUnit.UNIT}
     */
    public static final /* synthetic */ Object $anonfun$testFetcherThreads$16$adapted(Product newMode$1, Uuid linkId$1, KafkaBroker broker) {
        ClusterLinkDataPlaneMirroringIntegrationTest.$anonfun$testFetcherThreads$16(newMode$1, linkId$1, broker);
        final Object unitResult = BoxedUnit.UNIT;
        return unitResult;
    }
}

