/*
 * Decompiled with CFR 0.152.
 */
package kafka.link;

import io.confluent.kafka.link.ClusterLinkConfig;
import java.io.Serializable;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.link.AbstractClusterLinkIntegrationTest;
import kafka.link.ClusterLinkTestHarness;
import kafka.server.KafkaBroker;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkConvertToMirrorTopicTaskType$;
import kafka.server.link.ClusterLinkMetrics$;
import kafka.server.link.ConnectionMode;
import kafka.server.link.ConnectionMode$;
import kafka.server.link.InternalTaskErrorCode$;
import kafka.server.link.NoErrorCode$;
import kafka.server.link.TaskErrorCode;
import kafka.server.link.TaskType;
import kafka.server.link.TopicType$;
import kafka.utils.Implicits;
import kafka.utils.Logging;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterMirrorOp;
import org.apache.kafka.clients.admin.AlterMirrorsOptions;
import org.apache.kafka.clients.admin.ClusterLinkTaskError;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.MirrorTopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import scala.;
import scala.$less$colon$less$;
import scala.Enumeration;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IndexedSeqOps;
import scala.collection.IterableOnce;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.SeqOps;
import scala.collection.immutable.;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.jdk.CollectionConverters$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction1;

@Tags(value={@Tag(value="integration"), @Tag(value="bazel:shard_count:10")})
@ScalaSignature(bytes="\u0006\u0005\r%b\u0001B\u0011#\u0001\u001dBQ\u0001\f\u0001\u0005\u00025BQa\f\u0001\u0005BABQa\u000e\u0001\u0005\u0002aBQ\u0001\u001b\u0001\u0005\u0002%DQ\u0001\u001d\u0001\u0005\u0002EDQ\u0001\u001f\u0001\u0005\u0002eDq!!\u0001\u0001\t\u0003\t\u0019\u0001C\u0004\u0002\u0012\u0001!\t!a\u0005\t\u000f\u0005\u0005\u0002\u0001\"\u0001\u0002$!9\u0011\u0011\u0007\u0001\u0005\u0002\u0005M\u0002bBA!\u0001\u0011\u0005\u00111\t\u0005\b\u0003#\u0002A\u0011AA*\u0011\u001d\t\t\u0007\u0001C\u0001\u0003GBq!!\u001d\u0001\t\u0003\t\u0019\bC\u0004\u0002\u0002\u0002!\t!a!\t\u000f\u0005E\u0005\u0001\"\u0001\u0002\u0014\"9\u0011\u0011\u0015\u0001\u0005\u0002\u0005\r\u0006bBAY\u0001\u0011\u0005\u00111\u0017\u0005\b\u0003\u0003\u0004A\u0011AAb\u0011\u001d\t\t\u000e\u0001C\u0005\u0003'Dq!!<\u0001\t\u0003\ty\u000fC\u0004\u0003\u0002\u0001!IAa\u0001\t\u0013\t\u0005\u0004!%A\u0005\n\t\r\u0004\"\u0003B=\u0001E\u0005I\u0011\u0002B>\u0011\u001d\u0011y\b\u0001C\u0005\u0005\u0003CqA!$\u0001\t\u0003\u0011y\tC\u0005\u00034\u0002\t\n\u0011\"\u0001\u00036\"I!\u0011\u0018\u0001\u0012\u0002\u0013\u0005!1\r\u0005\n\u0005w\u0003\u0011\u0013!C\u0001\u0005wBqA!0\u0001\t\u0013\u0011y\fC\u0004\u0003\\\u0002!IA!8\t\u000f\r\u0005\u0001\u0001\"\u0005\u0004\u0004\t\u0001#)\u001b3je\u0016\u001cG/[8oC2d\u0015N\\6J]R,wM]1uS>tG+Z:u\u0015\t\u0019C%\u0001\u0003mS:\\'\"A\u0013\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001\u0001\u000b\t\u0003S)j\u0011AI\u0005\u0003W\t\u0012!%\u00112tiJ\f7\r^\"mkN$XM\u001d'j].Le\u000e^3he\u0006$\u0018n\u001c8UKN$\u0018A\u0002\u001fj]&$h\bF\u0001/!\tI\u0003!A\rnCf\u0014W-V:f\u0005&$\u0017N]3di&|g.\u00197MS:\\G#A\u0019\u0011\u0005I*T\"A\u001a\u000b\u0003Q\nQa]2bY\u0006L!AN\u001a\u0003\tUs\u0017\u000e^\u0001-i\u0016\u001cHOQ5eSJ,7\r^5p]\u0006dG*\u001b8l/&$\bnT;uE>,h\u000eZ\"p]:,7\r^5p]N$B!M\u001dG\u0017\")!h\u0001a\u0001w\u00051\u0011/^8sk6\u0004\"\u0001P\"\u000f\u0005u\n\u0005C\u0001 
4\u001b\u0005y$B\u0001!'\u0003\u0019a$o\\8u}%\u0011!iM\u0001\u0007!J,G-\u001a4\n\u0005\u0011+%AB*ue&twM\u0003\u0002Cg!)qi\u0001a\u0001\u0011\u0006Y1m\\8sI&t\u0017\r^8s!\t\u0011\u0014*\u0003\u0002Kg\t9!i\\8mK\u0006t\u0007\"\u0002'\u0004\u0001\u0004Y\u0014\u0001\u00057pG\u0006d'+\u001a9mS\u000e\fG/[8oQ\u0011\u0019aJW.\u0011\u0005=CV\"\u0001)\u000b\u0005E\u0013\u0016A\u00029be\u0006l7O\u0003\u0002T)\u00069!.\u001e9ji\u0016\u0014(BA+W\u0003\u0015QWO\\5u\u0015\u00059\u0016aA8sO&\u0011\u0011\f\u0015\u0002\u0012!\u0006\u0014\u0018-\\3uKJL'0\u001a3UKN$\u0018\u0001\u00028b[\u0016\f\u0013\u0001X\u0001>w\u0012L7\u000f\u001d7bs:\u000bW.Z?/cV|'/^7>wBjhfY8pe\u0012Lg.\u0019;pevZ\u0018' \u0018m_\u000e\fGNU3qY&\u001c\u0017\r^5p]vZ(' \u0015\u0005\u0007y#W\r\u0005\u0002`E6\t\u0001M\u0003\u0002b!\u0006A\u0001O]8wS\u0012,'/\u0003\u0002dA\naQ*\u001a;i_\u0012\u001cv.\u001e:dK\u0006)a/\u00197vK2\na-I\u0001h\u0003!\nXo\u001c:v[\u000e{wN\u001d3j]\u0006$xN\u001d*fa2L7-\u0019;j_:\u001cu.\u001c2j]\u0006$\u0018n\u001c8t\u0003=\"Xm\u001d;CS\u0012L'/Z2uS>t\u0017\r\u001c'j].<\u0016\u000e\u001e5P]\u0016\u001cuN\u001c8fGRLwN\\%oSRL\u0017\r^8s)\u0011\t$n\u001b7\t\u000bi\"\u0001\u0019A\u001e\t\u000b\u001d#\u0001\u0019\u0001%\t\u000b1#\u0001\u0019A\u001e)\t\u0011q%l\u0017\u0015\u0005\ty#w\u000eL\u0001g\u0003\u0019\"Xm\u001d;CS\u0012L'/Z2uS>t\u0017\r\u001c'j].<\u0016\u000e\u001e5BkR|W*\u001b:s_JLgn\u001a\u000b\u0005cI\u001cH\u000fC\u0003;\u000b\u0001\u00071\bC\u0003H\u000b\u0001\u0007\u0001\nC\u0003M\u000b\u0001\u00071\b\u000b\u0003\u0006\u001dj[\u0006\u0006B\u0003_I^d\u0013AZ\u00013i\u0016\u001cHOQ5eSJ,7\r^5p]\u0006dG*\u001b8l/&$\bn\\;u\u0013:\u001cG.\u001e3j]\u001e\u0014V-\\8uK6K'O]8sgR!\u0011G_>}\u0011\u0015Qd\u00011\u0001<\u0011\u00159e\u00011\u0001I\u0011\u0015ae\u00011\u0001<Q\u00111aJW.)\t\u0019qFm 
\u0017\u0002M\u0006IB/Z:u%\u00164XM]:f\u0003:$7\u000b^1si6K'O]8s)\u001d\t\u0014QAA\u0004\u0003\u0013AQAO\u0004A\u0002mBQaR\u0004A\u0002!CQ\u0001T\u0004A\u0002mBCa\u0002([7\"*qA\u00183\u0002\u00101\na-A\u0018uKN$(+\u001a<feN,\u0017I\u001c3Ti\u0006\u0014H/T5se>\u0014x+\u001b;i\u0019\u0016\fG-\u001a:Fa>\u001c\u0007n\u00115b]\u001e,7\u000fF\u00042\u0003+\t9\"!\u0007\t\u000biB\u0001\u0019A\u001e\t\u000b\u001dC\u0001\u0019\u0001%\t\u000b1C\u0001\u0019A\u001e)\t!q%l\u0017\u0015\u0006\u0011y#\u0017q\u0004\u0017\u0002M\u0006\u0001C/Z:u%\u00164XM]:f\u0003:$7\u000b^1si6K'O]8s/&$\b\u000eT1h)\u001d\t\u0014QEA\u0014\u0003SAQAO\u0005A\u0002mBQaR\u0005A\u0002!CQ\u0001T\u0005A\u0002mBC!\u0003([7\"*\u0011B\u00183\u000201\na-\u0001\u001buKN$(+\u001a<feN,\u0017I\u001c3Ti\u0006\u0014H/T5se>\u0014\u0018N\\4XSRD'+Z7pi\u0016\u001cE.^:uKJ\u0014Vm\u001d;beR$r!MA\u001b\u0003o\tI\u0004C\u0003;\u0015\u0001\u00071\bC\u0003H\u0015\u0001\u0007\u0001\nC\u0003M\u0015\u0001\u00071\b\u000b\u0003\u000b\u001dj[\u0006&\u0002\u0006_I\u0006}B&\u00014\u0002gQ,7\u000f\u001e*fm\u0016\u00148/Z!oIN#\u0018M\u001d;NSJ\u0014xN]5oO^KG\u000f\u001b'pG\u0006d7\t\\;ti\u0016\u0014(+Z:uCJ$HcB\u0019\u0002F\u0005\u001d\u0013\u0011\n\u0005\u0006u-\u0001\ra\u000f\u0005\u0006\u000f.\u0001\r\u0001\u0013\u0005\u0006\u0019.\u0001\ra\u000f\u0015\u0005\u00179S6\fK\u0003\f=\u0012\fy\u0005L\u0001g\u0003Q\"Xm\u001d;SKZ,'o]3B]\u0012\u001cF/\u0019:u\u001b&\u0014(o\u001c:j]\u001e<\u0016\u000e\u001e5QCV\u001cX-\u00118e+:\u0004\u0018-^:f\u0019&t7n\u001d\u000b\bc\u0005U\u0013qKA-\u0011\u0015QD\u00021\u0001<\u0011\u00159E\u00021\u0001I\u0011\u0015aE\u00021\u0001<Q\u0011aaJW.)\u000b1qF-a\u0018-\u0003\u0019\fq\b^3tiJ+g/\u001a:tK\u0006sGm\u0015;beRl\u0015N\u001d:pe&twmV5uQB\u000bWo]3B]\u0012,f\u000e]1vg\u0016dunY1m\u001b&\u0014(o\u001c:U_BL7\rF\u00042\u0003K\n9'!\u001b\t\u000bij\u0001\u0019A\u001e\t\u000b\u001dk\u0001\u0019\u0001%\t\u000b1k\u0001\u0019A\u001e)\t5q%l\u0017\u0015\u0006\u001by#\u0017q\u000e\u0017\u0002M\u00061F/Z:u%\u00164XM]:f\u
0003:$7\u000b^1si6K'O]8sS:<w+\u001b;i!\u0006,8/Z!oIVs\u0007/Y;tK2{7-\u00197NSJ\u0014xN\u001d+pa&\u001cw+\u001b;i\u0019>\u001c\u0017\r\\\"mkN$XM\u001d*fgR\f'\u000f\u001e\u000b\bc\u0005U\u0014qOA=\u0011\u0015Qd\u00021\u0001<\u0011\u00159e\u00021\u0001I\u0011\u0015ae\u00021\u0001<Q\u0011qaJW.)\u000b9qF-a -\u0003\u0019\f!\u0006^3tiJ+g/\u001a:tK\u0006sGm\u0015;beRl\u0015N\u001d:pe^KG\u000f\u001b#fY\u0016$X\r\u001a+pa&\u001c7\u000fF\u00042\u0003\u000b\u000b9)!#\t\u000biz\u0001\u0019A\u001e\t\u000b\u001d{\u0001\u0019\u0001%\t\u000b1{\u0001\u0019A\u001e)\t=q%l\u0017\u0015\u0006\u001fy#\u0017q\u0012\u0017\u0002M\u0006iC/Z:u%>dGNY1dW6K'O]8s\rJ|W\u000eU3oI&twmU=oG\"\u0014xN\\5{KN#\u0018\r^3\u0015\u000fE\n)*a&\u0002\u001a\")!\b\u0005a\u0001w!)q\t\u0005a\u0001\u0011\")A\n\u0005a\u0001w!\"\u0001C\u0014.\\Q\u0015\u0001b\fZAPY\u00051\u0017A\n;fgR\u0014v\u000e\u001c7cC\u000e\\W*\u001b:s_J4\u0015-\u001b7t\u001f:\u0004VM\u001c3j]\u001el\u0015N\u001d:peR9\u0011'!*\u0002(\u0006%\u0006\"\u0002\u001e\u0012\u0001\u0004Y\u0004\"B$\u0012\u0001\u0004A\u0005\"\u0002'\u0012\u0001\u0004Y\u0004\u0006B\tO5nCS!\u00050e\u0003_c\u0013AZ\u0001$i\u0016\u001cH\u000fU1vg\u0016l\u0015N\u001d:pe\u001a\u000b\u0017\u000e\\:P]B+g\u000eZ5oO6K'O]8s)\u001d\t\u0014QWA\\\u0003sCQA\u000f\nA\u0002mBQa\u0012\nA\u0002!CQ\u0001\u0014\nA\u0002mBCA\u0005([7\"*!C\u00183\u0002@2\na-A\ruKN$(+\u001a<feN,\u0017I\u001c3QCV\u001cX-T5se>\u0014HcB\u0019\u0002F\u0006\u001d\u0017\u0011\u001a\u0005\u0006uM\u0001\ra\u000f\u0005\u0006\u000fN\u0001\r\u0001\u0013\u0005\u0006\u0019N\u0001\ra\u000f\u0015\u0005'9S6\fK\u0003\u0014=\u0012\fy\rL\u0001g\u0003]\u0001(o\u001c3vG\u0016\u0014VmY8sIN$vn\u00117vgR,'\u000fF\u00042\u0003+\fy.a9\t\u000f\u0005]G\u00031\u0001\u0002Z\u000691\r\\;ti\u0016\u0014\bcA\u0015\u0002\\&\u0019\u0011Q\u001c\u0012\u0003-\rcWo\u001d;fe2Kgn\u001b+fgRD\u0015M\u001d8fgNDa!!9\u0015\u0001\u0004Y\u0014!\u0002;pa&\u001c\u0007bBAs)\u0001\u0007\u0011q]\u0001\u000b]Vl'+Z2pe\u0012\u001c\bc\u0001\u001a\u0002j&\u0019\u00111^\u001
a\u0003\u0007%sG/\u0001\u0012uKN$X*\u001e7uSBdW-T5se>\u00148\u000b^1uKR\u0013\u0018M\\:ji&|gn\u001d\u000b\bc\u0005E\u00181_A{\u0011\u0015QT\u00031\u0001<\u0011\u00159U\u00031\u0001I\u0011\u0015aU\u00031\u0001<Q\u0015)bJWA}C\t\tY0\u0001\u0015|I&\u001c\b\u000f\\1z\u001d\u0006lW- \u0018rk>\u0014X/\\\u001f|aut3m\\8sI&t\u0017\r^8s{m\fT\u0010K\u0003\u0016=\u0012\fy\u0010L\u0001g\u0003]1XM]5gs\nKG-\u001b:fGRLwN\\1m\u0019&t7\u000eF\u00052\u0005\u000b\u00119Ba\u0007\u0003R!9!q\u0001\fA\u0002\t%\u0011AF3bgRd\u0015N\\6D_:tWm\u0019;j_:lu\u000eZ3\u0011\t\t-!1C\u0007\u0003\u0005\u001bQ1a\tB\b\u0015\r\u0011\t\u0002J\u0001\u0007g\u0016\u0014h/\u001a:\n\t\tU!Q\u0002\u0002\u000f\u0007>tg.Z2uS>tWj\u001c3f\u0011\u001d\u0011IB\u0006a\u0001\u0005\u0013\tac^3ti2Kgn[\"p]:,7\r^5p]6{G-\u001a\u0005\n\u0005;1\u0002\u0013!a\u0001\u0005?\t!\u0002^8qS\u000e$\u0016\u0010]3t!\u0015\u0011$\u0011\u0005B\u0013\u0013\r\u0011\u0019c\r\u0002\u0007\u001fB$\u0018n\u001c8\u0011\r\t\u001d\"\u0011\u0007B\u001c\u001d\u0011\u0011IC!\f\u000f\u0007y\u0012Y#C\u00015\u0013\r\u0011ycM\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\u0011\u0019D!\u000e\u0003\u0007M+\u0017OC\u0002\u00030M\u0002BA!\u000f\u0003L9!!1\bB$\u001d\u0011\u0011iD!\u0012\u000f\t\t}\"1\t\b\u0004}\t\u0005\u0013\"A\u0013\n\u0007\tEA%C\u0002$\u0005\u001fIAA!\u0013\u0003\u000e\u0005IAk\u001c9jGRK\b/Z\u0005\u0005\u0005\u001b\u0012yEA\u0005U_BL7\rV=qK*!!\u0011\nB\u0007\u0011%\u0011\u0019F\u0006I\u0001\u0002\u0004\u0011)&A\bd_:4\u0017nZ(wKJ\u0014\u0018\u000eZ3t!\u0019\u00119F!\u0018<w5\u0011!\u0011\f\u0006\u0004\u00057\u001a\u0014AC2pY2,7\r^5p]&!!q\fB-\u0005\ri\u0015\r]\u0001\"m\u0016\u0014\u0018NZ=CS\u0012L'/Z2uS>t\u0017\r\u001c'j].$C-\u001a4bk2$HeM\u000b\u0003\u0005KRCAa\b\u0003h-\u0012!\u0011\u000e\t\u0005\u0005W\u0012)(\u0004\u0002\u0003n)!!q\u000eB9\u0003%)hn\u00195fG.,GMC\u0002\u0003tM\n!\"\u00198o_R\fG/[8o\u0013\u0011\u00119H!\u001c\u0003#Ut7\r[3dW\u0016$g+\u0019:jC:\u001cW-A\u0011wKJLg-\u001f\"jI&\u0014Xm\u0019;j_:\fG\u000eT5oW\u0012\"WMZ1vYR$C'\u0006\u0
002\u0003~)\"!Q\u000bB4\u0003\t\u001awN\u001c4jOV\u0014X-T5se>\u0014HK]1og&$\u0018n\u001c8CCR\u001c\u0007nU5{KR)\u0011Ga!\u0003\n\"9!QQ\rA\u0002\t\u001d\u0015\u0001C2mkN$XM]:\u0011\r\t\u001d\"\u0011GAm\u0011\u001d\u0011Y)\u0007a\u0001\u0003O\f\u0011BY1uG\"\u001c\u0016N_3\u0002\u00131Lgn\u001b)s_B\u001cH\u0003\u0004BI\u0005C\u0013)K!+\u00030\nE\u0006\u0003\u0002BJ\u0005;k!A!&\u000b\t\t]%\u0011T\u0001\u0005kRLGN\u0003\u0002\u0003\u001c\u0006!!.\u0019<b\u0013\u0011\u0011yJ!&\u0003\u0015A\u0013x\u000e]3si&,7\u000fC\u0004\u0003$j\u0001\rA!\u0003\u0002\u001d\r|gN\\3di&|g.T8eK\"9!q\u0015\u000eA\u0002\u0005e\u0017!\u0004:f[>$Xm\u00117vgR,'\u000fC\u0005\u0003,j\u0001\n\u00111\u0001\u0003.\u0006i1m\u001c8tk6,'o\u0012:pkB\u0004BA\rB\u0011w!I!Q\u0004\u000e\u0011\u0002\u0003\u0007!q\u0004\u0005\n\u0005'R\u0002\u0013!a\u0001\u0005+\n1\u0003\\5oWB\u0013x\u000e]:%I\u00164\u0017-\u001e7uIM*\"Aa.+\t\t5&qM\u0001\u0014Y&t7\u000e\u0015:paN$C-\u001a4bk2$H\u0005N\u0001\u0014Y&t7\u000e\u0015:paN$C-\u001a4bk2$H%N\u0001\rm\u0016\u0014\u0018NZ=NSJ\u0014xN\u001d\u000b\u0006c\t\u0005'1\u0019\u0005\b\u0003/t\u0002\u0019AAm\u0011\u001d\u0011)M\ba\u0001\u0005\u000f\f!\u0002]1si&$\u0018n\u001c8t!\u0019\u00119C!\r\u0003JB!!1\u001aBl\u001b\t\u0011iM\u0003\u0003\u0003P\nE\u0017AB2p[6|gNC\u0002&\u0005'T1A!6W\u0003\u0019\t\u0007/Y2iK&!!\u0011\u001cBg\u00059!v\u000e]5d!\u0006\u0014H/\u001b;j_:\f\u0011C^3sS\u001aLH*\u001b8l\u001b\u0016$(/[2t)=\t$q\u001cBu\u0005[\u0014\tP!>\u0003z\nu\bb\u0002Bq?\u0001\u0007!1]\u0001\u0007Y&t7.\u00133\u0011\t\t-'Q]\u0005\u0005\u0005O\u0014iM\u0001\u0003Vk&$\u0007b\u0002Bv?\u0001\u0007\u0011\u0011\\\u0001\fK\u0006\u001cHo\u00117vgR,'\u000fC\u0004\u0003p~\u0001\r!!7\u0002\u0017],7\u000f^\"mkN$XM\u001d\u0005\b\u0005g|\u0002\u0019\u0001BI\u00035)\u0017m\u001d;MS:\\\u0007K]8qg\"9!q_\u0010A\u0002\tE\u0015!D<fgRd\u0015N\\6Qe>\u00048\u000f\u0003\u0004\u0003|~\u0001\raO\u0001\nK\u0006\u001cH\u000fV8qS\u000eDaAa@ 
\u0001\u0004Y\u0014!C<fgR$v\u000e]5d\u0003]\u0019'/Z1uK\nKG-\u001b:fGRLwN\\1m\u0019&t7\u000e\u0006\u0007\u0003d\u000e\u00151\u0011BB\u0006\u0007\u001b\u0019y\u0001\u0003\u0004\u0004\b\u0001\u0002\raO\u0001\tY&t7NT1nK\"9!1\u001e\u0011A\u0002\u0005e\u0007b\u0002BxA\u0001\u0007\u0011\u0011\u001c\u0005\b\u0005g\u0004\u0003\u0019\u0001BI\u0011\u001d\u00119\u0010\ta\u0001\u0005#Cc\u0001AB\nI\u000e}\u0001\u0003BB\u000b\u00077i!aa\u0006\u000b\u0007\re!+A\u0002ba&LAa!\b\u0004\u0018\t\u0019A+Y4\"\u0005\r\u0005\u0012aC5oi\u0016<'/\u0019;j_:Dc\u0001AB\nI\u000e\u0015\u0012EAB\u0014\u0003Q\u0011\u0017M_3muMD\u0017M\u001d3`G>,h\u000e\u001e\u001e2a\u0001")
public class BidirectionalLinkIntegrationTest
extends AbstractClusterLinkIntegrationTest {
    /**
     * Overrides the parent hook so that this test class always runs with the
     * bidirectional-link flag switched on: it calls the Scala-generated setter
     * {@code useBidirectionalLink_$eq} with {@code true}.
     */
    @Override
    public void maybeUseBidirectionalLink() {
        final boolean bidirectional = true;
        this.useBidirectionalLink_$eq(bidirectional);
    }

    /**
     * Runs {@code verifyBidirectionalLink} with {@code Outbound} passed for both
     * connection-mode arguments, no topic-type restriction ({@code None}) and an
     * empty config-override map.
     *
     * @param quorum           supplied by the method source; not read in the body
     * @param coordinator      supplied by the method source; not read in the body
     * @param localReplication supplied by the method source; not read in the body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    @MethodSource(value={"quorumCoordinatorReplicationCombinations"})
    public void testBidirectionalLinkWithOutboundConnections(String quorum, boolean coordinator, String localReplication) {
        // Arguments are materialised in the same left-to-right order the single call used.
        final ConnectionMode firstMode = (ConnectionMode)ConnectionMode.Outbound$.MODULE$;
        final ConnectionMode secondMode = (ConnectionMode)ConnectionMode.Outbound$.MODULE$;
        final Option<scala.collection.immutable.Seq<Enumeration.Value>> anyTopicType =
            (Option<scala.collection.immutable.Seq<Enumeration.Value>>)None$.MODULE$;
        final Map<String, String> noOverrides = (Map<String, String>)((Map)Map$.MODULE$.empty());
        this.verifyBidirectionalLink(firstMode, secondMode, anyTopicType, noOverrides);
    }

    /**
     * Runs {@code verifyBidirectionalLink} with {@code Inbound} as the first
     * connection mode and {@code Outbound} as the second, after setting the
     * {@code useSourceInitiatedLink} flag to {@code true}. No topic-type
     * restriction and no config overrides are passed.
     *
     * @param quorum           supplied by the method source; not read in the body
     * @param coordinator      supplied by the method source; not read in the body
     * @param localReplication supplied by the method source; not read in the body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    @MethodSource(value={"quorumCoordinatorReplicationCombinations"})
    public void testBidirectionalLinkWithOneConnectionInitiator(String quorum, boolean coordinator, String localReplication) {
        // The flag must be set before the link is verified.
        this.useSourceInitiatedLink_$eq(true);

        final ConnectionMode firstMode = (ConnectionMode)ConnectionMode.Inbound$.MODULE$;
        final ConnectionMode secondMode = (ConnectionMode)ConnectionMode.Outbound$.MODULE$;
        final Option<scala.collection.immutable.Seq<Enumeration.Value>> anyTopicType =
            (Option<scala.collection.immutable.Seq<Enumeration.Value>>)None$.MODULE$;
        final Map<String, String> noOverrides = (Map<String, String>)((Map)Map$.MODULE$.empty());
        this.verifyBidirectionalLink(firstMode, secondMode, anyTopicType, noOverrides);
    }

    /**
     * Runs {@code verifyBidirectionalLink} with {@code Outbound} for both
     * connection modes and three config overrides:
     * the auto-mirroring-enable property set to {@code "true"}, the topic-filters
     * property set to {@code includeAllTopicsFilter()}, and
     * {@code metadata.max.age.ms} set to {@code "500"}.
     *
     * @param quorum           supplied by the method source; not read in the body
     * @param coordinator      supplied by the method source; not read in the body
     * @param localReplication supplied by the method source; not read in the body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    @MethodSource(value={"quorumCoordinatorReplicationCombinations"})
    public void testBidirectionalLinkWithAutoMirroring(String quorum, boolean coordinator, String localReplication) {
        // Build each key -> value pair separately, in the order they appear in the map.
        final Tuple2 autoMirroringOn = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
            Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp()), (Object)"true");
        final Tuple2 mirrorAllTopics = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
            Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.TopicFiltersProp()), (Object)this.includeAllTopicsFilter());
        final Tuple2 metadataMaxAge = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
            Predef$.MODULE$.ArrowAssoc((Object)"metadata.max.age.ms"), (Object)"500");

        final Tuple2[] entries = new Tuple2[]{autoMirroringOn, mirrorAllTopics, metadataMaxAge};
        final Map overrides = (Map)Map$.MODULE$.apply(
            (scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])entries));

        final ConnectionMode firstMode = (ConnectionMode)ConnectionMode.Outbound$.MODULE$;
        final ConnectionMode secondMode = (ConnectionMode)ConnectionMode.Outbound$.MODULE$;
        final Option<scala.collection.immutable.Seq<Enumeration.Value>> anyTopicType =
            (Option<scala.collection.immutable.Seq<Enumeration.Value>>)None$.MODULE$;
        this.verifyBidirectionalLink(firstMode, secondMode, anyTopicType, (Map<String, String>)overrides);
    }

    /**
     * Runs {@code verifyBidirectionalLink} with {@code Outbound} for both
     * connection modes and the topic-type argument restricted to a single-element
     * list holding {@code TopicType.LOCAL_MIRROR}. No config overrides are passed.
     *
     * @param quorum           supplied by the method source; not read in the body
     * @param coordinator      supplied by the method source; not read in the body
     * @param localReplication supplied by the method source; not read in the body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    @MethodSource(value={"quorumCoordinatorReplicationCombinations"})
    public void testBidirectionalLinkWithoutIncludingRemoteMirrors(String quorum, boolean coordinator, String localReplication) {
        // One-element Scala list: LOCAL_MIRROR :: Nil
        final Object localMirrorOnly = new .colon.colon((Object)TopicType$.MODULE$.LOCAL_MIRROR(), (List)Nil$.MODULE$);
        final Some restrictedTypes = new Some(localMirrorOnly);

        final ConnectionMode firstMode = (ConnectionMode)ConnectionMode.Outbound$.MODULE$;
        final ConnectionMode secondMode = (ConnectionMode)ConnectionMode.Outbound$.MODULE$;
        final Option<scala.collection.immutable.Seq<Enumeration.Value>> topicTypeFilter =
            (Option<scala.collection.immutable.Seq<Enumeration.Value>>)restrictedTypes;
        final Map<String, String> noOverrides = (Map<String, String>)((Map)Map$.MODULE$.empty());
        this.verifyBidirectionalLink(firstMode, secondMode, topicTypeFilter, noOverrides);
    }

    /**
     * Reverses a mirror topic across a bidirectional link twice and checks that
     * records flow in the new direction after each reversal.
     * <p>
     * Sequence, as coded below:
     * <ol>
     *   <li>Create {@code "topic"} on the source cluster, produce 20 records, create the
     *       bidirectional link (link properties built with {@code Inbound} for the destination
     *       side and {@code Outbound} for the source side), link the topic on the destination
     *       cluster and wait for mirroring there.</li>
     *   <li>Issue {@code REVERSE_AND_START_REMOTE_MIRROR} on the destination cluster, wait for
     *       the mirror description to be {@code STOPPED} there and {@code ACTIVE} on the source
     *       cluster, run the two transition-metric assertions, then produce to the destination
     *       cluster and wait for mirroring on the source cluster.</li>
     *   <li>Issue the same operation on the source cluster and repeat the waits and metric
     *       assertions with the cluster roles swapped.</li>
     * </ol>
     *
     * @param quorum           supplied by the method source; not read in the body
     * @param coordinator      supplied by the method source; not read in the body
     * @param localReplication supplied by the method source; not read in the body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    @MethodSource(value={"quorumCoordinatorReplicationCombinations"})
    public void testReverseAndStartMirror(String quorum, boolean coordinator, String localReplication) {
        final int recordCount = 20;
        final ClusterLinkTestHarness dest = this.destCluster();
        final ClusterLinkTestHarness source = this.sourceCluster();
        final String mirrorTopic = "topic";
        final IndexedSeq topicPartitions = RichInt$.MODULE$
            .until$extension(Predef$.MODULE$.intWrapper(0), this.numPartitions())
            .map((Function1 & Serializable)idx -> BidirectionalLinkIntegrationTest.$anonfun$testReverseAndStartMirror$1(mirrorTopic, BoxesRunTime.unboxToInt((Object)idx)));

        // --- Phase 1: seed the source cluster, create the link, mirror onto the destination ---
        source.createTopic(
            mirrorTopic,
            this.numPartitions(),
            this.replicationFactor(),
            source.createTopic$default$4(),
            source.createTopic$default$5(),
            source.createTopic$default$6());
        this.produceRecordsToCluster(source, mirrorTopic, recordCount);

        final ConnectionMode inbound = (ConnectionMode)ConnectionMode.Inbound$.MODULE$;
        final Properties destLinkProps = this.linkProps(
            inbound, source, this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        final ConnectionMode outbound = (ConnectionMode)ConnectionMode.Outbound$.MODULE$;
        final Properties sourceLinkProps = this.linkProps(
            outbound, dest, this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        this.createBidirectionalLink(this.linkName(), dest, source, destLinkProps, sourceLinkProps);

        dest.linkTopic(
            mirrorTopic,
            this.replicationFactor(),
            this.linkName(),
            dest.linkTopic$default$4(),
            dest.linkTopic$default$5());
        this.waitForMirroring(dest, (Seq<TopicPartition>)topicPartitions);

        // --- Phase 2: reverse from the destination cluster; the source cluster becomes the mirror ---
        dest.alterMirrors(mirrorTopic, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        dest.waitUntilMirrorDescriptionState(
            MirrorTopicDescription.State.STOPPED, mirrorTopic, this.linkName(),
            dest.waitUntilMirrorDescriptionState$default$4());
        source.waitUntilMirrorDescriptionState(
            MirrorTopicDescription.State.ACTIVE, mirrorTopic, this.linkName(),
            source.waitUntilMirrorDescriptionState$default$4());
        this.assertClusterLinkMirrorTransitionMetricMaxVal(
            ClusterLinkMetrics$.MODULE$.metricsGroup(),
            "pending_synchronization",
            (Seq<TaskErrorCode>)new .colon.colon((Object)NoErrorCode$.MODULE$, (List)Nil$.MODULE$),
            1.0,
            (Seq<KafkaBroker>)new .colon.colon((Object)dest.linkCoordinator(this.linkName()), (List)Nil$.MODULE$),
            dest.nonLinkCoordinators(this.linkName()),
            this.assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
        this.assertClusterLinkMirrorTransitionMetricMaxVal(
            ClusterLinkMetrics$.MODULE$.metricsGroup(),
            "pending_mirror",
            (Seq<TaskErrorCode>)new .colon.colon((Object)NoErrorCode$.MODULE$, (List)Nil$.MODULE$),
            1.0,
            (Seq<KafkaBroker>)new .colon.colon((Object)source.linkCoordinator(this.linkName()), (List)Nil$.MODULE$),
            source.nonLinkCoordinators(this.linkName()),
            this.assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
        this.produceRecordsToCluster(dest, mirrorTopic, recordCount);
        this.waitForMirroring(source, (Seq<TopicPartition>)topicPartitions);

        // --- Phase 3: reverse again, this time from the source cluster ---
        source.alterMirrors(mirrorTopic, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        source.waitUntilMirrorDescriptionState(
            MirrorTopicDescription.State.STOPPED, mirrorTopic, this.linkName(),
            source.waitUntilMirrorDescriptionState$default$4());
        dest.waitUntilMirrorDescriptionState(
            MirrorTopicDescription.State.ACTIVE, mirrorTopic, this.linkName(),
            dest.waitUntilMirrorDescriptionState$default$4());
        this.assertClusterLinkMirrorTransitionMetricMaxVal(
            ClusterLinkMetrics$.MODULE$.metricsGroup(),
            "pending_synchronization",
            (Seq<TaskErrorCode>)new .colon.colon((Object)NoErrorCode$.MODULE$, (List)Nil$.MODULE$),
            1.0,
            (Seq<KafkaBroker>)new .colon.colon((Object)source.linkCoordinator(this.linkName()), (List)Nil$.MODULE$),
            source.nonLinkCoordinators(this.linkName()),
            this.assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
        this.assertClusterLinkMirrorTransitionMetricMaxVal(
            ClusterLinkMetrics$.MODULE$.metricsGroup(),
            "pending_mirror",
            (Seq<TaskErrorCode>)new .colon.colon((Object)NoErrorCode$.MODULE$, (List)Nil$.MODULE$),
            1.0,
            (Seq<KafkaBroker>)new .colon.colon((Object)dest.linkCoordinator(this.linkName()), (List)Nil$.MODULE$),
            dest.nonLinkCoordinators(this.linkName()),
            this.assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
    }

    /**
     * Reverses a mirror topic twice over a bidirectional link while forcing
     * leader changes on the destination cluster between the steps.
     * <p>
     * The body keeps a mutable counter ({@code atLeastExpectedLeaderEpoch}, initially 3)
     * and calls {@code changeLeader} on every partition that many times. After each
     * round of leader changes, and again after each reversal, it invokes the synthetic
     * helper {@code $anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$4} once per
     * partition with the destination cluster and the counter.
     * NOTE(review): that helper is defined outside this view; from its arguments it
     * presumably checks the partition's leader epoch against the counter. Confirm.
     *
     * @param quorum           supplied by the method source; not read in the body
     * @param coordinator      supplied by the method source; not read in the body
     * @param localReplication supplied by the method source; not read in the body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    @MethodSource(value={"quorumCoordinatorReplicationCombinations"})
    public void testReverseAndStartMirrorWithLeaderEpochChanges(String quorum, boolean coordinator, String localReplication) {
        int numRecords = 20;
        ClusterLinkTestHarness localCluster = this.destCluster();
        ClusterLinkTestHarness remoteCluster = this.sourceCluster();
        String topic = "topic";
        // One entry per partition index in [0, numPartitions()), built by a synthetic helper outside this view.
        IndexedSeq partitions = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), this.numPartitions()).map((Function1 & Serializable)i -> BidirectionalLinkIntegrationTest.$anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$1(topic, BoxesRunTime.unboxToInt((Object)i)));
        // Create the topic on the remote (source) cluster, set up the bidirectional link, and mirror it locally.
        remoteCluster.createTopic(topic, this.numPartitions(), this.replicationFactor(), remoteCluster.createTopic$default$4(), remoteCluster.createTopic$default$5(), remoteCluster.createTopic$default$6());
        Properties localLinkProps = this.linkProps((ConnectionMode)ConnectionMode.Inbound$.MODULE$, remoteCluster, this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        Properties remoteLinkProps = this.linkProps((ConnectionMode)ConnectionMode.Outbound$.MODULE$, localCluster, this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        this.createBidirectionalLink(this.linkName(), localCluster, remoteCluster, localLinkProps, remoteLinkProps);
        localCluster.linkTopic(topic, this.replicationFactor(), this.linkName(), localCluster.linkTopic$default$4(), localCluster.linkTopic$default$5());
        // Mutable counter boxed in an IntRef so the lambdas below can read its current value.
        IntRef atLeastExpectedLeaderEpoch = IntRef.create((int)3);
        // First round: change the leader of every partition on the local cluster 3 times.
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), atLeastExpectedLeaderEpoch.elem).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable)x$1 -> partitions.foreach((Function1 & Serializable)tp -> BoxesRunTime.boxToInteger((int)localCluster.changeLeader(tp))));
        // Per-partition check against the counter (helper body not visible here).
        partitions.foreach((Function1 & Serializable)p -> {
            BidirectionalLinkIntegrationTest.$anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$4(localCluster, atLeastExpectedLeaderEpoch, p);
            return BoxedUnit.UNIT;
        });
        // First reversal, issued on the local cluster: local mirror goes STOPPED, remote mirror goes ACTIVE.
        localCluster.alterMirrors(topic, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        localCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, topic, this.linkName(), localCluster.waitUntilMirrorDescriptionState$default$4());
        remoteCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, topic, this.linkName(), remoteCluster.waitUntilMirrorDescriptionState$default$4());
        // Same per-partition check repeated after the reversal, with the counter unchanged.
        partitions.foreach((Function1 & Serializable)p -> {
            BidirectionalLinkIntegrationTest.$anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$4(localCluster, atLeastExpectedLeaderEpoch, p);
            return BoxedUnit.UNIT;
        });
        // Data now flows local -> remote.
        this.produceRecordsToCluster(localCluster, topic, numRecords);
        this.waitForMirroring(remoteCluster, (Seq<TopicPartition>)partitions);
        // Second round: another 3 leader changes per partition (the counter is still 3 when the range is built).
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), atLeastExpectedLeaderEpoch.elem).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable)x$1 -> partitions.foreach((Function1 & Serializable)tp -> BoxesRunTime.boxToInteger((int)localCluster.changeLeader(tp))));
        // Double the counter (3 -> 6) to account for the second round of leader changes.
        atLeastExpectedLeaderEpoch.elem += atLeastExpectedLeaderEpoch.elem;
        partitions.foreach((Function1 & Serializable)p -> {
            BidirectionalLinkIntegrationTest.$anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$4(localCluster, atLeastExpectedLeaderEpoch, p);
            return BoxedUnit.UNIT;
        });
        // Second reversal, issued on the remote cluster: remote mirror goes STOPPED, local mirror goes ACTIVE.
        remoteCluster.alterMirrors(topic, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        remoteCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, topic, this.linkName(), remoteCluster.waitUntilMirrorDescriptionState$default$4());
        localCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, topic, this.linkName(), localCluster.waitUntilMirrorDescriptionState$default$4());
        // Per-partition check once more after the second reversal, with the doubled counter.
        partitions.foreach((Function1 & Serializable)p -> {
            BidirectionalLinkIntegrationTest.$anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$4(localCluster, atLeastExpectedLeaderEpoch, p);
            return BoxedUnit.UNIT;
        });
        // Data flows remote -> local again.
        this.produceRecordsToCluster(remoteCluster, topic, numRecords);
        this.waitForMirroring(localCluster, (Seq<TopicPartition>)partitions);
    }

    /**
     * Reverses a mirror topic while records produced during a pause are still outstanding.
     * <p>
     * The topic is created on the source cluster and linked on the destination cluster.
     * {@code pauseTopic} is then called on the destination cluster with its default second
     * argument, 10000 records are produced to the source cluster, and {@code pauseTopic} is
     * called again with {@code false}. Immediately afterwards, without waiting for
     * mirroring, {@code REVERSE_AND_START_REMOTE_MIRROR} is issued on the destination
     * cluster. The test waits for {@code STOPPED} on the destination and {@code ACTIVE} on
     * the source, then produces to the destination cluster and waits for mirroring on the
     * source cluster.
     * NOTE(review): the default second argument of {@code pauseTopic} is presumably
     * {@code true} (pause), with {@code false} resuming. Confirm against the harness.
     *
     * @param quorum           supplied by the method source; not read in the body
     * @param coordinator      supplied by the method source; not read in the body
     * @param localReplication supplied by the method source; not read in the body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    @MethodSource(value={"quorumCoordinatorReplicationCombinations"})
    public void testReverseAndStartMirrorWithLag(String quorum, boolean coordinator, String localReplication) {
        final int recordCount = 10000;
        final ClusterLinkTestHarness dest = this.destCluster();
        final ClusterLinkTestHarness source = this.sourceCluster();
        final String mirrorTopic = "topic";
        final IndexedSeq topicPartitions = RichInt$.MODULE$
            .until$extension(Predef$.MODULE$.intWrapper(0), this.numPartitions())
            .map((Function1 & Serializable)idx -> BidirectionalLinkIntegrationTest.$anonfun$testReverseAndStartMirrorWithLag$1(mirrorTopic, BoxesRunTime.unboxToInt((Object)idx)));

        // Topic on the source cluster, bidirectional link, mirror topic on the destination cluster.
        source.createTopic(
            mirrorTopic,
            this.numPartitions(),
            this.replicationFactor(),
            source.createTopic$default$4(),
            source.createTopic$default$5(),
            source.createTopic$default$6());
        final ConnectionMode inbound = (ConnectionMode)ConnectionMode.Inbound$.MODULE$;
        final Properties destLinkProps = this.linkProps(
            inbound, source, this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        final ConnectionMode outbound = (ConnectionMode)ConnectionMode.Outbound$.MODULE$;
        final Properties sourceLinkProps = this.linkProps(
            outbound, dest, this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        this.createBidirectionalLink(this.linkName(), dest, source, destLinkProps, sourceLinkProps);
        dest.linkTopic(
            mirrorTopic,
            this.replicationFactor(),
            this.linkName(),
            dest.linkTopic$default$4(),
            dest.linkTopic$default$5());

        // Produce while the mirror topic is paused, then flip the pause flag back to false.
        dest.pauseTopic(mirrorTopic, dest.pauseTopic$default$2());
        this.produceRecordsToCluster(source, mirrorTopic, recordCount);
        dest.pauseTopic(mirrorTopic, false);

        // Reverse right away; no waitForMirroring on the destination cluster happens first.
        dest.alterMirrors(mirrorTopic, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        dest.waitUntilMirrorDescriptionState(
            MirrorTopicDescription.State.STOPPED, mirrorTopic, this.linkName(),
            dest.waitUntilMirrorDescriptionState$default$4());
        source.waitUntilMirrorDescriptionState(
            MirrorTopicDescription.State.ACTIVE, mirrorTopic, this.linkName(),
            source.waitUntilMirrorDescriptionState$default$4());

        // Records now flow from the destination cluster to the source cluster.
        this.produceRecordsToCluster(dest, mirrorTopic, recordCount);
        this.waitForMirroring(source, (Seq<TopicPartition>)topicPartitions);
    }

    /**
     * Reverses a mirror topic and restarts every broker of the source cluster while the
     * reversal is in flight.
     * <p>
     * After the topic is mirrored onto the destination cluster, the test issues
     * {@code REVERSE_AND_START_REMOTE_MIRROR} there, then kills all source-cluster brokers,
     * restarts them and refreshes the bootstrap servers. It waits for {@code STOPPED} on the
     * destination and {@code ACTIVE} on the source, asserts that the destination's mirror
     * topic description is non-null and reports zero state-transition errors, and finally
     * checks that records produced to the destination cluster are mirrored to the source
     * cluster. Before the link is created, {@code configureMirrorTransitionBatchSize} is
     * called for both clusters with the value 2.
     *
     * @param quorum           supplied by the method source; not read in the body
     * @param coordinator      supplied by the method source; not read in the body
     * @param localReplication supplied by the method source; not read in the body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    @MethodSource(value={"quorumCoordinatorReplicationCombinations"})
    public void testReverseAndStartMirroringWithRemoteClusterRestart(String quorum, boolean coordinator, String localReplication) {
        final int recordCount = 20;
        final ClusterLinkTestHarness dest = this.destCluster();
        final ClusterLinkTestHarness source = this.sourceCluster();
        final String mirrorTopic = "topic";
        final IndexedSeq topicPartitions = RichInt$.MODULE$
            .until$extension(Predef$.MODULE$.intWrapper(0), this.numPartitions())
            .map((Function1 & Serializable)idx -> BidirectionalLinkIntegrationTest.$anonfun$testReverseAndStartMirroringWithRemoteClusterRestart$1(mirrorTopic, BoxesRunTime.unboxToInt((Object)idx)));

        // Seed the source cluster.
        source.createTopic(
            mirrorTopic,
            this.numPartitions(),
            this.replicationFactor(),
            source.createTopic$default$4(),
            source.createTopic$default$5(),
            source.createTopic$default$6());
        this.produceRecordsToCluster(source, mirrorTopic, recordCount);

        // Apply a mirror-transition batch size of 2 to both clusters (destination first, then source).
        this.configureMirrorTransitionBatchSize(
            (scala.collection.immutable.Seq<ClusterLinkTestHarness>)new .colon.colon((Object)dest, (List)new .colon.colon((Object)source, (List)Nil$.MODULE$)),
            2);

        // Create the bidirectional link and mirror the topic onto the destination cluster.
        final ConnectionMode inbound = (ConnectionMode)ConnectionMode.Inbound$.MODULE$;
        final Properties destLinkProps = this.linkProps(
            inbound, source, this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        final ConnectionMode outbound = (ConnectionMode)ConnectionMode.Outbound$.MODULE$;
        final Properties sourceLinkProps = this.linkProps(
            outbound, dest, this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        this.createBidirectionalLink(this.linkName(), dest, source, destLinkProps, sourceLinkProps);
        dest.linkTopic(
            mirrorTopic,
            this.replicationFactor(),
            this.linkName(),
            dest.linkTopic$default$4(),
            dest.linkTopic$default$5());
        this.waitForMirroring(dest, (Seq<TopicPartition>)topicPartitions);

        // Start the reversal, then bounce every broker of the source cluster.
        dest.alterMirrors(mirrorTopic, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        source.killAllBrokers();
        source.restartDeadBrokers(source.restartDeadBrokers$default$1());
        source.updateBootstrapServers();

        // The reversal is expected to complete despite the restart.
        dest.waitUntilMirrorDescriptionState(
            MirrorTopicDescription.State.STOPPED, mirrorTopic, this.linkName(),
            dest.waitUntilMirrorDescriptionState$default$4());
        source.waitUntilMirrorDescriptionState(
            MirrorTopicDescription.State.ACTIVE, mirrorTopic, this.linkName(),
            source.waitUntilMirrorDescriptionState$default$4());

        // No state-transition errors may be recorded on the destination side.
        final MirrorTopicDescription destDescription = dest.describeMirrorTopic(mirrorTopic, true);
        Assertions.assertNotNull((Object)destDescription);
        final java.util.List recordedErrors = destDescription.mirrorStateTransitionErrors();
        Assertions.assertEquals((int)0, (int)recordedErrors.size());

        // Records now flow from the destination cluster to the source cluster.
        this.produceRecordsToCluster(dest, mirrorTopic, recordCount);
        this.waitForMirroring(source, (Seq<TopicPartition>)topicPartitions);
    }

    /**
     * Issues {@code REVERSE_AND_START_REMOTE_MIRROR} and bounces the destination cluster
     * (the one that issued the request) once both sides have entered their pending states.
     *
     * <p>After the destination brokers are killed and restarted, the source cluster's link
     * is altered so that its {@code bootstrap.servers} points at the destination's current
     * bootstrap servers. The test then waits for the destination mirror to report
     * {@code STOPPED} and the source mirror {@code ACTIVE}, and checks that records produced
     * to the destination are mirrored to the source.
     *
     * @param quorum           supplied by {@code quorumCoordinatorReplicationCombinations}; not read by this body
     * @param coordinator      supplied by {@code quorumCoordinatorReplicationCombinations}; not read by this body
     * @param localReplication supplied by {@code quorumCoordinatorReplicationCombinations}; not read by this body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    @MethodSource(value={"quorumCoordinatorReplicationCombinations"})
    public void testReverseAndStartMirroringWithLocalClusterRestart(String quorum, boolean coordinator, String localReplication) {
        final int recordCount = 20;
        ClusterLinkTestHarness dest = this.destCluster();
        ClusterLinkTestHarness src = this.sourceCluster();
        final String mirroredTopic = "topic";
        IndexedSeq topicPartitions = RichInt$.MODULE$
            .until$extension(Predef$.MODULE$.intWrapper(0), this.numPartitions())
            .map((Function1 & Serializable)idx -> BidirectionalLinkIntegrationTest.$anonfun$testReverseAndStartMirroringWithLocalClusterRestart$1(mirroredTopic, BoxesRunTime.unboxToInt((Object)idx)));

        // Seed the topic on the source side before the link exists.
        src.createTopic(mirroredTopic, this.numPartitions(), this.replicationFactor(),
            src.createTopic$default$4(), src.createTopic$default$5(), src.createTopic$default$6());
        this.produceRecordsToCluster(src, mirroredTopic, recordCount);

        scala.collection.immutable.Seq<ClusterLinkTestHarness> bothClusters =
            (scala.collection.immutable.Seq<ClusterLinkTestHarness>)new .colon.colon((Object)dest, (List)new .colon.colon((Object)src, (List)Nil$.MODULE$));
        this.configureMirrorTransitionBatchSize(bothClusters, 2);

        Properties inboundProps = this.linkProps((ConnectionMode)ConnectionMode.Inbound$.MODULE$, src,
            this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        Properties outboundProps = this.linkProps((ConnectionMode)ConnectionMode.Outbound$.MODULE$, dest,
            this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        this.createBidirectionalLink(this.linkName(), dest, src, inboundProps, outboundProps);
        dest.linkTopic(mirroredTopic, this.replicationFactor(), this.linkName(),
            dest.linkTopic$default$4(), dest.linkTopic$default$5());
        this.waitForMirroring(dest, (Seq<TopicPartition>)topicPartitions);

        // Start the reversal and wait until both sides are mid-transition.
        dest.alterMirrors(mirroredTopic, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PENDING_SYNCHRONIZE, mirroredTopic, this.linkName(),
            dest.waitUntilMirrorDescriptionState$default$4());
        src.waitUntilPendingMirrorState(mirroredTopic, this.linkName(), src.waitUntilPendingMirrorState$default$3());

        // Bounce the destination cluster in the middle of the transition.
        dest.killAllBrokers();
        dest.restartDeadBrokers(dest.restartDeadBrokers$default$1());
        dest.updateBootstrapServers();

        // Re-point the source's link at the destination's current bootstrap servers.
        src.alterClusterLink(
            this.linkName(),
            (Map<String, String>)((Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"bootstrap.servers"), (Object)dest.bootstrapServers(dest.bootstrapServers$default$1()))}))),
            src.alterClusterLink$default$3(),
            src.alterClusterLink$default$4(),
            src.alterClusterLink$default$5());

        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, mirroredTopic, this.linkName(),
            dest.waitUntilMirrorDescriptionState$default$4());
        src.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, mirroredTopic, this.linkName(),
            src.waitUntilMirrorDescriptionState$default$4());

        // Mirroring now flows from destination to source.
        this.produceRecordsToCluster(dest, mirroredTopic, recordCount);
        this.waitForMirroring(src, (Seq<TopicPartition>)topicPartitions);
    }

    /**
     * Issues {@code REVERSE_AND_START_REMOTE_MIRROR}, pauses the link on both clusters while
     * the reversal is pending, then unpauses both and expects the reversal to complete.
     *
     * <p>The mirror transition batch size is set to {@code Integer.MAX_VALUE} on both
     * clusters. After both sides reach their pending states, the link-paused property is set
     * to {@code "true"} on the destination and then the source, and both mirrors are expected
     * to report {@code LINK_PAUSED}. The property is then set back to {@code "false"} in the
     * same order; the destination mirror must reach {@code STOPPED} and the source mirror
     * {@code ACTIVE}, after which records produced to the destination must be mirrored to
     * the source.
     *
     * @param quorum           supplied by {@code quorumCoordinatorReplicationCombinations}; not read by this body
     * @param coordinator      supplied by {@code quorumCoordinatorReplicationCombinations}; not read by this body
     * @param localReplication supplied by {@code quorumCoordinatorReplicationCombinations}; not read by this body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    @MethodSource(value={"quorumCoordinatorReplicationCombinations"})
    public void testReverseAndStartMirroringWithPauseAndUnpauseLinks(String quorum, boolean coordinator, String localReplication) {
        final int recordCount = 20;
        ClusterLinkTestHarness dest = this.destCluster();
        ClusterLinkTestHarness src = this.sourceCluster();
        final String mirroredTopic = "topic";
        IndexedSeq topicPartitions = RichInt$.MODULE$
            .until$extension(Predef$.MODULE$.intWrapper(0), this.numPartitions())
            .map((Function1 & Serializable)idx -> BidirectionalLinkIntegrationTest.$anonfun$testReverseAndStartMirroringWithPauseAndUnpauseLinks$1(mirroredTopic, BoxesRunTime.unboxToInt((Object)idx)));

        // Seed the topic on the source side before the link exists.
        src.createTopic(mirroredTopic, this.numPartitions(), this.replicationFactor(),
            src.createTopic$default$4(), src.createTopic$default$5(), src.createTopic$default$6());
        this.produceRecordsToCluster(src, mirroredTopic, recordCount);

        scala.collection.immutable.Seq<ClusterLinkTestHarness> bothClusters =
            (scala.collection.immutable.Seq<ClusterLinkTestHarness>)new .colon.colon((Object)dest, (List)new .colon.colon((Object)src, (List)Nil$.MODULE$));
        this.configureMirrorTransitionBatchSize(bothClusters, Integer.MAX_VALUE);

        Properties inboundProps = this.linkProps((ConnectionMode)ConnectionMode.Inbound$.MODULE$, src,
            this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        Properties outboundProps = this.linkProps((ConnectionMode)ConnectionMode.Outbound$.MODULE$, dest,
            this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        this.createBidirectionalLink(this.linkName(), dest, src, inboundProps, outboundProps);
        dest.linkTopic(mirroredTopic, this.replicationFactor(), this.linkName(),
            dest.linkTopic$default$4(), dest.linkTopic$default$5());
        this.waitForMirroring(dest, (Seq<TopicPartition>)topicPartitions);

        // Start the reversal and wait until both sides are mid-transition.
        dest.alterMirrors(mirroredTopic, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PENDING_SYNCHRONIZE, mirroredTopic, this.linkName(),
            dest.waitUntilMirrorDescriptionState$default$4());
        src.waitUntilPendingMirrorState(mirroredTopic, this.linkName(), src.waitUntilPendingMirrorState$default$3());

        // Pause the link on both sides.
        dest.alterClusterLink(this.linkName(), BidirectionalLinkIntegrationTest.linkPausedOverride("true"),
            dest.alterClusterLink$default$3(), dest.alterClusterLink$default$4(), dest.alterClusterLink$default$5());
        src.alterClusterLink(this.linkName(), BidirectionalLinkIntegrationTest.linkPausedOverride("true"),
            src.alterClusterLink$default$3(), src.alterClusterLink$default$4(), src.alterClusterLink$default$5());
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.LINK_PAUSED, mirroredTopic, this.linkName(),
            dest.waitUntilMirrorDescriptionState$default$4());
        src.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.LINK_PAUSED, mirroredTopic, this.linkName(),
            src.waitUntilMirrorDescriptionState$default$4());

        // Unpause both sides; the pending reversal is expected to finish.
        dest.alterClusterLink(this.linkName(), BidirectionalLinkIntegrationTest.linkPausedOverride("false"),
            dest.alterClusterLink$default$3(), dest.alterClusterLink$default$4(), dest.alterClusterLink$default$5());
        src.alterClusterLink(this.linkName(), BidirectionalLinkIntegrationTest.linkPausedOverride("false"),
            src.alterClusterLink$default$3(), src.alterClusterLink$default$4(), src.alterClusterLink$default$5());
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, mirroredTopic, this.linkName(),
            dest.waitUntilMirrorDescriptionState$default$4());
        src.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, mirroredTopic, this.linkName(),
            src.waitUntilMirrorDescriptionState$default$4());

        // Mirroring now flows from destination to source.
        this.produceRecordsToCluster(dest, mirroredTopic, recordCount);
        this.waitForMirroring(src, (Seq<TopicPartition>)topicPartitions);
    }

    /**
     * Builds a single-entry Scala map from the link-paused config key
     * ({@code ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()}) to the given value.
     *
     * @param paused value to associate with the link-paused key (the test passes "true" or "false")
     * @return a freshly built one-entry map suitable for {@code alterClusterLink}
     */
    private static Map<String, String> linkPausedOverride(String paused) {
        return (Map<String, String>)((Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), (Object)paused)})));
    }

    /**
     * Issues {@code REVERSE_AND_START_REMOTE_MIRROR}, pauses the mirror topic on the
     * destination cluster while the reversal is pending, then unpauses it and expects the
     * reversal to complete.
     *
     * <p>The mirror transition batch size is set to {@code Integer.MAX_VALUE} on both
     * clusters. Once the destination mirror reports {@code PENDING_SYNCHRONIZE} and the
     * source reports the pending-mirror state, the topic is paused on the destination (using
     * the default second argument of {@code pauseTopic}) and expected to report
     * {@code PAUSED}. It is then unpaused with {@code pauseTopic(topic, false)}; the
     * destination mirror must reach {@code STOPPED} and the source mirror {@code ACTIVE}.
     *
     * @param quorum           supplied by {@code quorumCoordinatorReplicationCombinations}; not read by this body
     * @param coordinator      supplied by {@code quorumCoordinatorReplicationCombinations}; not read by this body
     * @param localReplication supplied by {@code quorumCoordinatorReplicationCombinations}; not read by this body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    @MethodSource(value={"quorumCoordinatorReplicationCombinations"})
    public void testReverseAndStartMirroringWithPauseAndUnpauseLocalMirrorTopic(String quorum, boolean coordinator, String localReplication) {
        final int recordCount = 20;
        ClusterLinkTestHarness dest = this.destCluster();
        ClusterLinkTestHarness src = this.sourceCluster();
        final String mirroredTopic = "topic";
        IndexedSeq topicPartitions = RichInt$.MODULE$
            .until$extension(Predef$.MODULE$.intWrapper(0), this.numPartitions())
            .map((Function1 & Serializable)idx -> BidirectionalLinkIntegrationTest.$anonfun$testReverseAndStartMirroringWithPauseAndUnpauseLocalMirrorTopic$1(mirroredTopic, BoxesRunTime.unboxToInt((Object)idx)));

        // Seed the topic on the source side before the link exists.
        src.createTopic(mirroredTopic, this.numPartitions(), this.replicationFactor(),
            src.createTopic$default$4(), src.createTopic$default$5(), src.createTopic$default$6());
        this.produceRecordsToCluster(src, mirroredTopic, recordCount);

        scala.collection.immutable.Seq<ClusterLinkTestHarness> bothClusters =
            (scala.collection.immutable.Seq<ClusterLinkTestHarness>)new .colon.colon((Object)dest, (List)new .colon.colon((Object)src, (List)Nil$.MODULE$));
        this.configureMirrorTransitionBatchSize(bothClusters, Integer.MAX_VALUE);

        Properties inboundProps = this.linkProps((ConnectionMode)ConnectionMode.Inbound$.MODULE$, src,
            this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        Properties outboundProps = this.linkProps((ConnectionMode)ConnectionMode.Outbound$.MODULE$, dest,
            this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        this.createBidirectionalLink(this.linkName(), dest, src, inboundProps, outboundProps);
        dest.linkTopic(mirroredTopic, this.replicationFactor(), this.linkName(),
            dest.linkTopic$default$4(), dest.linkTopic$default$5());
        this.waitForMirroring(dest, (Seq<TopicPartition>)topicPartitions);

        // Start the reversal and wait until both sides are mid-transition.
        dest.alterMirrors(mirroredTopic, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PENDING_SYNCHRONIZE, mirroredTopic, this.linkName(),
            dest.waitUntilMirrorDescriptionState$default$4());
        src.waitUntilPendingMirrorState(mirroredTopic, this.linkName(), src.waitUntilPendingMirrorState$default$3());

        // Pause the destination mirror topic, then unpause it again.
        dest.pauseTopic(mirroredTopic, dest.pauseTopic$default$2());
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PAUSED, mirroredTopic, this.linkName(),
            dest.waitUntilMirrorDescriptionState$default$4());
        dest.pauseTopic(mirroredTopic, false);

        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, mirroredTopic, this.linkName(),
            dest.waitUntilMirrorDescriptionState$default$4());
        src.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, mirroredTopic, this.linkName(),
            src.waitUntilMirrorDescriptionState$default$4());

        // Mirroring now flows from destination to source.
        this.produceRecordsToCluster(dest, mirroredTopic, recordCount);
        this.waitForMirroring(src, (Seq<TopicPartition>)topicPartitions);
    }

    /**
     * Issues {@code REVERSE_AND_START_REMOTE_MIRROR}, pauses the destination mirror topic
     * while the reversal is pending, bounces the destination cluster, and only then unpauses
     * the topic.
     *
     * <p>After the destination brokers are restarted, the source cluster's link is altered so
     * that {@code bootstrap.servers} points at the destination's current bootstrap servers.
     * Once the topic is unpaused, the destination mirror must reach {@code STOPPED} and the
     * source mirror {@code ACTIVE}, and records produced to the destination must be mirrored
     * to the source.
     *
     * @param quorum           supplied by {@code quorumCoordinatorReplicationCombinations}; not read by this body
     * @param coordinator      supplied by {@code quorumCoordinatorReplicationCombinations}; not read by this body
     * @param localReplication supplied by {@code quorumCoordinatorReplicationCombinations}; not read by this body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    @MethodSource(value={"quorumCoordinatorReplicationCombinations"})
    public void testReverseAndStartMirroringWithPauseAndUnpauseLocalMirrorTopicWithLocalClusterRestart(String quorum, boolean coordinator, String localReplication) {
        final int recordCount = 20;
        ClusterLinkTestHarness dest = this.destCluster();
        ClusterLinkTestHarness src = this.sourceCluster();
        final String mirroredTopic = "topic";
        IndexedSeq topicPartitions = RichInt$.MODULE$
            .until$extension(Predef$.MODULE$.intWrapper(0), this.numPartitions())
            .map((Function1 & Serializable)idx -> BidirectionalLinkIntegrationTest.$anonfun$testReverseAndStartMirroringWithPauseAndUnpauseLocalMirrorTopicWithLocalClusterRestart$1(mirroredTopic, BoxesRunTime.unboxToInt((Object)idx)));

        // Seed the topic on the source side before the link exists.
        src.createTopic(mirroredTopic, this.numPartitions(), this.replicationFactor(),
            src.createTopic$default$4(), src.createTopic$default$5(), src.createTopic$default$6());
        this.produceRecordsToCluster(src, mirroredTopic, recordCount);

        scala.collection.immutable.Seq<ClusterLinkTestHarness> bothClusters =
            (scala.collection.immutable.Seq<ClusterLinkTestHarness>)new .colon.colon((Object)dest, (List)new .colon.colon((Object)src, (List)Nil$.MODULE$));
        this.configureMirrorTransitionBatchSize(bothClusters, Integer.MAX_VALUE);

        Properties inboundProps = this.linkProps((ConnectionMode)ConnectionMode.Inbound$.MODULE$, src,
            this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        Properties outboundProps = this.linkProps((ConnectionMode)ConnectionMode.Outbound$.MODULE$, dest,
            this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        this.createBidirectionalLink(this.linkName(), dest, src, inboundProps, outboundProps);
        dest.linkTopic(mirroredTopic, this.replicationFactor(), this.linkName(),
            dest.linkTopic$default$4(), dest.linkTopic$default$5());
        this.waitForMirroring(dest, (Seq<TopicPartition>)topicPartitions);

        // Start the reversal, then pause the destination mirror while it is pending.
        dest.alterMirrors(mirroredTopic, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PENDING_SYNCHRONIZE, mirroredTopic, this.linkName(),
            dest.waitUntilMirrorDescriptionState$default$4());
        dest.pauseTopic(mirroredTopic, dest.pauseTopic$default$2());
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PAUSED, mirroredTopic, this.linkName(),
            dest.waitUntilMirrorDescriptionState$default$4());

        // Bounce the destination cluster while the topic is paused.
        dest.killAllBrokers();
        dest.restartDeadBrokers(dest.restartDeadBrokers$default$1());
        dest.updateBootstrapServers();

        // Re-point the source's link at the destination's current bootstrap servers.
        src.alterClusterLink(
            this.linkName(),
            (Map<String, String>)((Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"bootstrap.servers"), (Object)dest.bootstrapServers(dest.bootstrapServers$default$1()))}))),
            src.alterClusterLink$default$3(),
            src.alterClusterLink$default$4(),
            src.alterClusterLink$default$5());

        // Unpause; the pending reversal is expected to finish.
        dest.pauseTopic(mirroredTopic, false);
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, mirroredTopic, this.linkName(),
            dest.waitUntilMirrorDescriptionState$default$4());
        src.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, mirroredTopic, this.linkName(),
            src.waitUntilMirrorDescriptionState$default$4());

        // Mirroring now flows from destination to source.
        this.produceRecordsToCluster(dest, mirroredTopic, recordCount);
        this.waitForMirroring(src, (Seq<TopicPartition>)topicPartitions);
    }

    /**
     * Checks that {@code REVERSE_AND_START_REMOTE_MIRROR} ends in {@code FAILED} when the
     * topic on the other side is deleted and recreated during the transition.
     *
     * <p>Two rounds are exercised. First, right after the reversal is requested, the topic is
     * deleted and recreated on the source cluster, and the destination mirror is expected to
     * report {@code FAILED}. Second, the destination topic is deleted, re-linked and reversed
     * again; once the source reports the pending-mirror state, the destination topic is
     * deleted and recreated, and the source mirror is expected to report {@code FAILED}.
     *
     * @param quorum           supplied by {@code quorumCoordinatorReplicationCombinations}; not read by this body
     * @param coordinator      supplied by {@code quorumCoordinatorReplicationCombinations}; not read by this body
     * @param localReplication supplied by {@code quorumCoordinatorReplicationCombinations}; not read by this body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    @MethodSource(value={"quorumCoordinatorReplicationCombinations"})
    public void testReverseAndStartMirrorWithDeletedTopics(String quorum, boolean coordinator, String localReplication) {
        final int recordCount = 20;
        ClusterLinkTestHarness dest = this.destCluster();
        ClusterLinkTestHarness src = this.sourceCluster();
        IndexedSeq topicPartitions = RichInt$.MODULE$
            .until$extension(Predef$.MODULE$.intWrapper(0), this.numPartitions())
            .map((Function1 & Serializable)idx -> BidirectionalLinkIntegrationTest.$anonfun$testReverseAndStartMirrorWithDeletedTopics$1(this, BoxesRunTime.unboxToInt((Object)idx)));

        // Seed the topic on the source side before the link exists.
        src.createTopic(this.topic(), this.numPartitions(), this.replicationFactor(),
            src.createTopic$default$4(), src.createTopic$default$5(), src.createTopic$default$6());
        this.produceRecordsToCluster(src, this.topic(), recordCount);

        Properties inboundProps = this.linkProps((ConnectionMode)ConnectionMode.Inbound$.MODULE$, src,
            this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        Properties outboundProps = this.linkProps((ConnectionMode)ConnectionMode.Outbound$.MODULE$, dest,
            this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        this.createBidirectionalLink(this.linkName(), dest, src, inboundProps, outboundProps);
        dest.linkTopic(this.topic(), this.replicationFactor(), this.linkName(),
            dest.linkTopic$default$4(), dest.linkTopic$default$5());
        this.waitForMirroring(dest, (Seq<TopicPartition>)topicPartitions);

        // Round 1: recreate the source topic right after requesting the reversal.
        dest.alterMirrors(this.topic(), AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        src.deleteTopic(this.topic(), true);
        src.createTopic(this.topic(), this.numPartitions(), this.replicationFactor(),
            src.createTopic$default$4(), src.createTopic$default$5(), src.createTopic$default$6());
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.FAILED, this.topic(), this.linkName(),
            dest.waitUntilMirrorDescriptionState$default$4());

        // Round 2: re-link, reverse again, then recreate the destination topic mid-transition.
        dest.deleteTopic(this.topic(), true);
        dest.linkTopic(this.topic(), this.replicationFactor(), this.linkName(),
            dest.linkTopic$default$4(), dest.linkTopic$default$5());
        dest.alterMirrors(this.topic(), AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        src.waitUntilPendingMirrorState(this.topic(), this.linkName(), src.waitUntilPendingMirrorState$default$3());
        dest.deleteTopic(this.topic(), true);
        dest.createTopic(this.topic(), this.numPartitions(), this.replicationFactor(),
            dest.createTopic$default$4(), dest.createTopic$default$5(), dest.createTopic$default$6());
        src.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.FAILED, this.topic(), this.linkName(),
            src.waitUntilMirrorDescriptionState$default$4());
    }

    /**
     * Checks that a destination mirror stuck in {@code PENDING_SYNCHRONIZE} can be rolled
     * back to {@code ACTIVE} with {@code AlterMirrorOp.ROLLBACK}.
     *
     * <p>The source brokers are killed right after {@code REVERSE_AND_START_REMOTE_MIRROR} is
     * requested. Once the destination mirror reports {@code PENDING_SYNCHRONIZE}, it is
     * rolled back and expected to return to {@code ACTIVE}. The source brokers are then
     * restarted; after the source reports the pending-mirror state the topic is unlinked
     * there, and records produced to the source must again be mirrored to the destination.
     *
     * @param quorum           supplied by {@code quorumCoordinatorReplicationCombinations}; not read by this body
     * @param coordinator      supplied by {@code quorumCoordinatorReplicationCombinations}; not read by this body
     * @param localReplication supplied by {@code quorumCoordinatorReplicationCombinations}; not read by this body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    @MethodSource(value={"quorumCoordinatorReplicationCombinations"})
    public void testRollbackMirrorFromPendingSynchronizeState(String quorum, boolean coordinator, String localReplication) {
        final int recordCount = 20;
        ClusterLinkTestHarness dest = this.destCluster();
        ClusterLinkTestHarness src = this.sourceCluster();
        IndexedSeq topicPartitions = RichInt$.MODULE$
            .until$extension(Predef$.MODULE$.intWrapper(0), this.numPartitions())
            .map((Function1 & Serializable)idx -> BidirectionalLinkIntegrationTest.$anonfun$testRollbackMirrorFromPendingSynchronizeState$1(this, BoxesRunTime.unboxToInt((Object)idx)));

        // The producer is created before the topic, as in the original sequence.
        KafkaProducer sourceProducer = src.createProducer(
            src.createProducer$default$1(), src.createProducer$default$2(), src.createProducer$default$3());
        src.createTopic(this.topic(), this.numPartitions(), this.replicationFactor(),
            src.createTopic$default$4(), src.createTopic$default$5(), src.createTopic$default$6());
        this.produceRecords(sourceProducer, this.topic(), recordCount,
            this.produceRecords$default$4(), this.produceRecords$default$5(), this.produceRecords$default$6());

        Properties inboundProps = this.linkProps((ConnectionMode)ConnectionMode.Inbound$.MODULE$, src,
            this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        Properties outboundProps = this.linkProps((ConnectionMode)ConnectionMode.Outbound$.MODULE$, dest,
            this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        this.createBidirectionalLink(this.linkName(), dest, src, inboundProps, outboundProps);
        dest.linkTopic(this.topic(), this.replicationFactor(), this.linkName(),
            dest.linkTopic$default$4(), dest.linkTopic$default$5());
        this.waitForMirroring(dest, (Seq<TopicPartition>)topicPartitions);

        // Request the reversal and take the source cluster down.
        dest.alterMirrors(this.topic(), AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        src.killAllBrokers();
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PENDING_SYNCHRONIZE, this.topic(), this.linkName(),
            dest.waitUntilMirrorDescriptionState$default$4());

        // Roll the destination mirror back to ACTIVE.
        dest.alterMirrors(this.topic(), AlterMirrorOp.ROLLBACK);
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, this.topic(), this.linkName(),
            dest.waitUntilMirrorDescriptionState$default$4());

        // Bring the source back and unlink the topic there once it reports the pending state.
        src.restartDeadBrokers(src.restartDeadBrokers$default$1());
        src.updateBootstrapServers();
        src.waitUntilPendingMirrorState(this.topic(), this.linkName(), src.waitUntilPendingMirrorState$default$3());
        src.unlinkTopic(this.topic(), this.linkName(), src.unlinkTopic$default$3(), false,
            src.unlinkTopic$default$5(), src.unlinkTopic$default$6());

        // Mirroring flows in the original direction again.
        this.produceRecordsToCluster(src, this.topic(), recordCount);
        this.waitForMirroring(dest, (Seq<TopicPartition>)topicPartitions);
    }

    /**
     * Checks that {@code AlterMirrorOp.ROLLBACK} is rejected with
     * {@link InvalidRequestException} when issued on the source cluster while its topic is in
     * the pending-mirror state.
     *
     * <p>The mirror transition batch size is set to {@code Integer.MAX_VALUE} on both
     * clusters. After {@code REVERSE_AND_START_REMOTE_MIRROR} is requested and the source
     * reports the pending-mirror state, the destination brokers are killed. The source must
     * still report the pending-mirror state, the rollback must throw, and after the topic is
     * unlinked on the source its mirror description must reach {@code STOPPED}.
     *
     * @param quorum           supplied by {@code quorumCoordinatorReplicationCombinations}; not read by this body
     * @param coordinator      supplied by {@code quorumCoordinatorReplicationCombinations}; not read by this body
     * @param localReplication supplied by {@code quorumCoordinatorReplicationCombinations}; not read by this body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    @MethodSource(value={"quorumCoordinatorReplicationCombinations"})
    public void testRollbackMirrorFailsOnPendingMirror(String quorum, boolean coordinator, String localReplication) {
        final int recordCount = 20;
        ClusterLinkTestHarness dest = this.destCluster();
        ClusterLinkTestHarness src = this.sourceCluster();
        IndexedSeq topicPartitions = RichInt$.MODULE$
            .until$extension(Predef$.MODULE$.intWrapper(0), this.numPartitions())
            .map((Function1 & Serializable)idx -> BidirectionalLinkIntegrationTest.$anonfun$testRollbackMirrorFailsOnPendingMirror$1(this, BoxesRunTime.unboxToInt((Object)idx)));

        // Seed the topic on the source side before the link exists.
        src.createTopic(this.topic(), this.numPartitions(), this.replicationFactor(),
            src.createTopic$default$4(), src.createTopic$default$5(), src.createTopic$default$6());
        this.produceRecordsToCluster(src, this.topic(), recordCount);

        scala.collection.immutable.Seq<ClusterLinkTestHarness> bothClusters =
            (scala.collection.immutable.Seq<ClusterLinkTestHarness>)new .colon.colon((Object)dest, (List)new .colon.colon((Object)src, (List)Nil$.MODULE$));
        this.configureMirrorTransitionBatchSize(bothClusters, Integer.MAX_VALUE);

        Properties inboundProps = this.linkProps((ConnectionMode)ConnectionMode.Inbound$.MODULE$, src,
            this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        Properties outboundProps = this.linkProps((ConnectionMode)ConnectionMode.Outbound$.MODULE$, dest,
            this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        this.createBidirectionalLink(this.linkName(), dest, src, inboundProps, outboundProps);
        dest.linkTopic(this.topic(), this.replicationFactor(), this.linkName(),
            dest.linkTopic$default$4(), dest.linkTopic$default$5());
        this.waitForMirroring(dest, (Seq<TopicPartition>)topicPartitions);

        // Request the reversal, then take the destination down once the source is pending.
        dest.alterMirrors(this.topic(), AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        src.waitUntilPendingMirrorState(this.topic(), this.linkName(), src.waitUntilPendingMirrorState$default$3());
        dest.killAllBrokers();
        src.waitUntilPendingMirrorState(this.topic(), this.linkName(), src.waitUntilPendingMirrorState$default$3());

        // Rollback is not allowed on the pending mirror.
        Assertions.assertThrows(InvalidRequestException.class,
            () -> src.alterMirrors(this.topic(), AlterMirrorOp.ROLLBACK));

        // Unlinking is the way out; the mirror then reports STOPPED.
        src.unlinkTopic(this.topic(), this.linkName(), src.unlinkTopic$default$3(), false,
            src.unlinkTopic$default$5(), src.unlinkTopic$default$6());
        src.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, this.topic(), this.linkName(),
            src.waitUntilMirrorDescriptionState$default$4());
    }

    /**
     * Checks that {@code AlterMirrorOp.PAUSE} is rejected with
     * {@link InvalidRequestException} when issued on the source cluster while its topic is in
     * the pending-mirror state after {@code REVERSE_AND_PAUSE_REMOTE_MIRROR}.
     *
     * <p>The mirror transition batch size is set to 2 on both clusters. After the reversal is
     * requested and the source reports the pending-mirror state, the destination brokers are
     * killed. The test then checks the {@code "pending_mirror"} transition metric through
     * {@code assertClusterLinkMirrorTransitionMetricMaxVal}, optionally exercises a link
     * coordinator change, retries a follow-up check for up to 15 seconds, asserts that PAUSE
     * throws, and finally unlinks the topic on the source and waits for {@code STOPPED}.
     *
     * <p>The retry loop is the decompiler's inlining of a {@code TestUtils$} retry helper.
     * The decompiler lost the binding of the loop's maximum wait to the {@code 15000L}
     * constant (it emitted an unassigned {@code void retry_maxWaitMs}); the binding is
     * restored here.
     *
     * @param quorum           supplied by {@code quorumCoordinatorReplicationCombinations}; not read by this body
     * @param coordinator      when {@code true}, additionally runs
     *                         {@code verifyCoordinatorChangeHandlesStoppingAndStartingTasks} on the source cluster
     * @param localReplication supplied by {@code quorumCoordinatorReplicationCombinations}; not read by this body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    @MethodSource(value={"quorumCoordinatorReplicationCombinations"})
    public void testPauseMirrorFailsOnPendingMirror(String quorum, boolean coordinator, String localReplication) {
        int numRecords = 20;
        ClusterLinkTestHarness localCluster = this.destCluster();
        ClusterLinkTestHarness remoteCluster = this.sourceCluster();
        IndexedSeq partitions = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), this.numPartitions()).map((Function1 & Serializable)i -> BidirectionalLinkIntegrationTest.$anonfun$testPauseMirrorFailsOnPendingMirror$1(this, BoxesRunTime.unboxToInt((Object)i)));
        remoteCluster.createTopic(this.topic(), this.numPartitions(), this.replicationFactor(), remoteCluster.createTopic$default$4(), remoteCluster.createTopic$default$5(), remoteCluster.createTopic$default$6());
        this.produceRecordsToCluster(remoteCluster, this.topic(), numRecords);
        this.configureMirrorTransitionBatchSize((scala.collection.immutable.Seq<ClusterLinkTestHarness>)new .colon.colon((Object)localCluster, (List)new .colon.colon((Object)remoteCluster, (List)Nil$.MODULE$)), 2);
        Properties localLinkProps = this.linkProps((ConnectionMode)ConnectionMode.Inbound$.MODULE$, remoteCluster, this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        Properties remoteLinkProps = this.linkProps((ConnectionMode)ConnectionMode.Outbound$.MODULE$, localCluster, this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        Uuid linkId = this.createBidirectionalLink(this.linkName(), localCluster, remoteCluster, localLinkProps, remoteLinkProps);
        localCluster.linkTopic(this.topic(), this.replicationFactor(), this.linkName(), localCluster.linkTopic$default$4(), localCluster.linkTopic$default$5());
        this.waitForMirroring(localCluster, (Seq<TopicPartition>)partitions);

        // Request the reversal, then take the destination down once the source is pending.
        localCluster.alterMirrors(this.topic(), AlterMirrorOp.REVERSE_AND_PAUSE_REMOTE_MIRROR);
        remoteCluster.waitUntilPendingMirrorState(this.topic(), this.linkName(), remoteCluster.waitUntilPendingMirrorState$default$3());
        localCluster.killAllBrokers();
        remoteCluster.waitUntilPendingMirrorState(this.topic(), this.linkName(), remoteCluster.waitUntilPendingMirrorState$default$3());

        // Arguments: metric group, "pending_mirror", the internal-error task code, 1.0, the
        // link coordinator broker, the non-coordinator brokers, and 120000L.
        // NOTE(review): the last argument is presumably a timeout in ms — confirm against the helper.
        this.assertClusterLinkMirrorTransitionMetricMaxVal(ClusterLinkMetrics$.MODULE$.metricsGroup(), "pending_mirror", (Seq<TaskErrorCode>)new .colon.colon((Object)InternalTaskErrorCode$.MODULE$, (List)Nil$.MODULE$), 1.0, (Seq<KafkaBroker>)new .colon.colon((Object)remoteCluster.linkCoordinator(this.linkName()), (List)Nil$.MODULE$), remoteCluster.nonLinkCoordinators(this.linkName()), 120000L);
        if (coordinator) {
            this.verifyCoordinatorChangeHandlesStoppingAndStartingTasks(remoteCluster, linkId, this.topic(), (TaskType)ClusterLinkConvertToMirrorTopicTaskType$.MODULE$, true, true);
        }

        // Inlined retry: re-run the check until it stops throwing AssertionError, giving up
        // (and rethrowing the last failure) once more than retryMaxWaitMs has elapsed.
        // NOTE(review): Thread.sleep's InterruptedException is left unhandled, as in the
        // decompiled original (the Scala source has no checked exceptions).
        final long retryMaxWaitMs = 15000L;
        TestUtils$ retryLogger = TestUtils$.MODULE$;
        long retryWaitMs = 1L;
        long retryStartMs = System.currentTimeMillis();
        while (true) {
            try {
                BidirectionalLinkIntegrationTest.$anonfun$testPauseMirrorFailsOnPendingMirror$2(this, remoteCluster);
                break;
            }
            catch (AssertionError failure) {
                if (System.currentTimeMillis() - retryStartMs > retryMaxWaitMs) {
                    throw failure;
                }
                if (retryLogger.logger().underlying().isInfoEnabled()) {
                    String retryMessage = new StringBuilder(49).append("Attempt failed, sleeping for ").append(retryWaitMs).append(", and then retrying.").toString();
                    retryLogger.logger().underlying().info(Logging.msgWithLogIdent$((Logging)retryLogger, (String)retryMessage));
                }
                Thread.sleep(retryWaitMs);
                // Back off: double the wait until it reaches 1s, then grow by 1s per attempt.
                retryWaitMs += Math.min(retryWaitMs, 1000L);
            }
        }

        // PAUSE is not allowed on the pending mirror.
        Assertions.assertThrows(InvalidRequestException.class, () -> remoteCluster.alterMirrors(this.topic(), AlterMirrorOp.PAUSE));

        // Unlinking is the way out; the mirror then reports STOPPED.
        remoteCluster.unlinkTopic(this.topic(), this.linkName(), remoteCluster.unlinkTopic$default$3(), false, remoteCluster.unlinkTopic$default$5(), remoteCluster.unlinkTopic$default$6());
        remoteCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, this.topic(), this.linkName(), remoteCluster.waitUntilMirrorDescriptionState$default$4());
    }

    /**
     * Exercises {@code REVERSE_AND_PAUSE_REMOTE_MIRROR} twice over a bidirectional link, once from
     * each side, resuming the paused mirror in between.
     *
     * <p>The three parameters are not referenced in the method body; presumably they are consumed
     * by the test harness through the parameterized-test machinery.
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    @MethodSource(value={"quorumCoordinatorReplicationCombinations"})
    public void testReverseAndPauseMirror(String quorum, boolean coordinator, String localReplication) {
        int numRecords = 20;
        ClusterLinkTestHarness localCluster = this.destCluster();
        ClusterLinkTestHarness remoteCluster = this.sourceCluster();
        String topic = "topic";
        IndexedSeq partitions = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), this.numPartitions()).map((Function1 & Serializable)i -> BidirectionalLinkIntegrationTest.$anonfun$testReverseAndPauseMirror$1(topic, BoxesRunTime.unboxToInt((Object)i)));
        // The topic starts on the remote cluster and is mirrored to the local one.
        remoteCluster.createTopic(topic, this.numPartitions(), this.replicationFactor(), remoteCluster.createTopic$default$4(), remoteCluster.createTopic$default$5(), remoteCluster.createTopic$default$6());
        this.produceRecordsToCluster(remoteCluster, topic, numRecords);
        // Local side of the link is Inbound, remote side is Outbound.
        Properties localLinkProps = this.linkProps((ConnectionMode)ConnectionMode.Inbound$.MODULE$, remoteCluster, this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        Properties remoteLinkProps = this.linkProps((ConnectionMode)ConnectionMode.Outbound$.MODULE$, localCluster, this.linkProps$default$3(), this.linkProps$default$4(), this.linkProps$default$5());
        this.createBidirectionalLink(this.linkName(), localCluster, remoteCluster, localLinkProps, remoteLinkProps);
        localCluster.linkTopic(topic, this.replicationFactor(), this.linkName(), localCluster.linkTopic$default$4(), localCluster.linkTopic$default$5());
        this.waitForMirroring(localCluster, (Seq<TopicPartition>)partitions);
        // First reversal, issued on the local cluster: local ends STOPPED, remote ends PAUSED.
        localCluster.alterMirrors(topic, AlterMirrorOp.REVERSE_AND_PAUSE_REMOTE_MIRROR);
        localCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, topic, this.linkName(), localCluster.waitUntilMirrorDescriptionState$default$4());
        remoteCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PAUSED, topic, this.linkName(), remoteCluster.waitUntilMirrorDescriptionState$default$4());
        this.assertClusterLinkMirrorTransitionMetricMaxVal(ClusterLinkMetrics$.MODULE$.metricsGroup(), "pending_synchronization", (Seq<TaskErrorCode>)new .colon.colon((Object)NoErrorCode$.MODULE$, (List)Nil$.MODULE$), 1.0, (Seq<KafkaBroker>)new .colon.colon((Object)localCluster.linkCoordinator(this.linkName()), (List)Nil$.MODULE$), localCluster.nonLinkCoordinators(this.linkName()), this.assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
        this.assertClusterLinkMirrorTransitionMetricMaxVal(ClusterLinkMetrics$.MODULE$.metricsGroup(), "pending_mirror", (Seq<TaskErrorCode>)new .colon.colon((Object)NoErrorCode$.MODULE$, (List)Nil$.MODULE$), 1.0, (Seq<KafkaBroker>)new .colon.colon((Object)remoteCluster.linkCoordinator(this.linkName()), (List)Nil$.MODULE$), remoteCluster.nonLinkCoordinators(this.linkName()), this.assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
        // pauseTopic(topic, false) is followed by a wait for ACTIVE, i.e. it resumes the paused mirror.
        remoteCluster.pauseTopic(topic, false);
        remoteCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, topic, this.linkName(), remoteCluster.waitUntilMirrorDescriptionState$default$4());
        // Mirroring now runs local -> remote.
        this.produceRecordsToCluster(localCluster, topic, numRecords);
        this.waitForMirroring(remoteCluster, (Seq<TopicPartition>)partitions);
        // Second reversal, issued on the remote cluster: remote ends STOPPED, local ends PAUSED.
        remoteCluster.alterMirrors(topic, AlterMirrorOp.REVERSE_AND_PAUSE_REMOTE_MIRROR);
        remoteCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, topic, this.linkName(), remoteCluster.waitUntilMirrorDescriptionState$default$4());
        localCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PAUSED, topic, this.linkName(), localCluster.waitUntilMirrorDescriptionState$default$4());
        // NOTE(review): by symmetry with the first reversal, "pending_synchronization" would be expected
        // on remoteCluster's brokers here, yet both assertions below target localCluster — confirm intent.
        this.assertClusterLinkMirrorTransitionMetricMaxVal(ClusterLinkMetrics$.MODULE$.metricsGroup(), "pending_synchronization", (Seq<TaskErrorCode>)new .colon.colon((Object)NoErrorCode$.MODULE$, (List)Nil$.MODULE$), 1.0, (Seq<KafkaBroker>)new .colon.colon((Object)localCluster.linkCoordinator(this.linkName()), (List)Nil$.MODULE$), localCluster.nonLinkCoordinators(this.linkName()), this.assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
        this.assertClusterLinkMirrorTransitionMetricMaxVal(ClusterLinkMetrics$.MODULE$.metricsGroup(), "pending_mirror", (Seq<TaskErrorCode>)new .colon.colon((Object)NoErrorCode$.MODULE$, (List)Nil$.MODULE$), 1.0, (Seq<KafkaBroker>)new .colon.colon((Object)localCluster.linkCoordinator(this.linkName()), (List)Nil$.MODULE$), localCluster.nonLinkCoordinators(this.linkName()), this.assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
        // Resume the local mirror and wait for it to become ACTIVE again.
        localCluster.pauseTopic(topic, false);
        localCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, topic, this.linkName(), localCluster.waitUntilMirrorDescriptionState$default$4());
    }

    /**
     * Produces {@code numRecords} records to {@code topic} on {@code cluster} through a
     * short-lived producer created with the harness defaults.
     *
     * <p>The producer is closed even when producing fails, so a failing test does not leak it.
     *
     * @param cluster    harness of the cluster to produce to
     * @param topic      destination topic name
     * @param numRecords number of records handed to {@code produceRecords}
     */
    private void produceRecordsToCluster(ClusterLinkTestHarness cluster, String topic, int numRecords) {
        KafkaProducer producer = cluster.createProducer(cluster.createProducer$default$1(), cluster.createProducer$default$2(), cluster.createProducer$default$3());
        try {
            this.produceRecords(producer, topic, numRecords, this.produceRecords$default$4(), this.produceRecords$default$5(), this.produceRecords$default$6());
        } finally {
            producer.close();
        }
    }

    /**
     * Drives six mirror topics through repeated reversals, then promotes the first three and fails
     * over the last three on the secondary cluster, and finally checks the committed offsets of
     * consumer group {@code "group"} on the secondary cluster.
     *
     * <p>Expected offsets: for promoted topics, the secondary cluster's leader log end offset of
     * each partition; for failed-over topics, {@code 200} for each partition.
     *
     * <p>The decompiler had lost the binding of the two halves of {@code topics.splitAt(3)}
     * (unassigned {@code void} locals) and referenced an undeclared {@code secondaryCluster$1};
     * both are bound to the real locals here.
     *
     * <p>The three parameters are not referenced in the method body; presumably they are consumed
     * by the test harness through the parameterized-test machinery.
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"quorumCoordinatorReplicationCombinations"})
    public void testMultipleMirrorStateTransitions(String quorum, boolean coordinator, String localReplication) {
        AlterMirrorsOptions alterMirrorOptions = (AlterMirrorsOptions)new AlterMirrorsOptions().timeoutMs(Predef$.MODULE$.int2Integer(20000));
        // ObjectRef holders: the helper lambdas below receive (and may reassign) the cluster roles.
        ObjectRef primaryCluster = ObjectRef.create((Object)this.sourceCluster());
        ObjectRef secondaryCluster = ObjectRef.create((Object)this.destCluster());
        // One Confluent admin client per cluster, keyed by the harness.
        Object[] objectArray = new Tuple2[2];
        ClusterLinkTestHarness qual$1 = (ClusterLinkTestHarness)primaryCluster.elem;
        Properties x$1 = qual$1.createConfluentAdminClient$default$1();
        objectArray[0] = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)((ClusterLinkTestHarness)primaryCluster.elem)), (Object)qual$1.createConfluentAdminClient(x$1));
        ClusterLinkTestHarness qual$2 = (ClusterLinkTestHarness)secondaryCluster.elem;
        Properties x$2 = qual$2.createConfluentAdminClient$default$1();
        objectArray[1] = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)((ClusterLinkTestHarness)secondaryCluster.elem)), (Object)qual$2.createConfluentAdminClient(x$2));
        Map admins = (Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray(objectArray));
        ((ClusterLinkTestHarness)primaryCluster.elem).producerConfig().setProperty("enable.idempotence", "false");
        ((ClusterLinkTestHarness)secondaryCluster.elem).producerConfig().setProperty("enable.idempotence", "false");
        // Both link ends sync consumer offsets for every group ("*" filter).
        Map linkPropOverrides = (Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp()), (Object)"true"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp()), (Object)this.consumerGroupFilter("*"))}));
        ConnectionMode.Outbound$ x$32 = ConnectionMode.Outbound$.MODULE$;
        ClusterLinkTestHarness x$4 = (ClusterLinkTestHarness)primaryCluster.elem;
        Option<String> x$62 = this.linkProps$default$3();
        Option<scala.collection.immutable.Seq<Enumeration.Value>> x$72 = this.linkProps$default$4();
        Properties eastLinkProps = this.linkProps((ConnectionMode)x$32, x$4, x$62, x$72, (Map<String, String>)linkPropOverrides);
        ConnectionMode.Outbound$ x$82 = ConnectionMode.Outbound$.MODULE$;
        ClusterLinkTestHarness x$92 = (ClusterLinkTestHarness)secondaryCluster.elem;
        Option<String> x$11 = this.linkProps$default$3();
        Option<scala.collection.immutable.Seq<Enumeration.Value>> x$12 = this.linkProps$default$4();
        Properties westLinkProps = this.linkProps((ConnectionMode)x$82, x$92, x$11, x$12, (Map<String, String>)linkPropOverrides);
        this.createBidirectionalLink(this.linkName(), (ClusterLinkTestHarness)secondaryCluster.elem, (ClusterLinkTestHarness)primaryCluster.elem, eastLinkProps, westLinkProps);
        // Six topics, each set up by the per-topic helper lambda.
        IndexedSeq topics = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 6).map((Function1 & Serializable)i -> BidirectionalLinkIntegrationTest.$anonfun$testMultipleMirrorStateTransitions$4(BoxesRunTime.unboxToInt((Object)i)));
        topics.foreach((Function1 & Serializable)topic -> {
            BidirectionalLinkIntegrationTest.$anonfun$testMultipleMirrorStateTransitions$5(this, primaryCluster, secondaryCluster, topic);
            return BoxedUnit.UNIT;
        });
        // Four rounds of reverseMirroring$1 over all topics.
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 4).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable)x$3 -> this.reverseMirroring$1((scala.collection.immutable.Seq)topics, admins, secondaryCluster, alterMirrorOptions, primaryCluster));
        ClusterLinkTestHarness qual$6 = (ClusterLinkTestHarness)primaryCluster.elem;
        ByteArrayDeserializer x$27 = qual$6.createConsumer$default$1();
        ByteArrayDeserializer x$28 = qual$6.createConsumer$default$2();
        Properties x$29 = qual$6.createConsumer$default$3();
        List<String> x$30 = qual$6.createConsumer$default$4();
        Consumer primaryConsumer = qual$6.createConsumer(x$27, x$28, x$29, x$30);
        ClusterLinkTestHarness qual$7 = (ClusterLinkTestHarness)secondaryCluster.elem;
        ByteArrayDeserializer x$31 = qual$7.createConsumer$default$1();
        ByteArrayDeserializer x$322 = qual$7.createConsumer$default$2();
        Properties x$33 = qual$7.createConsumer$default$3();
        List<String> x$34 = qual$7.createConsumer$default$4();
        Consumer secondaryConsumer = qual$7.createConsumer(x$31, x$322, x$33, x$34);
        this.createConsumerAndCommitOffsets$1(primaryConsumer, topics);
        this.createConsumerAndCommitOffsets$1(secondaryConsumer, topics);
        // First three topics are promoted, the remaining three are failed over.
        Tuple2 tuple2 = topics.splitAt(3);
        if (tuple2 == null) {
            throw new MatchError(null);
        }
        IndexedSeq promoteTopics = (IndexedSeq)tuple2._1();
        IndexedSeq failoverTopics = (IndexedSeq)tuple2._2();
        ((ConfluentAdmin)admins.apply((Object)((ClusterLinkTestHarness)secondaryCluster.elem))).alterMirrors(CollectionConverters$.MODULE$.MapHasAsJava((Map)((IterableOnceOps)promoteTopics.map((Function1 & Serializable)x$6 -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(x$6), (Object)AlterMirrorOp.PROMOTE))).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl())).asJava(), alterMirrorOptions);
        ((ConfluentAdmin)admins.apply((Object)((ClusterLinkTestHarness)secondaryCluster.elem))).alterMirrors(CollectionConverters$.MODULE$.MapHasAsJava((Map)((IterableOnceOps)failoverTopics.map((Function1 & Serializable)x$7 -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(x$7), (Object)AlterMirrorOp.FAILOVER))).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl())).asJava(), alterMirrorOptions);
        promoteTopics.foreach((Function1 & Serializable)x$8 -> {
            BidirectionalLinkIntegrationTest.$anonfun$testMultipleMirrorStateTransitions$11(this, secondaryCluster, x$8);
            return BoxedUnit.UNIT;
        });
        failoverTopics.foreach((Function1 & Serializable)x$9 -> {
            BidirectionalLinkIntegrationTest.$anonfun$testMultipleMirrorStateTransitions$12(this, secondaryCluster, x$9);
            return BoxedUnit.UNIT;
        });
        secondaryConsumer.close();
        promoteTopics.foreach((Function1 & Serializable)x$10 -> {
            BidirectionalLinkIntegrationTest.$anonfun$testMultipleMirrorStateTransitions$13(this, secondaryCluster, x$10);
            return BoxedUnit.UNIT;
        });
        // Committed offsets of group "group" as seen by the secondary cluster (15 s timeout).
        scala.collection.mutable.Map committedOffsets = (scala.collection.mutable.Map)CollectionConverters$.MODULE$.MapHasAsScala((java.util.Map)((java.util.Map)((Admin)admins.apply((Object)((ClusterLinkTestHarness)secondaryCluster.elem))).listConsumerGroupOffsets("group").all().get(15L, TimeUnit.SECONDS)).get("group")).asScala().map((Function1 & Serializable)e -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(e._1()), (Object)BoxesRunTime.boxToLong((long)((OffsetAndMetadata)e._2()).offset())));
        Assertions.assertEquals((Object)((scala.collection.immutable.Map)((IterableOnceOps)((IndexedSeqOps)promoteTopics.flatMap((Function1 & Serializable)topic -> this.partitions$2((String)topic))).map((Function1 & Serializable)tp -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp), (Object)BoxesRunTime.boxToLong((long)((ClusterLinkTestHarness)secondaryCluster.elem).leaderLog((TopicPartition)tp).logEndOffset())))).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl()).$plus$plus((IterableOnce)((IterableOnceOps)((IndexedSeqOps)failoverTopics.flatMap((Function1 & Serializable)topic -> this.partitions$2((String)topic))).map((Function1 & Serializable)tp -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp), (Object)BoxesRunTime.boxToLong((long)200L)))).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl()))), (Object)committedOffsets);
    }

    /**
     * End-to-end check of a bidirectional link between the east ({@code destCluster}) and west
     * ({@code sourceCluster}) clusters: each side owns one topic that is mirrored to the other,
     * consumer offsets are committed on both sides and their migration is verified, link metrics
     * are checked, and finally the mirrors are unlinked, verified and the link deleted.
     *
     * <p>Skipped (JUnit assumption) unless {@code clusterLinkPrefix()} is empty.
     *
     * @param eastLinkConnectionMode connection mode of the link end created on the east cluster
     * @param westLinkConnectionMode connection mode of the link end created on the west cluster
     * @param topicTypes             optional topic types forwarded to {@code linkProps}; when defined
     *                               and lacking {@code REMOTE_MIRROR}, offset 0 is expected for
     *                               the two remote-mirror checks
     * @param configOverrides        extra link configs; mirror topics are created explicitly unless
     *                               the auto-mirroring property is {@code "true"}
     */
    private void verifyBidirectionalLink(ConnectionMode eastLinkConnectionMode, ConnectionMode westLinkConnectionMode, Option<scala.collection.immutable.Seq<Enumeration.Value>> topicTypes, Map<String, String> configOverrides) {
        Assumptions.assumeTrue((boolean)this.clusterLinkPrefix().isEmpty());
        int recordsPerBatch = 20;
        ClusterLinkTestHarness eastCluster = this.destCluster();
        ClusterLinkTestHarness westCluster = this.sourceCluster();
        String eastTopic = "east.topic";
        String westTopic = "west.topic";
        String eastGroup = "east.group";
        String westGroup = "west.group";

        // Seed one source topic per side.
        KafkaProducer eastProducer = eastCluster.createProducer(eastCluster.createProducer$default$1(), eastCluster.createProducer$default$2(), eastCluster.createProducer$default$3());
        KafkaProducer westProducer = westCluster.createProducer(westCluster.createProducer$default$1(), westCluster.createProducer$default$2(), westCluster.createProducer$default$3());
        westCluster.createTopic(westTopic, this.numPartitions(), this.replicationFactor(), westCluster.createTopic$default$4(), westCluster.createTopic$default$5(), westCluster.createTopic$default$6());
        eastCluster.createTopic(eastTopic, this.numPartitions(), this.replicationFactor(), eastCluster.createTopic$default$4(), eastCluster.createTopic$default$5(), eastCluster.createTopic$default$6());
        this.produceRecords(westProducer, westTopic, recordsPerBatch, this.produceRecords$default$4(), this.produceRecords$default$5(), this.produceRecords$default$6());
        this.produceRecords(eastProducer, eastTopic, recordsPerBatch, this.produceRecords$default$4(), this.produceRecords$default$5(), this.produceRecords$default$6());

        // Each link end points at the opposite cluster and is configured with the opposite side's group.
        Properties eastLinkProps = this.linkProps(eastLinkConnectionMode, westCluster, (Option<String>)new Some((Object)westGroup), topicTypes, configOverrides);
        Properties westLinkProps = this.linkProps(westLinkConnectionMode, eastCluster, (Option<String>)new Some((Object)eastGroup), topicTypes, configOverrides);
        Uuid linkId = this.createBidirectionalLink(this.linkName(), eastCluster, westCluster, eastLinkProps, westLinkProps);

        boolean autoMirroringEnabled = configOverrides.get((Object)ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp()).contains((Object)"true");
        if (!autoMirroringEnabled) {
            eastCluster.linkTopic(westTopic, this.replicationFactor(), this.linkName(), eastCluster.linkTopic$default$4(), eastCluster.linkTopic$default$5());
            westCluster.linkTopic(eastTopic, this.replicationFactor(), this.linkName(), westCluster.linkTopic$default$4(), westCluster.linkTopic$default$5());
        }

        IndexedSeq westPartitions = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), this.numPartitions()).map((Function1 & Serializable)i -> BidirectionalLinkIntegrationTest.$anonfun$verifyBidirectionalLink$1(westTopic, BoxesRunTime.unboxToInt((Object)i)));
        IndexedSeq eastPartitions = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), this.numPartitions()).map((Function1 & Serializable)i -> BidirectionalLinkIntegrationTest.$anonfun$verifyBidirectionalLink$2(eastTopic, BoxesRunTime.unboxToInt((Object)i)));

        // Initial catch-up, then a second batch to prove mirroring keeps flowing.
        this.waitForMirroring(eastCluster, (Seq<TopicPartition>)westPartitions);
        this.waitForMirroring(westCluster, (Seq<TopicPartition>)eastPartitions);
        this.produceRecords(westProducer, westTopic, recordsPerBatch, this.produceRecords$default$4(), this.produceRecords$default$5(), this.produceRecords$default$6());
        this.produceRecords(eastProducer, eastTopic, recordsPerBatch, this.produceRecords$default$4(), this.produceRecords$default$5(), this.produceRecords$default$6());
        this.waitForMirroring(eastCluster, (Seq<TopicPartition>)westPartitions);
        this.waitForMirroring(westCluster, (Seq<TopicPartition>)eastPartitions);

        // Commit the same offset for partition 0 of both topics, on both clusters.
        long committedOffset = this.nextOffset(new TopicPartition(eastTopic, 0));
        this.commitOffsets(eastCluster, eastTopic, 0, committedOffset, eastGroup);
        this.commitOffsets(eastCluster, westTopic, 0, committedOffset, eastGroup);
        this.commitOffsets(westCluster, westTopic, 0, committedOffset, westGroup);
        this.commitOffsets(westCluster, eastTopic, 0, committedOffset, westGroup);

        // When REMOTE_MIRROR is excluded from the configured topic types, those two checks expect offset 0.
        boolean configuredToNotSyncRemoteMirrors = topicTypes.isDefined() && !((SeqOps)topicTypes.get()).contains((Object)TopicType$.MODULE$.REMOTE_MIRROR());
        long remoteMirrorExpectedOffset = configuredToNotSyncRemoteMirrors ? 0L : committedOffset;
        this.verifyOffsetMigration(westTopic, 0, committedOffset, westGroup, eastCluster);
        this.verifyOffsetMigration(eastTopic, 0, remoteMirrorExpectedOffset, westGroup, eastCluster);
        this.verifyOffsetMigration(eastTopic, 0, committedOffset, eastGroup, westCluster);
        this.verifyOffsetMigration(westTopic, 0, remoteMirrorExpectedOffset, eastGroup, westCluster);

        this.verifyLinkMetrics(linkId, eastCluster, westCluster, eastLinkProps, westLinkProps, eastTopic, westTopic);

        // Tear down: stop both mirrors, check their contents, then remove both link ends.
        eastCluster.unlinkTopic(westTopic, this.linkName(), eastCluster.unlinkTopic$default$3(), eastCluster.unlinkTopic$default$4(), eastCluster.unlinkTopic$default$5(), eastCluster.unlinkTopic$default$6());
        westCluster.unlinkTopic(eastTopic, this.linkName(), westCluster.unlinkTopic$default$3(), westCluster.unlinkTopic$default$4(), westCluster.unlinkTopic$default$5(), westCluster.unlinkTopic$default$6());
        this.verifyMirror(eastCluster, (scala.collection.immutable.Seq<TopicPartition>)westPartitions);
        this.verifyMirror(westCluster, (scala.collection.immutable.Seq<TopicPartition>)eastPartitions);
        eastCluster.deleteClusterLink(this.linkName(), eastCluster.deleteClusterLink$default$2(), eastCluster.deleteClusterLink$default$3());
        westCluster.deleteClusterLink(this.linkName(), westCluster.deleteClusterLink$default$2(), westCluster.deleteClusterLink$default$3());
    }

    /**
     * Returns {@code None}. Presumably the Scala-generated default for the third parameter
     * ({@code topicTypes}) of {@code verifyBidirectionalLink}, going by the {@code $default$3} name.
     */
    private Option<scala.collection.immutable.Seq<Enumeration.Value>> verifyBidirectionalLink$default$3() {
        return None$.MODULE$;
    }

    /**
     * Returns an empty map. Presumably the Scala-generated default for the fourth parameter
     * ({@code configOverrides}) of {@code verifyBidirectionalLink}, going by the {@code $default$4} name.
     */
    private Map<String, String> verifyBidirectionalLink$default$4() {
        return (Map)Map$.MODULE$.empty();
    }

    /**
     * Sets the broker config {@code confluent.cluster.link.mirror.transition.batch.size} to
     * {@code batchSize} on every cluster in {@code clusters}.
     *
     * @param clusters  harnesses whose broker config is altered
     * @param batchSize value to apply, sent as its decimal string form
     */
    private void configureMirrorTransitionBatchSize(scala.collection.immutable.Seq<ClusterLinkTestHarness> clusters, int batchSize) {
        String batchSizeValue = String.valueOf(batchSize);
        clusters.foreach((Function1 & Serializable)harness -> {
            harness.alterBrokerConfig((Option<String>)None$.MODULE$, "confluent.cluster.link.mirror.transition.batch.size", batchSizeValue);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Builds the configuration for one end of a bidirectional cluster link.
     *
     * <p>Only an {@code Outbound} end gets connection settings (bootstrap servers, client security
     * properties and link credentials for {@code remoteCluster}). Every end gets short
     * timeout/backoff settings, {@code BIDIRECTIONAL} link mode and the given connection mode.
     * {@code configOverrides} is applied last, so it wins over everything set here.
     *
     * @param connectionMode  connection mode of this link end
     * @param remoteCluster   cluster this link end connects to when outbound
     * @param consumerGroup   when defined, enables consumer offset sync (100 ms interval) with a
     *                        group filter for that group
     * @param topicTypes      optional topic types passed to the group-filter builder
     * @param configOverrides extra properties layered on top of the defaults
     * @return the assembled link properties
     */
    public Properties linkProps(ConnectionMode connectionMode, ClusterLinkTestHarness remoteCluster, Option<String> consumerGroup, Option<scala.collection.immutable.Seq<Enumeration.Value>> topicTypes, Map<String, String> configOverrides) {
        Properties linkConfig = new Properties();

        boolean isOutbound = connectionMode != null && connectionMode.equals(ConnectionMode.Outbound$.MODULE$);
        if (isOutbound) {
            String jaasConfig = this.createLinkCredentials(this.linkName(), remoteCluster, this.createLinkCredentials$default$3());
            linkConfig.put("bootstrap.servers", remoteCluster.bootstrapServers(remoteCluster.bootstrapServers$default$1()));
            new Implicits.PropertiesOps(linkConfig).$plus$plus$eq(remoteCluster.clientSecurityProps(this.linkName()));
            // Set after the security props so the link credentials take precedence.
            linkConfig.put("sasl.jaas.config", jaasConfig);
        }

        linkConfig.setProperty(ClusterLinkConfig$.MODULE$.ReverseConnectionSetupTimeoutMsProp(), "10000");
        linkConfig.put("metadata.max.age.ms", "2000");
        linkConfig.put("reconnect.backoff.max.ms", "1000");
        linkConfig.setProperty("request.timeout.ms", "1000");
        linkConfig.setProperty("default.api.timeout.ms", "1000");
        linkConfig.setProperty(ClusterLinkConfig$.MODULE$.LinkModeProp(), ClusterLinkConfig.LinkMode.BIDIRECTIONAL.name());
        linkConfig.setProperty(ClusterLinkConfig$.MODULE$.ConnectionModeProp(), connectionMode.name());

        consumerGroup.foreach((Function1 & Serializable)group -> {
            linkConfig.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
            String groupFilter;
            if (topicTypes instanceof Some) {
                scala.collection.immutable.Seq selectedTypes = (scala.collection.immutable.Seq)((Some)topicTypes).value();
                groupFilter = this.consumerGroupFilter((String)group, (Seq<Enumeration.Value>)selectedTypes);
            } else if (None$.MODULE$.equals(topicTypes)) {
                groupFilter = this.consumerGroupFilter((String)group);
            } else {
                throw new MatchError((Object)topicTypes);
            }
            linkConfig.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), groupFilter);
            return linkConfig.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), "100");
        });

        new Implicits.PropertiesOps(linkConfig).$plus$plus$eq(configOverrides);
        return linkConfig;
    }

    /** Default for the third argument ({@code consumerGroup}) of {@code linkProps}: {@code None}. */
    public Option<String> linkProps$default$3() {
        return None$.MODULE$;
    }

    /** Default for the fourth argument ({@code topicTypes}) of {@code linkProps}: {@code None}. */
    public Option<scala.collection.immutable.Seq<Enumeration.Value>> linkProps$default$4() {
        return None$.MODULE$;
    }

    /** Default for the fifth argument ({@code configOverrides}) of {@code linkProps}: an empty map. */
    public Map<String, String> linkProps$default$5() {
        return (Map)Map$.MODULE$.empty();
    }

    /**
     * Assigns {@code partitions} to a fresh consumer on {@code cluster} and delegates the record
     * check to {@code consumePartitionRecords}, using the topic of the first partition.
     *
     * <p>The consumer is closed even when assignment or verification fails, so a failing test does
     * not leak it.
     *
     * @param cluster    harness of the cluster to consume from
     * @param partitions partitions to verify; must be non-empty because {@code partitions.head()} is read
     */
    private void verifyMirror(ClusterLinkTestHarness cluster, scala.collection.immutable.Seq<TopicPartition> partitions) {
        Consumer consumer = cluster.createConsumer(cluster.createConsumer$default$1(), cluster.createConsumer$default$2(), cluster.createConsumer$default$3(), cluster.createConsumer$default$4());
        try {
            consumer.assign((Collection)CollectionConverters$.MODULE$.SeqHasAsJava(partitions).asJava());
            this.consumePartitionRecords(consumer, (Set<TopicPartition>)partitions.toSet(), this.clusterLinkPrefix(), ((TopicPartition)partitions.head()).topic(), this.consumePartitionRecords$default$5());
        } finally {
            consumer.close();
        }
    }

    /**
     * Verifies link metrics for both directions of a bidirectional link.
     *
     * <p>Runs the per-cluster check lambda on both clusters, then verifies destination link metrics
     * twice, swapping the {@code sourceCluster}/{@code destCluster} fields in between. On return
     * those fields are left as source = east, dest = west. If either link end is {@code Inbound},
     * reverse-connection metrics are verified as well.
     *
     * @param linkId        id of the link under test
     * @param eastCluster   east cluster harness
     * @param westCluster   west cluster harness
     * @param eastLinkProps properties of the east link end (connection mode is read from them)
     * @param westLinkProps properties of the west link end (connection mode is read from them)
     * @param eastTopic     topic passed to the destination-metrics check made with west as destination
     * @param westTopic     topic passed to the destination-metrics check made with east as destination
     */
    private void verifyLinkMetrics(Uuid linkId, ClusterLinkTestHarness eastCluster, ClusterLinkTestHarness westCluster, Properties eastLinkProps, Properties westLinkProps, String eastTopic, String westTopic) {
        new .colon.colon((Object)eastCluster, (List)new .colon.colon((Object)westCluster, (List)Nil$.MODULE$)).foreach((Function1 & Serializable)cluster -> {
            BidirectionalLinkIntegrationTest.$anonfun$verifyLinkMetrics$1(this, cluster);
            return BoxedUnit.UNIT;
        });

        // East as destination, west as source.
        this.destCluster_$eq(eastCluster);
        this.sourceCluster_$eq(westCluster);
        this.verifyDestinationLinkMetrics(linkId, eastLinkProps, true, ClusterLinkConfig.LinkMode.BIDIRECTIONAL, westTopic);

        // Roles swapped: west as destination, east as source.
        this.sourceCluster_$eq(eastCluster);
        this.destCluster_$eq(westCluster);
        this.verifyDestinationLinkMetrics(linkId, westLinkProps, true, ClusterLinkConfig.LinkMode.BIDIRECTIONAL, eastTopic);

        ConnectionMode eastMode = ConnectionMode$.MODULE$.fromString(eastLinkProps.getProperty(ClusterLinkConfig$.MODULE$.ConnectionModeProp()));
        ConnectionMode westMode = ConnectionMode$.MODULE$.fromString(westLinkProps.getProperty(ClusterLinkConfig$.MODULE$.ConnectionModeProp()));

        boolean eastIsInbound = eastMode != null && eastMode.equals(ConnectionMode.Inbound$.MODULE$);
        boolean anyInbound = eastIsInbound || (westMode != null && westMode.equals(ConnectionMode.Inbound$.MODULE$));
        if (!anyInbound) {
            // No inbound end: nothing further to verify.
            return;
        }

        boolean eastIsOutbound = eastMode != null && eastMode.equals(ConnectionMode.Outbound$.MODULE$);
        ClusterLinkTestHarness outboundSide = eastIsOutbound ? eastCluster : westCluster;
        ClusterLinkTestHarness inboundSide = eastIsInbound ? eastCluster : westCluster;
        this.verifyReverseConnectionMetrics(this.linkName(), ClusterLinkConfig.LinkMode.BIDIRECTIONAL, ClusterLinkConfig.LinkMode.BIDIRECTIONAL, outboundSide, inboundSide);
    }

    /**
     * Creates both ends of a cluster link named {@code linkName}: first on {@code eastCluster},
     * then on {@code westCluster} with the link id returned by the first creation, and asserts
     * that both ends report the same id.
     *
     * @param linkName      name shared by both link ends
     * @param eastCluster   cluster on which the link is created first
     * @param westCluster   cluster on which the link is created second, with the first end's id
     * @param eastLinkProps configuration for the east end
     * @param westLinkProps configuration for the west end
     * @return the link id shared by both ends
     */
    public Uuid createBidirectionalLink(String linkName, ClusterLinkTestHarness eastCluster, ClusterLinkTestHarness westCluster, Properties eastLinkProps, Properties westLinkProps) {
        Option<String> westClusterId = (Option<String>)new Some((Object)((KafkaBroker)westCluster.brokers().head()).clusterId());
        Uuid eastLinkId = eastCluster.createClusterLink(linkName, eastLinkProps, westClusterId, true);

        Option<String> eastClusterId = (Option<String>)new Some((Object)((KafkaBroker)eastCluster.brokers().head()).clusterId());
        Option<Uuid> requestedLinkId = (Option<Uuid>)new Some((Object)eastLinkId);
        Uuid westLinkId = westCluster.createClusterLinkWithAllOptions(linkName, westLinkProps, eastClusterId, true, requestedLinkId, westCluster.createClusterLinkWithAllOptions$default$6());

        Assertions.assertEquals((Object)eastLinkId, (Object)westLinkId);
        return eastLinkId;
    }

    /** Synthetic lambda body: builds the {@code TopicPartition} for partition {@code i} of {@code topic$1}. */
    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirror$1(String topic$1, int i) {
        return new TopicPartition(topic$1, i);
    }

    /** Synthetic lambda body: builds the {@code TopicPartition} for partition {@code i} of {@code topic$2}. */
    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$1(String topic$2, int i) {
        return new TopicPartition(topic$2, i);
    }

    /**
     * Calls {@code changeLeader} on every partition of {@code partitions$1}, repeated
     * {@code atLeastExpectedLeaderEpoch$1.elem} times.
     *
     * <p>The decompiled body referenced undeclared names ({@code partitions}, {@code localCluster});
     * they are bound to this method's own parameters here.
     *
     * @param cluster                     harness on which leaders are changed
     * @param atLeastExpectedLeaderEpoch$1 holder whose {@code elem} is the number of rounds
     * @param partitions$1                partitions whose leader is changed in each round
     */
    private static final void bumpLeaderEpochs$1(ClusterLinkTestHarness cluster, IntRef atLeastExpectedLeaderEpoch$1, IndexedSeq partitions$1) {
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), atLeastExpectedLeaderEpoch$1.elem).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable)x$1 -> partitions$1.foreach((Function1 & Serializable)tp -> BoxesRunTime.boxToInteger((int)cluster.changeLeader(tp))));
    }

    /**
     * Synthetic predicate: true once the leader epoch reported for partition {@code p$1} on
     * {@code cluster$2} (0 when absent) is at least {@code atLeastExpectedLeaderEpoch$1.elem}.
     */
    public static final /* synthetic */ boolean $anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$5(ClusterLinkTestHarness cluster$2, TopicPartition p$1, IntRef atLeastExpectedLeaderEpoch$1) {
        int reportedLeaderEpoch = cluster$2.replicaStatusWithPartitionResult(p$1.topic(), p$1.partition()).leaderEpoch().orElseGet(() -> 0);
        return reportedLeaderEpoch >= atLeastExpectedLeaderEpoch$1.elem;
    }

    /** Synthetic message supplier: the failure text reported when the leader-epoch wait times out. */
    public static final /* synthetic */ String $anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$7() {
        return "Leader epoch did not get bumped";
    }

    /*
     * WARNING - void declaration
     */
    public static final /* synthetic */ void $anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$4(ClusterLinkTestHarness cluster$2, IntRef atLeastExpectedLeaderEpoch$1, TopicPartition p) {
        long l = 100L;
        long waitUntilTrue_waitTimeMs = 15000L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!(cluster$2.replicaStatusWithPartitionResult(p.topic(), p.partition()).leaderEpoch().orElseGet(() -> 0) >= atLeastExpectedLeaderEpoch$1.elem)) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)"Leader epoch did not get bumped");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
    }

    /**
     * Runs the leader-epoch wait ({@code $anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$4})
     * for every partition in {@code partitions$1}.
     *
     * <p>The decompiled body referenced undeclared names ({@code localCluster},
     * {@code atLeastExpectedLeaderEpoch}); they are bound to this method's own parameters here.
     *
     * @param cluster                     harness whose replica status is polled
     * @param partitions$1                partitions to check
     * @param atLeastExpectedLeaderEpoch$1 holder of the minimum leader epoch each partition must reach
     */
    private static final void verifyLeaderEpochIsBumped$1(ClusterLinkTestHarness cluster, IndexedSeq partitions$1, IntRef atLeastExpectedLeaderEpoch$1) {
        partitions$1.foreach((Function1 & Serializable)p -> {
            BidirectionalLinkIntegrationTest.$anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$4(cluster, atLeastExpectedLeaderEpoch$1, p);
            return BoxedUnit.UNIT;
        });
    }

    /** Synthetic lambda body: builds the {@code TopicPartition} for partition {@code i} of {@code topic$3}. */
    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirrorWithLag$1(String topic$3, int i) {
        return new TopicPartition(topic$3, i);
    }

    /** Synthetic lambda body: builds the {@code TopicPartition} for partition {@code i} of {@code topic$4}. */
    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirroringWithRemoteClusterRestart$1(String topic$4, int i) {
        return new TopicPartition(topic$4, i);
    }

    /** Synthetic lambda body: builds the {@code TopicPartition} for partition {@code i} of {@code topic$5}. */
    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirroringWithLocalClusterRestart$1(String topic$5, int i) {
        return new TopicPartition(topic$5, i);
    }

    /** Synthetic lambda body: builds the {@code TopicPartition} for partition {@code i} of {@code topic$6}. */
    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirroringWithPauseAndUnpauseLinks$1(String topic$6, int i) {
        return new TopicPartition(topic$6, i);
    }

    /** Synthetic lambda body: builds the {@code TopicPartition} for partition {@code i} of {@code topic$7}. */
    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirroringWithPauseAndUnpauseLocalMirrorTopic$1(String topic$7, int i) {
        return new TopicPartition(topic$7, i);
    }

    /** Synthetic lambda body: builds the {@code TopicPartition} for partition {@code i} of {@code topic$8}. */
    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirroringWithPauseAndUnpauseLocalMirrorTopicWithLocalClusterRestart$1(String topic$8, int i) {
        return new TopicPartition(topic$8, i);
    }

    /** Synthetic lambda body: builds the {@code TopicPartition} for partition {@code i} of {@code $this.topic()}. */
    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirrorWithDeletedTopics$1(BidirectionalLinkIntegrationTest $this, int i) {
        return new TopicPartition($this.topic(), i);
    }

    /** Synthetic lambda body: builds the {@code TopicPartition} for partition {@code i} of {@code $this.topic()}. */
    public static final /* synthetic */ TopicPartition $anonfun$testRollbackMirrorFromPendingSynchronizeState$1(BidirectionalLinkIntegrationTest $this, int i) {
        return new TopicPartition($this.topic(), i);
    }

    /** Synthetic lambda body: builds the {@code TopicPartition} for partition {@code i} of {@code $this.topic()}. */
    public static final /* synthetic */ TopicPartition $anonfun$testRollbackMirrorFailsOnPendingMirror$1(BidirectionalLinkIntegrationTest $this, int i) {
        return new TopicPartition($this.topic(), i);
    }

    /**
     * Synthetic lambda body: builds the {@link TopicPartition} for one partition index of the test's own topic.
     *
     * @param $this the enclosing test instance, queried for {@code topic()}
     * @param i partition index
     * @return a new {@code TopicPartition} for {@code ($this.topic(), i)}
     */
    public static final /* synthetic */ TopicPartition $anonfun$testPauseMirrorFailsOnPendingMirror$1(BidirectionalLinkIntegrationTest $this, int i) {
        String topicName = $this.topic();
        return new TopicPartition(topicName, i);
    }

    /**
     * Synthetic lambda body: asserts what the remote cluster reports for the test topic's mirror
     * state transition errors under two describe variants.
     *
     * <ul>
     *   <li>Describing with the second argument {@code true} must yield exactly one transition
     *       error whose code is {@code INTERNAL_ERROR}.</li>
     *   <li>Describing with the method's default second argument must yield no transition errors.</li>
     * </ul>
     *
     * NOTE(review): the boolean presumably toggles inclusion of state transition errors in the
     * description; confirm against {@code ClusterLinkTestHarness.describeMirrorTopic}.
     *
     * @param $this the enclosing test instance, queried for {@code topic()}
     * @param remoteCluster$2 harness of the cluster whose mirror topic is described
     */
    public static final /* synthetic */ void $anonfun$testPauseMirrorFailsOnPendingMirror$2(BidirectionalLinkIntegrationTest $this, ClusterLinkTestHarness remoteCluster$2) {
        MirrorTopicDescription descWithTransitionErrors = remoteCluster$2.describeMirrorTopic($this.topic(), true);
        Assertions.assertNotNull((Object)descWithTransitionErrors);
        java.util.List<?> transitionErrors = descWithTransitionErrors.mirrorStateTransitionErrors();
        Assertions.assertEquals((int)1, (int)transitionErrors.size());
        Assertions.assertEquals((Object)ClusterLinkTaskError.ClusterLinkTaskErrorCode.INTERNAL_ERROR, (Object)((ClusterLinkTaskError)transitionErrors.get(0)).errorCode());

        MirrorTopicDescription descWithoutTransitionErrors = remoteCluster$2.describeMirrorTopic($this.topic(), remoteCluster$2.describeMirrorTopic$default$2());
        Assertions.assertNotNull((Object)descWithoutTransitionErrors);
        // The error list is fetched once, directly inside the assertion; the earlier
        // stand-alone call whose result was thrown away has been removed.
        Assertions.assertEquals((int)0, (int)descWithoutTransitionErrors.mirrorStateTransitionErrors().size());
    }

    /**
     * Synthetic lambda body: builds the {@link TopicPartition} for one partition index of the captured topic.
     *
     * @param topic$9 captured topic name
     * @param i partition index
     * @return a new {@code TopicPartition} for {@code (topic$9, i)}
     */
    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndPauseMirror$1(String topic$9, int i) {
        TopicPartition partition = new TopicPartition(topic$9, i);
        return partition;
    }

    /**
     * Synthetic lambda body: builds the {@link TopicPartition} for one partition index of the given topic.
     *
     * @param topic$10 topic name
     * @param i partition index
     * @return a new {@code TopicPartition} for {@code (topic$10, i)}
     */
    public static final /* synthetic */ TopicPartition $anonfun$testMultipleMirrorStateTransitions$1(String topic$10, int i) {
        TopicPartition partition = new TopicPartition(topic$10, i);
        return partition;
    }

    /**
     * Lists every partition of {@code topic}, for indices {@code 0} (inclusive) up to
     * {@code numPartitions()} (exclusive).
     *
     * @param topic topic name
     * @return a sequence with one {@link TopicPartition} per partition index
     */
    private final scala.collection.immutable.Seq partitions$2(String topic) {
        int partitionCount = this.numPartitions();
        return RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), partitionCount)
            .map((Function1 & Serializable)index -> BidirectionalLinkIntegrationTest.$anonfun$testMultipleMirrorStateTransitions$1(topic, BoxesRunTime.unboxToInt((Object)index)));
    }

    /**
     * Synthetic lambda body run for one topic after the primary/secondary roles have been swapped.
     *
     * Steps, in order: wait for the mirror description on the current primary to reach
     * {@code STOPPED}; wait for it on the current secondary to reach {@code ACTIVE}; produce 20
     * records to the topic through a producer created on the primary; wait until the secondary
     * has mirrored all partitions of the topic.
     *
     * @param $this the enclosing test instance
     * @param primaryCluster$1 mutable reference holding the current primary harness
     * @param secondaryCluster$1 mutable reference holding the current secondary harness
     * @param topic topic to verify
     */
    public static final /* synthetic */ void $anonfun$testMultipleMirrorStateTransitions$3(BidirectionalLinkIntegrationTest $this, ObjectRef primaryCluster$1, ObjectRef secondaryCluster$1, String topic) {
        // Arguments are evaluated left to right, so inlining keeps the original call order.
        ClusterLinkTestHarness stoppedSide = (ClusterLinkTestHarness)primaryCluster$1.elem;
        stoppedSide.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, topic, $this.linkName(), stoppedSide.waitUntilMirrorDescriptionState$default$4());

        ClusterLinkTestHarness activeSide = (ClusterLinkTestHarness)secondaryCluster$1.elem;
        activeSide.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, topic, $this.linkName(), activeSide.waitUntilMirrorDescriptionState$default$4());

        ClusterLinkTestHarness producerSide = (ClusterLinkTestHarness)primaryCluster$1.elem;
        ByteArraySerializer keySerializer = producerSide.createProducer$default$1();
        ByteArraySerializer valueSerializer = producerSide.createProducer$default$2();
        Properties producerProps = producerSide.createProducer$default$3();
        $this.produceRecords(producerSide.createProducer(keySerializer, valueSerializer, producerProps), topic, 20, $this.produceRecords$default$4(), $this.produceRecords$default$5(), $this.produceRecords$default$6());

        $this.waitForMirroring((ClusterLinkTestHarness)secondaryCluster$1.elem, (Seq<TopicPartition>)$this.partitions$2(topic));
    }

    /**
     * Reverses mirroring for {@code topics} and swaps the primary/secondary roles.
     *
     * Issues {@code REVERSE_AND_START_REMOTE_MIRROR} for every topic through the admin client
     * registered for the current secondary, exchanges the contents of the two cluster
     * references, then runs the per-topic verification lambda against the new roles.
     *
     * @param topics topics whose mirroring is reversed
     * @param admins$1 map from cluster harness to its admin client
     * @param secondaryCluster$1 mutable reference to the secondary harness (updated)
     * @param alterMirrorOptions$1 options passed to {@code alterMirrors}
     * @param primaryCluster$1 mutable reference to the primary harness (updated)
     */
    private final void reverseMirroring$1(scala.collection.immutable.Seq topics, Map admins$1, ObjectRef secondaryCluster$1, AlterMirrorsOptions alterMirrorOptions$1, ObjectRef primaryCluster$1) {
        // The admin is looked up before the op map is built, matching the original evaluation order.
        ConfluentAdmin secondaryAdmin = (ConfluentAdmin)admins$1.apply((Object)((ClusterLinkTestHarness)secondaryCluster$1.elem));
        Map reverseOps = (Map)((IterableOnceOps)topics.map((Function1 & Serializable)x$2 -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(x$2), (Object)AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR))).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl());
        secondaryAdmin.alterMirrors(CollectionConverters$.MODULE$.MapHasAsJava(reverseOps).asJava(), alterMirrorOptions$1);

        // Swap roles: the former secondary becomes primary and vice versa.
        ClusterLinkTestHarness promoted = (ClusterLinkTestHarness)secondaryCluster$1.elem;
        ClusterLinkTestHarness demoted = (ClusterLinkTestHarness)primaryCluster$1.elem;
        primaryCluster$1.elem = promoted;
        secondaryCluster$1.elem = demoted;

        topics.foreach((Function1 & Serializable)reversedTopic -> {
            BidirectionalLinkIntegrationTest.$anonfun$testMultipleMirrorStateTransitions$3(this, primaryCluster$1, secondaryCluster$1, reversedTopic);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Synthetic lambda body: derives a topic name from an index.
     *
     * @param i topic index
     * @return the literal {@code "topic"} followed by the decimal form of {@code i}
     */
    public static final /* synthetic */ String $anonfun$testMultipleMirrorStateTransitions$4(int i) {
        return "topic" + i;
    }

    /**
     * Synthetic lambda body that sets up one mirrored topic.
     *
     * Steps, in order: create the topic on the current primary with the test's partition count
     * and replication factor; produce 20 records to it via a producer created on the primary;
     * link the topic on the current secondary under the test's link name; wait until the
     * secondary has mirrored all partitions of the topic.
     *
     * @param $this the enclosing test instance
     * @param primaryCluster$1 mutable reference holding the current primary harness
     * @param secondaryCluster$1 mutable reference holding the current secondary harness
     * @param topic topic to create and mirror
     */
    public static final /* synthetic */ void $anonfun$testMultipleMirrorStateTransitions$5(BidirectionalLinkIntegrationTest $this, ObjectRef primaryCluster$1, ObjectRef secondaryCluster$1, String topic) {
        // Arguments are evaluated left to right, so inlining keeps the original call order.
        ClusterLinkTestHarness topicOwner = (ClusterLinkTestHarness)primaryCluster$1.elem;
        topicOwner.createTopic(topic, $this.numPartitions(), $this.replicationFactor(), topicOwner.createTopic$default$4(), topicOwner.createTopic$default$5(), topicOwner.createTopic$default$6());

        ClusterLinkTestHarness producerSide = (ClusterLinkTestHarness)primaryCluster$1.elem;
        ByteArraySerializer keySerializer = producerSide.createProducer$default$1();
        ByteArraySerializer valueSerializer = producerSide.createProducer$default$2();
        Properties producerProps = producerSide.createProducer$default$3();
        $this.produceRecords(producerSide.createProducer(keySerializer, valueSerializer, producerProps), topic, 20, $this.produceRecords$default$4(), $this.produceRecords$default$5(), $this.produceRecords$default$6());

        ClusterLinkTestHarness mirrorSide = (ClusterLinkTestHarness)secondaryCluster$1.elem;
        mirrorSide.linkTopic(topic, $this.replicationFactor(), $this.linkName(), mirrorSide.linkTopic$default$4(), mirrorSide.linkTopic$default$5());

        $this.waitForMirroring((ClusterLinkTestHarness)secondaryCluster$1.elem, (Seq<TopicPartition>)$this.partitions$2(topic));
    }

    /**
     * Subscribes {@code consumer} to all given topics, polls until at least 100 records have
     * been received (with 15000L passed as the wait bound), then synchronously commits offset
     * 200 for every partition of every topic.
     *
     * @param consumer consumer to subscribe, poll, and commit with
     * @param topics$1 topic names to consume
     */
    private final void createConsumerAndCommitOffsets$1(Consumer consumer, IndexedSeq topics$1) {
        Collection subscription = (Collection)CollectionConverters$.MODULE$.SeqHasAsJava((Seq)topics$1).asJava();
        consumer.subscribe(subscription);

        TestUtils$.MODULE$.pollUntilAtLeastNumRecords(consumer, 100, 15000L);

        // One (partition -> offset 200) entry per partition across all topics.
        Map offsetsToCommit = (Map)((IterableOnceOps)((IndexedSeqOps)topics$1.flatMap((Function1 & Serializable)topic -> this.partitions$2((String)topic))).map((Function1 & Serializable)x$4 -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(x$4), (Object)new OffsetAndMetadata(200L)))).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl());
        consumer.commitSync(CollectionConverters$.MODULE$.MapHasAsJava(offsetsToCommit).asJava());
    }

    /**
     * Synthetic lambda body: waits until the mirror description of topic {@code x$8} on the
     * current secondary cluster reaches {@code PENDING_STOPPED} for the test's link.
     *
     * @param $this the enclosing test instance, queried for {@code linkName()}
     * @param secondaryCluster$1 mutable reference holding the current secondary harness
     * @param x$8 topic name
     */
    public static final /* synthetic */ void $anonfun$testMultipleMirrorStateTransitions$11(BidirectionalLinkIntegrationTest $this, ObjectRef secondaryCluster$1, String x$8) {
        ClusterLinkTestHarness secondary = (ClusterLinkTestHarness)secondaryCluster$1.elem;
        secondary.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PENDING_STOPPED, x$8, $this.linkName(), secondary.waitUntilMirrorDescriptionState$default$4());
    }

    /**
     * Synthetic lambda body: waits until the mirror description of topic {@code x$9} on the
     * current secondary cluster reaches {@code STOPPED} for the test's link.
     *
     * @param $this the enclosing test instance, queried for {@code linkName()}
     * @param secondaryCluster$1 mutable reference holding the current secondary harness
     * @param x$9 topic name
     */
    public static final /* synthetic */ void $anonfun$testMultipleMirrorStateTransitions$12(BidirectionalLinkIntegrationTest $this, ObjectRef secondaryCluster$1, String x$9) {
        ClusterLinkTestHarness secondary = (ClusterLinkTestHarness)secondaryCluster$1.elem;
        secondary.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, x$9, $this.linkName(), secondary.waitUntilMirrorDescriptionState$default$4());
    }

    /**
     * Synthetic lambda body: waits until the mirror description of topic {@code x$10} on the
     * current secondary cluster reaches {@code STOPPED} for the test's link.
     *
     * @param $this the enclosing test instance, queried for {@code linkName()}
     * @param secondaryCluster$1 mutable reference holding the current secondary harness
     * @param x$10 topic name
     */
    public static final /* synthetic */ void $anonfun$testMultipleMirrorStateTransitions$13(BidirectionalLinkIntegrationTest $this, ObjectRef secondaryCluster$1, String x$10) {
        ClusterLinkTestHarness secondary = (ClusterLinkTestHarness)secondaryCluster$1.elem;
        secondary.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, x$10, $this.linkName(), secondary.waitUntilMirrorDescriptionState$default$4());
    }

    /**
     * Synthetic lambda body: builds the {@link TopicPartition} for one partition index of the captured topic.
     *
     * @param westTopic$1 captured topic name
     * @param i partition index
     * @return a new {@code TopicPartition} for {@code (westTopic$1, i)}
     */
    public static final /* synthetic */ TopicPartition $anonfun$verifyBidirectionalLink$1(String westTopic$1, int i) {
        TopicPartition partition = new TopicPartition(westTopic$1, i);
        return partition;
    }

    /**
     * Synthetic lambda body: builds the {@link TopicPartition} for one partition index of the captured topic.
     *
     * @param eastTopic$1 captured topic name
     * @param i partition index
     * @return a new {@code TopicPartition} for {@code (eastTopic$1, i)}
     */
    public static final /* synthetic */ TopicPartition $anonfun$verifyBidirectionalLink$2(String eastTopic$1, int i) {
        TopicPartition partition = new TopicPartition(eastTopic$1, i);
        return partition;
    }

    /**
     * Synthetic predicate: selects metrics whose group contains {@code "cluster-link"} and that
     * carry a {@code "mode"} tag.
     *
     * @param metricName metric name to test
     * @return {@code true} when both conditions hold
     */
    public static final /* synthetic */ boolean $anonfun$verifyLinkMetrics$3(MetricName metricName) {
        if (!metricName.group().contains("cluster-link")) {
            return false;
        }
        return metricName.tags().containsKey("mode");
    }

    /**
     * Synthetic predicate: selects every metric whose name is not {@code "active-link-count"}.
     * A {@code null} name is also selected.
     *
     * @param x$11 metric name to test
     * @return {@code true} unless the name equals {@code "active-link-count"}
     */
    public static final /* synthetic */ boolean $anonfun$verifyLinkMetrics$4(MetricName x$11) {
        return !"active-link-count".equals(x$11.name());
    }

    /**
     * Synthetic predicate: selects metrics whose {@code "mode"} tag is missing or not
     * {@code "bidirectional"}, that carry a {@code "link-id"} tag, and whose name does not start
     * with {@code "reverse-connection"}.
     *
     * @param metricName metric name to test
     * @return {@code true} when all three conditions hold
     */
    public static final /* synthetic */ boolean $anonfun$verifyLinkMetrics$5(MetricName metricName) {
        Object mode = metricName.tags().get("mode");
        if (mode != null && mode.equals("bidirectional")) {
            return false;
        }
        if (!metricName.tags().containsKey("link-id")) {
            return false;
        }
        return !metricName.name().startsWith("reverse-connection");
    }

    /**
     * Synthetic lambda body: checks the link metrics exposed by one cluster.
     *
     * Verifies the link-count metric for {@code BIDIRECTIONAL} mode with the {@code "active"}
     * argument, verifies the active-link-count metric for {@code BIDIRECTIONAL} and
     * {@code SOURCE} modes, and finally asserts that no alive server exposes a metric that
     * passes all three {@code $anonfun$verifyLinkMetrics$3/$4/$5} filters.
     *
     * @param $this the enclosing test instance providing the verification helpers
     * @param cluster harness of the cluster whose metrics are inspected
     */
    public static final /* synthetic */ void $anonfun$verifyLinkMetrics$1(BidirectionalLinkIntegrationTest $this, ClusterLinkTestHarness cluster) {
        $this.verifyLinkCountMetric(ClusterLinkConfig.LinkMode.BIDIRECTIONAL, "active", cluster);
        $this.verifyActiveLinkCountMetric(cluster, ClusterLinkConfig.LinkMode.BIDIRECTIONAL);
        $this.verifyActiveLinkCountMetric(cluster, ClusterLinkConfig.LinkMode.SOURCE);
        // For every alive server: take all registered metric names, apply the three filters in
        // order ($3, then $4, then $5), render each survivor as "<name>:<tags>", and merge the
        // per-server sets into one set.
        Set nonBidirectionalModeMetrics = ((IterableOnceOps)cluster.aliveServers().flatMap((Function1 & Serializable)server -> ((IterableOnceOps)((IterableOps)((IterableOps)((IterableOps)CollectionConverters$.MODULE$.SetHasAsScala(server.metrics().metrics().keySet()).asScala().filter((Function1 & Serializable)metricName -> BoxesRunTime.boxToBoolean((boolean)BidirectionalLinkIntegrationTest.$anonfun$verifyLinkMetrics$3(metricName)))).filter((Function1 & Serializable)x$11 -> BoxesRunTime.boxToBoolean((boolean)BidirectionalLinkIntegrationTest.$anonfun$verifyLinkMetrics$4(x$11)))).filter((Function1 & Serializable)metricName -> BoxesRunTime.boxToBoolean((boolean)BidirectionalLinkIntegrationTest.$anonfun$verifyLinkMetrics$5(metricName)))).map((Function1 & Serializable)metricName -> new StringBuilder(1).append(metricName.name()).append(":").append(metricName.tags()).toString())).toSet())).toSet();
        // Any entry left in the set is reported by the assertion as the offending metric.
        Assertions.assertEquals((Object)Predef$.MODULE$.Set().empty(), (Object)nonBidirectionalModeMetrics);
    }

    public BidirectionalLinkIntegrationTest() {
        this.useSourceInitiatedLink_$eq(false);
        this.sourceCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.SASL_SSL, (Option<SecurityProtocol>)new Some((Object)SecurityProtocol.PLAINTEXT), 0, 2));
        this.destCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.SASL_PLAINTEXT, (Option<SecurityProtocol>)new Some((Object)SecurityProtocol.PLAINTEXT), 100, 2));
    }
}

