package kafka.link;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import kafka.server.ReplicaManager;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkDestClientManager;
import kafka.server.link.ClusterLinkMetrics$;
import kafka.server.link.ClusterLinkRepairMirrors;
import kafka.server.link.ClusterLinkTopicState;
import kafka.server.link.InternalTaskErrorCode$;
import kafka.server.link.NoErrorCode$;
import kafka.utils.Logging;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.AlterMirrorOp;
import org.apache.kafka.clients.admin.ClusterLinkDescription;
import org.apache.kafka.clients.admin.ClusterLinkTaskError;
import org.apache.kafka.clients.admin.MirrorTopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ClusterLinkInUseException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.replica.ReplicaStatus;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOps;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.StringOps$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.mutable.Buffer;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

/* compiled from: ClusterLinkDRIntegrationTest.scala */
@Tags({@Tag("integration"), @Tag("bazel:shard_count:5")})
@ScalaSignature(bytes = "\u0006\u0005\u0005ud\u0001\u0002\t\u0012\u0001YAQa\u0007\u0001\u0005\u0002qAQA\b\u0001\u0005\u0002}AQa\u0015\u0001\u0005\u0002QCQA\u0017\u0001\u0005\u0002mCQ!\u0019\u0001\u0005\u0002\tDQ\u0001\u001b\u0001\u0005\u0002%DQa\u001c\u0001\u0005\u0002ADQA\u001e\u0001\u0005\u0002]DQ! \u0001\u0005\u0002yDq!!\u0003\u0001\t\u0003\tY\u0001C\u0004\u0002\u0018\u0001!\t!!\u0007\t\u000f\u0005\u0015\u0002\u0001\"\u0001\u0002(!9\u00111\u0007\u0001\u0005\n\u0005U\u0002\"CA\u001e\u0001E\u0005I\u0011BA\u001f\u0011\u001d\t\u0019\u0006\u0001C\u0001\u0003+\u0012Ad\u00117vgR,'\u000fT5oW\u0012\u0013\u0016J\u001c;fOJ\fG/[8o)\u0016\u001cHO\u0003\u0002\u0013'\u0005!A.\u001b8l\u0015\u0005!\u0012!B6bM.\f7\u0001A\n\u0003\u0001]\u0001\"\u0001G\r\u000e\u0003EI!AG\t\u0003E\u0005\u00137\u000f\u001e:bGR\u001cE.^:uKJd\u0015N\\6J]R,wM]1uS>tG+Z:u\u0003\u0019a\u0014N\\5u}Q\tQ\u0004\u0005\u0002\u0019\u0001\u0005aB/Z:u!\u0006,8/\u001a)f]\u0012Lgn\u001a*fa\u0006L'/T5se>\u0014Hc\u0001\u0011'gA\u0011\u0011\u0005J\u0007\u0002E)\t1%A\u0003tG\u0006d\u0017-\u0003\u0002&E\t!QK\\5u\u0011\u00159#\u00011\u0001)\u0003\u0019\tXo\u001c:v[B\u0011\u0011\u0006\r\b\u0003U9\u0002\"a\u000b\u0012\u000e\u00031R!!L\u000b\u0002\rq\u0012xn\u001c;?\u0013\ty#%\u0001\u0004Qe\u0016$WMZ\u0005\u0003cI\u0012aa\u0015;sS:<'BA\u0018#\u0011\u0015!$\u00011\u00016\u0003-\u0019wn\u001c:eS:\fGo\u001c:\u0011\u0005\u00052\u0014BA\u001c#\u0005\u001d\u0011un\u001c7fC:DCAA\u001dF\rB\u0011!hQ\u0007\u0002w)\u0011A(P\u0001\u0007a\u0006\u0014\u0018-\\:\u000b\u0005yz\u0014a\u00026va&$XM\u001d\u0006\u0003\u0001\u0006\u000bQA[;oSRT\u0011AQ\u0001\u0004_J<\u0017B\u0001#<\u0005E\u0001\u0016M]1nKR,'/\u001b>fIR+7\u000f^\u0001\u0005]\u0006lW-I\u0001H\u0003!ZH-[:qY\u0006Lh*Y7f{:\nXo\u001c:v[vZ\b' 
\u0018d_>\u0014H-\u001b8bi>\u0014Xh_\u0019~Q\u0011\u0011\u0011j\u0014)\u0011\u0005)kU\"A&\u000b\u00051[\u0014\u0001\u00039s_ZLG-\u001a:\n\u00059[%\u0001D'fi\"|GmU8ve\u000e,\u0017!\u0002<bYV,G&A)\"\u0003I\u000bq\"\u00197m\u0007>l'-\u001b8bi&|gn]\u00010i\u0016\u001cHOU3wKJ\u001cX-\u00118e'^\f\u0007OR1jYN<\u0016\u000e\u001e5O_:\u0014\u0015\u000eZ5sK\u000e$\u0018n\u001c8bY2Kgn\u001b\u000b\u0004AU3\u0006\"B\u0014\u0004\u0001\u0004A\u0003\"\u0002\u001b\u0004\u0001\u0004)\u0004\u0006B\u0002:\u000b\u001aCCaA%P32\n\u0011+A\u0013uKN$8\u000b^8q\u001b&\u0014(o\u001c:U_BL7mV5uQ&sg/\u00197jIJ+\u0017/^3tiR\u0019\u0001\u0005X/\t\u000b\u001d\"\u0001\u0019\u0001\u0015\t\u000bQ\"\u0001\u0019A\u001b)\t\u0011ITI\u0012\u0015\u0005\t%{\u0005\rL\u0001R\u00039!Xm\u001d;Ti>\u0004X*\u001b:s_J$2\u0001I2e\u0011\u00159S\u00011\u0001)\u0011\u0015!T\u00011\u00016Q\u0011)\u0011(\u0012$)\t\u0015Iuj\u001a\u0017\u0002#\u00069C/Z:u'R|\u0007/T5se>\u0014x+\u001b;i'>,(oY3DYV\u001cH/\u001a:TQV$Hm\\<o)\r\u0001#n\u001b\u0005\u0006O\u0019\u0001\r\u0001\u000b\u0005\u0006i\u0019\u0001\r!\u000e\u0015\u0005\re*e\t\u000b\u0003\u0007\u0013>sG&A)\u0002EQ,7\u000f^*u_Bl\u0015N\u001d:peR\u000b7o[*uCR,G)\u001e:j]\u001e,%O]8s)\r\u0001\u0013O\u001d\u0005\u0006O\u001d\u0001\r\u0001\u000b\u0005\u0006i\u001d\u0001\r!\u000e\u0015\u0005\u000fe*e\t\u000b\u0003\b\u0013>+H&A)\u0002SQ,7\u000f^'jeJ|'OR1jY>4XM],iK:\u001cv.\u001e:dK&\u001bXK\\1wC&d\u0017M\u00197f)\r\u0001\u00030\u001f\u0005\u0006O!\u0001\r\u0001\u000b\u0005\u0006i!\u0001\r!\u000e\u0015\u0005\u0011e*e\t\u000b\u0003\t\u0013>cH&A)\u0002SQ,7\u000f\u001e*pY2\u0014\u0017mY6NSJ\u0014xN\u001d$s_6\u0004VM\u001c3j]\u001e\u001cFo\u001c9qK\u0012\u001cF/\u0019;f)\u0011\u0001s0!\u0001\t\u000b\u001dJ\u0001\u0019\u0001\u0015\t\u000bQJ\u0001\u0019A\u001b)\t%ITI\u0012\u0015\u0006\u0013%{\u0015q\u0001\u0017\u0002#\u0006QC/Z:u%>dGNY1dW6K'O]8s\r\u0006LGn](o+:\u001cX\u000f\u001d9peR,Gm\u0015;bi\u0016\u001cH#\u0002\u0011\u0002\u000e\u0005=\u0001\"B\u0014\u000b\u0001\u0004A\u0003\"\u0002\u0
01b\u000b\u0001\u0004)\u0004\u0006\u0002\u0006:\u000b\u001aCSAC%P\u0003+a\u0013!U\u00014i\u0016\u001cH\u000fT1ti\u001a+Go\u00195fI>3gm]3u!J|Wn\u001c;fI6K'O]8s)>\u0004\u0018n\u0019#fg\u000e\u0014\u0018\u000e\u001d;j_:$R\u0001IA\u000e\u0003;AQaJ\u0006A\u0002!BQ\u0001N\u0006A\u0002UBCaC\u001dF\r\"*1\"S(\u0002$1\n\u0011+A\u001buKN$H*Y:u\r\u0016$8\r[3e\u001f\u001a47/\u001a;GC&dW\rZ(wKJl\u0015N\u001d:peR{\u0007/[2EKN\u001c'/\u001b9uS>tG#\u0002\u0011\u0002*\u0005-\u0002\"B\u0014\r\u0001\u0004A\u0003\"\u0002\u001b\r\u0001\u0004)\u0004\u0006\u0002\u0007:\u000b\u001aCS\u0001D%P\u0003ca\u0013!U\u00013i\u0016\u001cH\u000fT1ti\u001a+Go\u00195fI>3gm]3u'R|\u0007\u000f]3e\u001b&\u0014(o\u001c:U_BL7\rR3tGJL\u0007\u000f^5p]R\u0019\u0001%a\u000e\t\u0011\u0005eR\u0002%AA\u0002U\nq\u0001\u001d:p[>$X-\u0001\u001fuKN$H*Y:u\r\u0016$8\r[3e\u001f\u001a47/\u001a;Ti>\u0004\b/\u001a3NSJ\u0014xN\u001d+pa&\u001cG)Z:de&\u0004H/[8oI\u0011,g-Y;mi\u0012\nTCAA U\r)\u0014\u0011I\u0016\u0003\u0003\u0007\u0002B!!\u0012\u0002P5\u0011\u0011q\t\u0006\u0005\u0003\u0013\nY%A\u0005v]\u000eDWmY6fI*\u0019\u0011Q\n\u0012\u0002\u0015\u0005tgn\u001c;bi&|g.\u0003\u0003\u0002R\u0005\u001d#!E;oG\",7m[3e-\u0006\u0014\u0018.\u00198dK\u0006YC/Z:u'R|\u0007\u000f]3e\u001b&\u0014(o\u001c:U_BL7m],ji\"\u001cuN\u001c;s_2dWM]\"iC:<W\rF\u0003!\u0003/\nI\u0006C\u0003(\u001f\u0001\u0007\u0001\u0006C\u00035\u001f\u0001\u0007Q\u0007\u000b\u0003\u0010s\u00153\u0005&B\bJ\u001f\u0006}CFAA1C\t\t\u0019'\u0001\b{W\u000e{WNY5oCRLwN\\:)\r\u0001\t9gTA:!\u0011\tI'a\u001c\u000e\u0005\u0005-$bAA7{\u0005\u0019\u0011\r]5\n\t\u0005E\u00141\u000e\u0002\u0004)\u0006<\u0017EAA;\u0003-Ig\u000e^3he\u0006$\u0018n\u001c8)\r\u0001\t9gTA=C\t\tY(A\ncCj,GNO:iCJ$wlY8v]RTT\u0007")
/* loaded from: input_file:kafka/link/ClusterLinkDRIntegrationTest.class */
public class ClusterLinkDRIntegrationTest extends AbstractClusterLinkIntegrationTest {
    /**
     * Exercises PAUSE and RESUME on a mirror topic that is waiting for repair.
     *
     * <p>Sequence performed by the body:
     * <ol>
     *   <li>Create the source topic, produce 20 records, create the link with
     *       {@code MaxMessageSizeProp = "1000"} and mirror the topic.</li>
     *   <li>Produce one more record, passing 1100 random bytes as the optional fifth argument of
     *       {@code produceRecords}, and wait for a {@code RecordTooLarge} failure; the mirror must
     *       then report {@code FAILED}.</li>
     *   <li>Shut down the repair-mirrors task, raise {@code MaxMessageSizeProp} to {@code "10000"}
     *       and issue {@code REPAIR}; the mirror must reach {@code PENDING_REPAIR}.</li>
     *   <li>{@code PAUSE} must move it to {@code PAUSED}; {@code RESUME} must bring it back to
     *       {@code PENDING_REPAIR}.</li>
     *   <li>Reset the task's {@code isShuttingDown} field, start the task again, hand it the topic
     *       explicitly, and expect {@code ACTIVE}; mirroring of a further record must succeed.</li>
     *   <li>Delete the mirror topic and the cluster link.</li>
     * </ol>
     *
     * <p>The producer is closed in a {@code finally} block so that it is released even when one of
     * the waits or assertions above fails.
     *
     * @param str quorum value bound to {@code {0}} of the display name; not read by the body
     * @param z   coordinator flag bound to {@code {1}} of the display name; not read by the body
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testPausePendingRepairMirror(String str, boolean z) {
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        ClusterLinkTestHarness sourceCluster2 = sourceCluster();
        KafkaProducer<byte[], byte[]> createProducer = sourceCluster2.createProducer(sourceCluster2.createProducer$default$1(), sourceCluster2.createProducer$default$2(), sourceCluster2.createProducer$default$3());
        try {
            produceRecords(createProducer, topic(), 20, produceRecords$default$4(), produceRecords$default$5(), produceRecords$default$6());
            // Link is created with a 1000 max message size so that the 1100-byte payload below fails.
            Uuid createClusterLink = createClusterLink(linkName(), destLinkProps((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.MaxMessageSizeProp()), "1000")}))), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
            ClusterLinkTestHarness destCluster = destCluster();
            destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
            waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

            // Drive the mirror into FAILED.
            byte[] randomBytes = TestUtils.randomBytes(1100);
            produceRecords(createProducer, topic(), 1, produceRecords$default$4(), new Some(randomBytes), produceRecords$default$6());
            ClusterLinkTestHarness destCluster2 = destCluster();
            waitForFailure(destCluster2.createConfluentAdminClient(destCluster2.createConfluentAdminClient$default$1()), FailureType$.MODULE$.RecordTooLarge(), waitForFailure$default$3());
            MirrorTopicDescription.State state = MirrorTopicDescription.State.FAILED;
            ClusterLinkTestHarness destCluster3 = destCluster();
            Assertions.assertEquals(state, destCluster3.describeMirrorTopic(topic(), destCluster3.describeMirrorTopic$default$2()).state());
            destCluster().waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.FAILED, topic(), numPartitions());

            // Stop the repair task before requesting REPAIR so the topic stays in PENDING_REPAIR.
            ClusterLinkRepairMirrors repairMirrorsTask = repairMirrorsTask(linkName(), createClusterLink);
            repairMirrorsTask.shutdown();
            alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.MaxMessageSizeProp()), "10000")})));
            destCluster().alterMirrors(topic(), AlterMirrorOp.REPAIR);
            destCluster().waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.PENDING_REPAIR, topic(), numPartitions());
            MirrorTopicDescription.State state2 = MirrorTopicDescription.State.PENDING_REPAIR;
            ClusterLinkTestHarness destCluster4 = destCluster();
            Assertions.assertEquals(state2, destCluster4.describeMirrorTopic(topic(), destCluster4.describeMirrorTopic$default$2()).state());

            // PENDING_REPAIR -> PAUSED.
            destCluster().alterMirrors(topic(), AlterMirrorOp.PAUSE);
            destCluster().waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.PAUSED, topic(), numPartitions());
            MirrorTopicDescription.State state3 = MirrorTopicDescription.State.PAUSED;
            ClusterLinkTestHarness destCluster5 = destCluster();
            Assertions.assertEquals(state3, destCluster5.describeMirrorTopic(topic(), destCluster5.describeMirrorTopic$default$2()).state());

            // PAUSED -> PENDING_REPAIR again after RESUME.
            destCluster().alterMirrors(topic(), AlterMirrorOp.RESUME);
            destCluster().waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.PENDING_REPAIR, topic(), numPartitions());
            MirrorTopicDescription.State state4 = MirrorTopicDescription.State.PENDING_REPAIR;
            ClusterLinkTestHarness destCluster6 = destCluster();
            Assertions.assertEquals(state4, destCluster6.describeMirrorTopic(topic(), destCluster6.describeMirrorTopic$default$2()).state());

            // Clear the shutdown flag set by shutdown() above, restart the task and hand it the topic.
            TestUtils.setFieldValueInSuperClassOfSuperClass(repairMirrorsTask, "isShuttingDown", BoxesRunTime.boxToBoolean(false));
            repairMirrorsTask.startup();
            ClusterLinkDestClientManager clusterLinkDestClientManager = (ClusterLinkDestClientManager) destCluster().linkCoordinator(linkName()).clusterLinkManager().destClientManager(createClusterLink).get();
            clusterLinkDestClientManager.taskManager().clusterLinkRepairMirrors().maybeAddTask(topic(), (ClusterLinkTopicState) clusterLinkDestClientManager.metadataManager().mirrorTopicStatesFromMetadataCache((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{topic()}))).apply(topic()), None$.MODULE$, clusterLinkDestClientManager);
            destCluster().waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.ACTIVE, topic(), numPartitions());
            MirrorTopicDescription.State state5 = MirrorTopicDescription.State.ACTIVE;
            ClusterLinkTestHarness destCluster7 = destCluster();
            Assertions.assertEquals(state5, destCluster7.describeMirrorTopic(topic(), destCluster7.describeMirrorTopic$default$2()).state());

            // The same payload size must now mirror successfully.
            produceRecords(createProducer, topic(), 1, produceRecords$default$4(), new Some(randomBytes), produceRecords$default$6());
            waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

            destCluster().deleteTopic(topic(), true);
            ClusterLinkTestHarness destCluster8 = destCluster();
            destCluster8.deleteClusterLink(linkName(), destCluster8.deleteClusterLink$default$2(), destCluster8.deleteClusterLink$default$3());
        } finally {
            // Release the producer on every exit path, not only when the test passes.
            createProducer.close(Duration.ZERO);
        }
    }

    /**
     * With {@code useBidirectionalLink} switched off, requesting
     * {@code REVERSE_AND_START_REMOTE_MIRROR} on a mirrored topic must be rejected with
     * {@link InvalidRequestException}.
     *
     * @param str quorum value bound to {@code {0}} of the display name; not read by the body
     * @param z   coordinator flag bound to {@code {1}} of the display name; not read by the body
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testReverseAndSwapFailsWithNonBidirectionalLink(String str, boolean z) {
        useBidirectionalLink_$eq(false);

        // Source topic plus a link created entirely from default arguments.
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(
            topic(),
            numPartitions(),
            replicationFactor(),
            source.createTopic$default$4(),
            source.createTopic$default$5(),
            source.createTopic$default$6());
        createClusterLink(
            linkName(),
            createClusterLink$default$2(),
            createClusterLink$default$3(),
            createClusterLink$default$4(),
            createClusterLink$default$5());

        // Mirror the topic on the destination.
        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(
            topic(),
            replicationFactor(),
            linkName(),
            dest.linkTopic$default$4(),
            dest.linkTopic$default$5());

        Assertions.assertThrows(
            InvalidRequestException.class,
            () -> destCluster().alterMirrors(topic(), AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR));
    }

    /**
     * Checks the error returned by {@code unlinkTopic} when the target is not an active mirror.
     *
     * <p>Three rejected calls are asserted, each with {@code false, false} as third and fourth
     * arguments:
     * <ul>
     *   <li>before any destination topic exists: {@link UnknownTopicOrPartitionException};</li>
     *   <li>after a topic is created on the destination via {@code createTopic} rather than
     *       {@code linkTopic}: {@link InvalidRequestException};</li>
     *   <li>after a real mirror has been unlinked and reached {@code STOPPED}:
     *       {@link InvalidRequestException}.</li>
     * </ul>
     *
     * @param str quorum value bound to {@code {0}} of the display name; not read by the body
     * @param z   coordinator flag bound to {@code {1}} of the display name; not read by the body
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testStopMirrorTopicWithInvalidRequest(String str, boolean z) {
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(
            topic(),
            numPartitions(),
            replicationFactor(),
            source.createTopic$default$4(),
            source.createTopic$default$5(),
            source.createTopic$default$6());
        produceToSourceCluster(100);

        // Link properties; the prefix property is only set when a prefix is configured.
        Properties linkProps = destLinkProps(destLinkProps$default$1());
        boolean prefixConfigured =
            StringOps$.MODULE$.nonEmpty$extension(Predef$.MODULE$.augmentString(clusterLinkPrefix()));
        if (prefixConfigured) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        createClusterLink(
            linkName(),
            linkProps,
            createClusterLink$default$3(),
            createClusterLink$default$4(),
            createClusterLink$default$5());

        // Case 1: nothing exists on the destination under the prefixed name yet.
        Assertions.assertThrows(UnknownTopicOrPartitionException.class, () -> {
            ClusterLinkTestHarness target = destCluster();
            target.unlinkTopic(
                clusterLinkPrefix() + topic(),
                linkName(),
                false,
                false,
                target.unlinkTopic$default$5(),
                target.unlinkTopic$default$6());
        });

        // Case 2: a topic created with createTopic (not linkTopic) under the prefixed name.
        ClusterLinkTestHarness destForCreate = destCluster();
        destForCreate.createTopic(
            clusterLinkPrefix() + topic(),
            numPartitions(),
            replicationFactor(),
            destForCreate.createTopic$default$4(),
            destForCreate.createTopic$default$5(),
            destForCreate.createTopic$default$6());
        Assertions.assertThrows(InvalidRequestException.class, () -> {
            ClusterLinkTestHarness target = destCluster();
            target.unlinkTopic(
                clusterLinkPrefix() + topic(),
                linkName(),
                false,
                false,
                target.unlinkTopic$default$5(),
                target.unlinkTopic$default$6());
        });
        destCluster().deleteTopic(clusterLinkPrefix() + topic(), true);

        // Case 3: a real mirror that has already been unlinked and reached STOPPED.
        ClusterLinkTestHarness destForLink = destCluster();
        destForLink.linkTopic(
            topic(),
            replicationFactor(),
            linkName(),
            destForLink.linkTopic$default$4(),
            clusterLinkPrefix());
        ClusterLinkTestHarness destForUnlink = destCluster();
        destForUnlink.unlinkTopic(
            clusterLinkPrefix() + topic(),
            linkName(),
            destForUnlink.unlinkTopic$default$3(),
            destForUnlink.unlinkTopic$default$4(),
            destForUnlink.unlinkTopic$default$5(),
            destForUnlink.unlinkTopic$default$6());
        destCluster().waitUntilMirrorState(
            ReplicaStatus.MirrorInfo.State.STOPPED,
            clusterLinkPrefix() + topic(),
            numPartitions());
        verifyTimeToStopMirrorTopicPromoteMetric(linkName());
        Assertions.assertThrows(InvalidRequestException.class, () -> {
            ClusterLinkTestHarness target = destCluster();
            target.unlinkTopic(
                clusterLinkPrefix() + topic(),
                linkName(),
                false,
                false,
                target.unlinkTopic$default$5(),
                target.unlinkTopic$default$6());
        });

        // Cleanup.
        destCluster().deleteTopic(clusterLinkPrefix() + topic(), true);
        ClusterLinkTestHarness destForCleanup = destCluster();
        destForCleanup.deleteClusterLink(
            linkName(),
            destForCleanup.deleteClusterLink$default$2(),
            destForCleanup.deleteClusterLink$default$3());
    }

    /**
     * Stops a mirror topic twice and checks the stop-time metric and the {@code pending_stopped}
     * transition metric after each stop.
     *
     * <p>The first {@code unlinkTopic} call passes {@code false} as its fourth argument and is
     * followed by {@code verifyTimeToStopMirrorTopicFailoverMetric}; the second uses the default
     * fourth argument and is followed by {@code verifyTimeToStopMirrorTopicPromoteMetric}. The
     * mirror topic is deleted and re-linked between the two rounds.
     *
     * @param str quorum value bound to {@code {0}} of the display name; not read by the body
     * @param z   coordinator flag bound to {@code {1}} of the display name; not read by the body
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testStopMirror(String str, boolean z) {
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(
            topic(),
            numPartitions(),
            replicationFactor(),
            source.createTopic$default$4(),
            source.createTopic$default$5(),
            source.createTopic$default$6());
        produceToSourceCluster(100);

        // Link properties; the prefix property is only set when a prefix is configured.
        Properties linkProps = destLinkProps(destLinkProps$default$1());
        boolean prefixConfigured =
            StringOps$.MODULE$.nonEmpty$extension(Predef$.MODULE$.augmentString(clusterLinkPrefix()));
        if (prefixConfigured) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        createClusterLink(
            linkName(),
            linkProps,
            createClusterLink$default$3(),
            createClusterLink$default$4(),
            createClusterLink$default$5());

        // Round 1: unlink with an explicit false fourth argument, then check the failover metric.
        ClusterLinkTestHarness firstLinkDest = destCluster();
        firstLinkDest.linkTopic(
            topic(),
            replicationFactor(),
            linkName(),
            firstLinkDest.linkTopic$default$4(),
            clusterLinkPrefix());
        ClusterLinkTestHarness firstUnlinkDest = destCluster();
        firstUnlinkDest.unlinkTopic(
            clusterLinkPrefix() + topic(),
            linkName(),
            firstUnlinkDest.unlinkTopic$default$3(),
            false,
            firstUnlinkDest.unlinkTopic$default$5(),
            firstUnlinkDest.unlinkTopic$default$6());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED);
        verifyTimeToStopMirrorTopicFailoverMetric(linkName());
        assertClusterLinkMirrorTransitionMetricMaxVal(
            ClusterLinkMetrics$.MODULE$.metricsGroup(),
            "pending_stopped",
            new $colon.colon(NoErrorCode$.MODULE$, Nil$.MODULE$),
            1.0d,
            new $colon.colon(destCluster().linkCoordinator(linkName()), Nil$.MODULE$),
            destCluster().nonLinkCoordinators(linkName()),
            assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
        destCluster().deleteTopic(clusterLinkPrefix() + topic(), true);

        // Round 2: re-link, unlink with all defaults, then check the promote metric.
        ClusterLinkTestHarness secondLinkDest = destCluster();
        secondLinkDest.linkTopic(
            topic(),
            replicationFactor(),
            linkName(),
            secondLinkDest.linkTopic$default$4(),
            clusterLinkPrefix());
        ClusterLinkTestHarness secondUnlinkDest = destCluster();
        secondUnlinkDest.unlinkTopic(
            clusterLinkPrefix() + topic(),
            linkName(),
            secondUnlinkDest.unlinkTopic$default$3(),
            secondUnlinkDest.unlinkTopic$default$4(),
            secondUnlinkDest.unlinkTopic$default$5(),
            secondUnlinkDest.unlinkTopic$default$6());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED);
        verifyTimeToStopMirrorTopicPromoteMetric(linkName());
        assertClusterLinkMirrorTransitionMetricMaxVal(
            ClusterLinkMetrics$.MODULE$.metricsGroup(),
            "pending_stopped",
            new $colon.colon(NoErrorCode$.MODULE$, Nil$.MODULE$),
            1.0d,
            new $colon.colon(destCluster().linkCoordinator(linkName()), Nil$.MODULE$),
            destCluster().nonLinkCoordinators(linkName()),
            assertClusterLinkMirrorTransitionMetricMaxVal$default$7());

        // Cleanup.
        destCluster().deleteTopic(clusterLinkPrefix() + topic(), true);
        ClusterLinkTestHarness cleanupDest = destCluster();
        cleanupDest.deleteClusterLink(
            linkName(),
            cleanupDest.deleteClusterLink$default$2(),
            cleanupDest.deleteClusterLink$default$3());
    }

    /**
     * Stops a mirror topic while every source broker has been killed, in three phases.
     *
     * <ol>
     *   <li>Unlink with {@code false} as fourth argument while the source is down: the mirror
     *       must reach {@code STOPPED} and the failover stop-time metric is verified.</li>
     *   <li>After restoring source and mirror, unlink with {@code false} as fifth argument while
     *       the source is down: partitions reach {@code PENDING_STOPPED}, the description reports
     *       {@code SOURCE_UNAVAILABLE}, and deleting the link throws
     *       {@link ClusterLinkInUseException}. Restarting the source lets the mirror reach
     *       {@code STOPPED}; the promote stop-time metric is verified.</li>
     *   <li>The same pending stop is repeated, the source topic id recorded at the start is
     *       compared with the mirror description, and a second unlink with {@code false} as
     *       fourth argument completes the stop while the source is still down.</li>
     * </ol>
     *
     * @param str quorum value bound to {@code {0}} of the display name; not read by the body
     * @param z   coordinator flag bound to {@code {1}} of the display name; not read by the body
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testStopMirrorWithSourceClusterShutdown(String str, boolean z) {
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(
            topic(),
            numPartitions(),
            replicationFactor(),
            source.createTopic$default$4(),
            source.createTopic$default$5(),
            source.createTopic$default$6());
        Uuid sourceTopicId = sourceCluster().describeTopic(topic()).topicId();
        produceToSourceCluster(100);

        // Short timeouts and a low availability-check threshold for the link.
        Properties linkProps = destLinkProps(destLinkProps$default$1());
        linkProps.setProperty("request.timeout.ms", "1000");
        linkProps.setProperty("default.api.timeout.ms", "1000");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp(), "100");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckConsecutiveFailureThresholdProp(), "2");
        createClusterLink(
            linkName(),
            linkProps,
            createClusterLink$default$3(),
            createClusterLink$default$4(),
            createClusterLink$default$5());
        ClusterLinkTestHarness linkDest = destCluster();
        linkDest.linkTopic(
            topic(),
            replicationFactor(),
            linkName(),
            linkDest.linkTopic$default$4(),
            linkDest.linkTopic$default$5());

        // Phase 1: source down, unlink with fourth argument false.
        sourceCluster().killAllBrokers();
        ClusterLinkTestHarness phaseOneDest = destCluster();
        phaseOneDest.unlinkTopic(
            topic(),
            linkName(),
            phaseOneDest.unlinkTopic$default$3(),
            false,
            phaseOneDest.unlinkTopic$default$5(),
            phaseOneDest.unlinkTopic$default$6());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED);
        verifyTimeToStopMirrorTopicFailoverMetric(linkName());

        // Phase 2: restore, then unlink with fifth argument false while the source is down.
        restartSource(restartSource$default$1());
        restartMirrorTopic$1();
        sourceCluster().killAllBrokers();
        ClusterLinkTestHarness phaseTwoDest = destCluster();
        phaseTwoDest.unlinkTopic(
            topic(),
            linkName(),
            phaseTwoDest.unlinkTopic$default$3(),
            phaseTwoDest.unlinkTopic$default$4(),
            false,
            phaseTwoDest.unlinkTopic$default$6());
        Thread.sleep(1000L);
        destCluster().waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.PENDING_STOPPED, topic(), numPartitions());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.SOURCE_UNAVAILABLE);
        // The link cannot be deleted while the stop is still pending.
        Assertions.assertThrows(ClusterLinkInUseException.class, () -> {
            ClusterLinkTestHarness target = destCluster();
            target.deleteClusterLink(
                linkName(),
                target.deleteClusterLink$default$2(),
                target.deleteClusterLink$default$3());
        });
        restartSource(restartSource$default$1());
        destCluster().waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, topic(), numPartitions());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED);
        verifyTimeToStopMirrorTopicPromoteMetric(linkName());

        // Phase 3: repeat the pending stop, then finish it with fourth argument false.
        restartSource(restartSource$default$1());
        restartMirrorTopic$1();
        sourceCluster().killAllBrokers();
        ClusterLinkTestHarness phaseThreeDest = destCluster();
        phaseThreeDest.unlinkTopic(
            topic(),
            linkName(),
            phaseThreeDest.unlinkTopic$default$3(),
            phaseThreeDest.unlinkTopic$default$4(),
            false,
            phaseThreeDest.unlinkTopic$default$6());
        Thread.sleep(1000L);
        destCluster().waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.PENDING_STOPPED, topic(), numPartitions());
        ClusterLinkTestHarness describeDest = destCluster();
        Assertions.assertEquals(
            sourceTopicId,
            describeDest.describeMirrorTopic(topic(), describeDest.describeMirrorTopic$default$2()).sourceTopicId());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.SOURCE_UNAVAILABLE);
        ClusterLinkTestHarness finalUnlinkDest = destCluster();
        finalUnlinkDest.unlinkTopic(
            topic(),
            linkName(),
            finalUnlinkDest.unlinkTopic$default$3(),
            false,
            finalUnlinkDest.unlinkTopic$default$5(),
            finalUnlinkDest.unlinkTopic$default$6());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED);
        verifyTimeToStopMirrorTopicFailoverMetric(linkName());

        // Cleanup.
        destCluster().deleteTopic(topic(), true);
        ClusterLinkTestHarness cleanupDest = destCluster();
        cleanupDest.deleteClusterLink(
            linkName(),
            cleanupDest.deleteClusterLink$default$2(),
            cleanupDest.deleteClusterLink$default$3());
    }

    /**
     * Checks the {@code pending_stopped} transition metric and two retried conditions while a stop
     * is pending against a killed source cluster, and again after the source is restarted.
     *
     * <p>The two {@code while (true)} loops below are retry loops: each runs a synthetic
     * {@code $anonfun$...} check, and on {@link AssertionError} retries until more than 15000 ms
     * have elapsed since the loop started, sleeping between attempts with a growing delay.
     * The bodies of the {@code $anonfun$} checks are not part of this method.
     *
     * <p>NOTE(review): the outer {@code try} encloses everything after the first check, including
     * the inner retry loop and the final cleanup, so an {@link AssertionError} raised by any of
     * those later steps is also caught by the outer {@code catch} and, within the outer 15000 ms
     * window, re-runs the whole sequence. This looks like a decompiler artifact of two sequential
     * retry calls; confirm against the Scala source before relying on it.
     *
     * @param str quorum value bound to {@code {0}} of the display name; not read by the body
     * @param z   coordinator flag bound to {@code {1}} of the display name; not read by the body
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testStopMirrorTaskStateDuringError(String str, boolean z) {
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        produceToSourceCluster(100);
        // Short client timeouts and a low availability-check threshold for the link.
        Properties destLinkProps = destLinkProps(destLinkProps$default$1());
        destLinkProps.setProperty("request.timeout.ms", "1000");
        destLinkProps.setProperty("default.api.timeout.ms", "1000");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp(), "100");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckConsecutiveFailureThresholdProp(), "2");
        createClusterLink(linkName(), destLinkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        restartSource(restartSource$default$1());
        restartMirrorTopic$2();
        // Kill the source, then request the stop with false as the fifth unlinkTopic argument.
        sourceCluster().killAllBrokers();
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.unlinkTopic(topic(), linkName(), destCluster2.unlinkTopic$default$3(), destCluster2.unlinkTopic$default$4(), false, destCluster2.unlinkTopic$default$6());
        destCluster().waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.PENDING_STOPPED, topic(), numPartitions());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.SOURCE_UNAVAILABLE);
        // While the source is down the transition metric is expected under InternalTaskErrorCode.
        assertClusterLinkMirrorTransitionMetricMaxVal(ClusterLinkMetrics$.MODULE$.metricsGroup(), "pending_stopped", new $colon.colon(InternalTaskErrorCode$.MODULE$, Nil$.MODULE$), 1.0d, new $colon.colon(destCluster().linkCoordinator(linkName()), Nil$.MODULE$), destCluster().nonLinkCoordinators(linkName()), assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
        // Outer retry loop state: j is the current sleep in ms, currentTimeMillis the loop start.
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long j = 1;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            try {
                // First retried check (synthetic lambda body, defined outside this method).
                $anonfun$testStopMirrorTaskStateDuringError$1(this);
                // Bring the source back so the pending stop can complete.
                restartSource(restartSource$default$1());
                destCluster().waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, topic(), numPartitions());
                waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED);
                // Inner retry loop state, same scheme as the outer loop.
                TestUtils$ testUtils$2 = TestUtils$.MODULE$;
                long j2 = 1;
                long currentTimeMillis2 = System.currentTimeMillis();
                while (true) {
                    try {
                        // Second retried check (synthetic lambda body, defined outside this method).
                        $anonfun$testStopMirrorTaskStateDuringError$2(this);
                        // After the stop completed the transition metric is expected under NoErrorCode.
                        assertClusterLinkMirrorTransitionMetricMaxVal(ClusterLinkMetrics$.MODULE$.metricsGroup(), "pending_stopped", new $colon.colon(NoErrorCode$.MODULE$, Nil$.MODULE$), 1.0d, new $colon.colon(destCluster().linkCoordinator(linkName()), Nil$.MODULE$), destCluster().nonLinkCoordinators(linkName()), assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
                        // Cleanup, then leave both loops.
                        destCluster().deleteTopic(topic(), true);
                        ClusterLinkTestHarness destCluster3 = destCluster();
                        destCluster3.deleteClusterLink(linkName(), destCluster3.deleteClusterLink$default$2(), destCluster3.deleteClusterLink$default$3());
                        return;
                    } catch (AssertionError e) {
                        // Give up once more than 15000 ms have passed since the inner loop started.
                        if (System.currentTimeMillis() - currentTimeMillis2 > 15000) {
                            throw e;
                        }
                        if (testUtils$2.logger().underlying().isInfoEnabled()) {
                            testUtils$2.logger().underlying().info(Logging.msgWithLogIdent$(testUtils$2, new StringBuilder(49).append("Attempt failed, sleeping for ").append(j2).append(", and then retrying.").toString()));
                        }
                        Thread.sleep(j2);
                        // Back-off: the delay doubles until it reaches 1000 ms, then grows by 1000 ms.
                        j2 += package$.MODULE$.min(j2, 1000L);
                    }
                }
            } catch (AssertionError e2) {
                // Give up once more than 15000 ms have passed since the outer loop started.
                if (System.currentTimeMillis() - currentTimeMillis > 15000) {
                    throw e2;
                }
                if (testUtils$.logger().underlying().isInfoEnabled()) {
                    testUtils$.logger().underlying().info(Logging.msgWithLogIdent$(testUtils$, new StringBuilder(49).append("Attempt failed, sleeping for ").append(j).append(", and then retrying.").toString()));
                }
                Thread.sleep(j);
                // Back-off: the delay doubles until it reaches 1000 ms, then grows by 1000 ms.
                j += package$.MODULE$.min(j, 1000L);
            }
        }
    }

    /**
     * Verifies that a mirror topic can be stopped while the link to the source is unusable.
     *
     * <p>Flow: create a single-partition source topic, create a cluster link with short
     * timeouts and availability-check settings, mirror the topic, wait until the mirror is
     * ACTIVE, then call {@code updateCredentials} on one side of the link and wait for the
     * mirror to report SOURCE_AUTHENTICATION_FAILED or SOURCE_UNAVAILABLE. Finally the mirror
     * is unlinked and must reach STOPPED.
     *
     * @param str first value supplied by the {@code allCombinations} method source (shown as
     *            {@code quorum} in the display name); not read by the test body
     * @param z   second value supplied by the method source (shown as {@code coordinator});
     *            not read by the test body
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMirrorFailoverWhenSourceIsUnavailable(String str, boolean z) {
        numPartitions_$eq(1);
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());

        // Short refresh/timeout/availability-check values for the link under test.
        Properties linkProps = destLinkProps(destLinkProps$default$1());
        linkProps.setProperty("metadata.max.age.ms", "100");
        linkProps.setProperty("request.timeout.ms", "1000");
        linkProps.setProperty("default.api.timeout.ms", "1000");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ReverseConnectionSetupTimeoutMsProp(), "1000");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.RetryTimeoutMsProp(), "1000");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp(), "100");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckConsecutiveFailureThresholdProp(), "2");
        if (!clusterLinkPrefix().isEmpty()) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), clusterLinkPrefix());

        // Poll every 100 ms, for at most 15 s, until the destination leader epoch reaches the
        // expected minimum (0 for KRaft tests, 1 otherwise).
        int expectedLeaderEpoch = destCluster().isKRaftTest() ? 0 : 1;
        long startMs = System.currentTimeMillis();
        while (!$anonfun$testMirrorFailoverWhenSourceIsUnavailable$1(this, expectedLeaderEpoch)) {
            if (System.currentTimeMillis() > startMs + 15000) {
                Assertions.fail("Destination leader epoch not updated");
            }
            Thread.sleep(100L);
        }

        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        destCluster().waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.ACTIVE, clusterLinkPrefix() + topic(), numPartitions());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE);

        // Credentials are updated on the destination for source-initiated links and on the
        // source otherwise; the mirror is then expected to lose its source.
        updateCredentials(useSourceInitiatedLink() ? destCluster() : sourceCluster());
        waitUntilOneOfMirrorStates((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new ReplicaStatus.MirrorInfo.State[]{ReplicaStatus.MirrorInfo.State.SOURCE_AUTHENTICATION_FAILED, ReplicaStatus.MirrorInfo.State.SOURCE_UNAVAILABLE})));
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.SOURCE_UNAVAILABLE);
        verifyDescribeLinksResult(ClusterLinkDescription.LinkState.UNAVAILABLE, ClusterLinkDescription.LinkState.ACTIVE);

        // NOTE(review): the literal `false` is presumably "failover rather than promote" - confirm
        // against ClusterLinkTestHarness.unlinkTopic.
        ClusterLinkTestHarness destForUnlink = destCluster();
        destForUnlink.unlinkTopic(clusterLinkPrefix() + topic(), linkName(), destForUnlink.unlinkTopic$default$3(), false, destForUnlink.unlinkTopic$default$5(), numPartitions());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED);
    }

    /**
     * Checks that a mirror topic in PENDING_STOPPED can be rolled back to ACTIVE.
     *
     * <p>The source brokers are killed before the unlink request so the mirror stays in
     * PENDING_STOPPED; after {@code AlterMirrorOp.ROLLBACK} the mirror must return to ACTIVE
     * and, once the source is restarted, mirroring must catch up again.
     *
     * @param str first method-source value; not read by the test body
     * @param z   second method-source value; not read by the test body
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testRollbackMirrorFromPendingStoppedState(String str, boolean z) {
        // Source topic with initial data.
        final ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(
                topic(),
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());
        produceToSourceCluster(100);

        // Link and mirror the topic, honouring an optional link prefix.
        final Properties linkProps = destLinkProps(destLinkProps$default$1());
        if (!clusterLinkPrefix().isEmpty()) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        createClusterLink(
                linkName(),
                linkProps,
                createClusterLink$default$3(),
                createClusterLink$default$4(),
                createClusterLink$default$5());
        final ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), clusterLinkPrefix());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // Take the source down, then request the unlink.
        sourceCluster().killAllBrokers();
        final ClusterLinkTestHarness destForUnlink = destCluster();
        destForUnlink.unlinkTopic(
                clusterLinkPrefix() + topic(),
                linkName(),
                false,
                destForUnlink.unlinkTopic$default$4(),
                false,
                destForUnlink.unlinkTopic$default$6());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PENDING_STOPPED);

        // Roll back and confirm the mirror resumes after the source returns.
        destCluster().alterMirrors(clusterLinkPrefix() + topic(), AlterMirrorOp.ROLLBACK);
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE);
        restartSource(restartSource$default$1());
        produceToSourceCluster(100);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
    }

    /**
     * Checks that {@code AlterMirrorOp.ROLLBACK} is rejected with
     * {@link InvalidRequestException} when the mirror is ACTIVE, PAUSED or FAILED.
     *
     * @param str first method-source value; not read by the test body
     * @param z   second method-source value; not read by the test body
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testRollbackMirrorFailsOnUnsupportedStates(String str, boolean z) {
        final ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(
                topic(),
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());
        produceToSourceCluster(100);

        final Properties linkProps = destLinkProps(destLinkProps$default$1());
        linkProps.setProperty("metadata.max.age.ms", "100");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.RetryTimeoutMsProp(), "1000");
        if (!clusterLinkPrefix().isEmpty()) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        createClusterLink(
                linkName(),
                linkProps,
                createClusterLink$default$3(),
                createClusterLink$default$4(),
                createClusterLink$default$5());
        final ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), clusterLinkPrefix());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // State 1: the mirror is up and mirroring.
        Assertions.assertThrows(
                InvalidRequestException.class,
                () -> destCluster().alterMirrors(clusterLinkPrefix() + topic(), AlterMirrorOp.ROLLBACK));

        // State 2: PAUSED.
        final ClusterLinkTestHarness destForPause = destCluster();
        destForPause.pauseTopic(clusterLinkPrefix() + topic(), destForPause.pauseTopic$default$2());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PAUSED);
        Assertions.assertThrows(
                InvalidRequestException.class,
                () -> destCluster().alterMirrors(clusterLinkPrefix() + topic(), AlterMirrorOp.ROLLBACK));

        // Resume (pause flag false) so the mirror is ACTIVE again before the source topic goes away.
        destCluster().pauseTopic(clusterLinkPrefix() + topic(), false);
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE);

        // State 3: FAILED, reached after the source topic is deleted.
        sourceCluster().deleteTopic(topic(), true);
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.FAILED);
        Assertions.assertThrows(
                InvalidRequestException.class,
                () -> destCluster().alterMirrors(clusterLinkPrefix() + topic(), AlterMirrorOp.ROLLBACK));
    }

    /**
     * Runs the shared last-fetched-offset scenario with its flag set to {@code true}.
     *
     * @param str first method-source value; not read by the test body
     * @param z   second method-source value; not read by the test body
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testLastFetchedOffsetPromotedMirrorTopicDescription(String str, boolean z) {
        // NOTE(review): judging by the test name, `true` selects "promote" - confirm against unlinkTopic.
        final boolean scenarioFlag = true;
        testLastFetchedOffsetStoppedMirrorTopicDescription(scenarioFlag);
    }

    /**
     * Runs the shared last-fetched-offset scenario with its flag set to {@code false}.
     *
     * @param str first method-source value; not read by the test body
     * @param z   second method-source value; not read by the test body
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testLastFetchedOffsetFailedOverMirrorTopicDescription(String str, boolean z) {
        // NOTE(review): judging by the test name, `false` selects "failover" - confirm against unlinkTopic.
        final boolean scenarioFlag = false;
        testLastFetchedOffsetStoppedMirrorTopicDescription(scenarioFlag);
    }

    /**
     * Shared scenario: mirrors a single-partition topic, produces 10 records, checks the
     * leader replica's mirror info while ACTIVE, stops the mirror and then checks both the
     * replica status and the mirror topic description afterwards.
     *
     * <p>Expected after stopping: the leader's mirror state is STOPPED with a last fetched
     * source high watermark of -1, and the description reports state STOPPED with a single
     * stopped log end offset equal to 10.
     *
     * @param z forwarded unchanged as the fourth argument of {@code unlinkTopic}
     *          (NOTE(review): presumably promote vs. failover - confirm against the harness)
     */
    private void testLastFetchedOffsetStoppedMirrorTopicDescription(boolean z) {
        numPartitions_$eq(1);
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        Properties linkProps = destLinkProps(destLinkProps$default$1());
        if (!clusterLinkPrefix().isEmpty()) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), clusterLinkPrefix());

        // Poll every 100 ms, for at most 15 s, until the destination leader epoch reaches the
        // expected minimum (0 for KRaft tests, 1 otherwise).
        int expectedLeaderEpoch = destCluster().isKRaftTest() ? 0 : 1;
        long startMs = System.currentTimeMillis();
        while (!$anonfun$testLastFetchedOffsetStoppedMirrorTopicDescription$1(this, expectedLeaderEpoch)) {
            if (System.currentTimeMillis() > startMs + 15000) {
                Assertions.fail("Destination leader epoch not updated");
            }
            Thread.sleep(100L);
        }

        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // While mirroring: the selected leader replica reports ACTIVE and a source high watermark of 10.
        ClusterLinkTestHarness destWhileActive = destCluster();
        ReplicaStatus activeLeader = (ReplicaStatus) ((IterableOps) destWhileActive.replicaStatus(clusterLinkPrefix() + topic(), 0, destWhileActive.replicaStatus$default$3()).filter(candidate -> {
            return BoxesRunTime.boxToBoolean($anonfun$testLastFetchedOffsetStoppedMirrorTopicDescription$3(candidate));
        })).head();
        Assertions.assertTrue(activeLeader.mirrorInfo().isPresent());
        ReplicaStatus.MirrorInfo activeInfo = (ReplicaStatus.MirrorInfo) activeLeader.mirrorInfo().get();
        Assertions.assertEquals(ReplicaStatus.MirrorInfo.State.ACTIVE, activeInfo.state());
        Assertions.assertEquals(10, activeInfo.lastFetchSourceHighWatermark());

        ClusterLinkTestHarness destForUnlink = destCluster();
        destForUnlink.unlinkTopic(clusterLinkPrefix() + topic(), linkName(), destForUnlink.unlinkTopic$default$3(), z, destForUnlink.unlinkTopic$default$5(), numPartitions());

        // After stopping: the replica status no longer carries a source high watermark.
        ClusterLinkTestHarness destAfterStop = destCluster();
        ReplicaStatus stoppedLeader = (ReplicaStatus) ((IterableOps) destAfterStop.replicaStatus(clusterLinkPrefix() + topic(), 0, destAfterStop.replicaStatus$default$3()).filter(candidate -> {
            return BoxesRunTime.boxToBoolean($anonfun$testLastFetchedOffsetStoppedMirrorTopicDescription$4(candidate));
        })).head();
        Assertions.assertTrue(stoppedLeader.mirrorInfo().isPresent());
        ReplicaStatus.MirrorInfo stoppedInfo = (ReplicaStatus.MirrorInfo) stoppedLeader.mirrorInfo().get();
        Assertions.assertEquals(ReplicaStatus.MirrorInfo.State.STOPPED, stoppedInfo.state());
        Assertions.assertEquals(-1L, stoppedInfo.lastFetchSourceHighWatermark());

        // The description keeps the log end offset at which the mirror stopped.
        ClusterLinkTestHarness destForDescribe = destCluster();
        MirrorTopicDescription description = destForDescribe.describeMirrorTopic(clusterLinkPrefix() + topic(), destForDescribe.describeMirrorTopic$default$2());
        // Expected value first, actual second, so a failure message reads correctly.
        Assertions.assertEquals(MirrorTopicDescription.State.STOPPED, description.state());
        Assertions.assertEquals(1, description.stoppedLogEndOffsets().size());
        Assertions.assertEquals(10, ((Long) description.stoppedLogEndOffsets().get(0)).longValue());
    }

    /**
     * Compiler-generated accessor for the default value of the first parameter of
     * {@code testLastFetchedOffsetStoppedMirrorTopicDescription}.
     *
     * @return {@code true}
     */
    private boolean testLastFetchedOffsetStoppedMirrorTopicDescription$default$1() {
        final boolean defaultValue = true;
        return defaultValue;
    }

    /**
     * Verifies that a stopped mirror topic stays usable across controller changes.
     *
     * <p>Flow: mirror a single-partition topic and confirm it is ACTIVE before and after a
     * controller change; unlink it and confirm STOPPED; delete the cluster link; produce one
     * record to the destination topic; move the controller back to the broker that held it
     * originally; shut down every broker other than the controller that has a log and an
     * online partition for the topic (at least one must be shut down); finally produce one
     * more record.
     *
     * @param str first value from the {@code zkCombinations} method source; not read by the test body
     * @param z   second value from the method source; not read by the test body
     */
    @MethodSource({"zkCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testStoppedMirrorTopicsWithControllerChange(String str, boolean z) {
        numPartitions_$eq(1);
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        createClusterLink(linkName(), destLinkProps(destLinkProps$default$1()), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        destCluster().waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.ACTIVE, topic(), numPartitions());
        ClusterLinkTestHarness destBeforeChange = destCluster();
        Assertions.assertEquals(MirrorTopicDescription.State.ACTIVE, destBeforeChange.describeMirrorTopic(topic(), destBeforeChange.describeMirrorTopic$default$2()).state());

        // Remember the original controller so it can be restored later.
        int controllerId = destCluster().controllerId();
        destCluster().changeController();
        ClusterLinkTestHarness destAfterChange = destCluster();
        Assertions.assertEquals(MirrorTopicDescription.State.ACTIVE, destAfterChange.describeMirrorTopic(topic(), destAfterChange.describeMirrorTopic$default$2()).state());

        destCluster().unlinkTopic(topic(), linkName(), true, false, true, numPartitions());
        ClusterLinkTestHarness destAfterUnlink = destCluster();
        Assertions.assertEquals(MirrorTopicDescription.State.STOPPED, destAfterUnlink.describeMirrorTopic(topic(), destAfterUnlink.describeMirrorTopic$default$2()).state());
        ClusterLinkTestHarness destForDelete = destCluster();
        destForDelete.deleteClusterLink(linkName(), destForDelete.deleteClusterLink$default$2(), destForDelete.deleteClusterLink$default$3());

        ClusterLinkTestHarness destForProducer = destCluster();
        // try-with-resources: the producer is closed even when an assertion below fails.
        try (KafkaProducer<byte[], byte[]> producer = destForProducer.createProducer(destForProducer.createProducer$default$1(), destForProducer.createProducer$default$2(), destForProducer.createProducer$default$3())) {
            produceRecords(producer, topic(), 1, produceRecords$default$4(), produceRecords$default$5(), produceRecords$default$6());
            destCluster().changeControllerToSpecificBroker(controllerId);

            // Ids of brokers that have a log and an online partition for the topic partition.
            TopicPartition topicPartition = (TopicPartition) partitions(partitions$default$1(), partitions$default$2(), partitions$default$3()).head();
            Buffer buffer = (Buffer) ((IterableOps) ((Buffer) ((IterableOps) destCluster().brokers().map(kafkaBroker -> {
                return kafkaBroker.replicaManager();
            })).filter(replicaManager -> {
                return BoxesRunTime.boxToBoolean($anonfun$testStoppedMirrorTopicsWithControllerChange$2(topicPartition, replicaManager));
            })).filter(replicaManager2 -> {
                return BoxesRunTime.boxToBoolean($anonfun$testStoppedMirrorTopicsWithControllerChange$3(topicPartition, replicaManager2));
            })).map(replicaManager3 -> {
                return BoxesRunTime.boxToInteger($anonfun$testStoppedMirrorTopicsWithControllerChange$4(replicaManager3));
            });

            // Shut down every such broker except the controller, counting how many were stopped.
            int shutdownCount = 0;
            scala.collection.Iterator<?> replicaIds = buffer.iterator();
            while (replicaIds.hasNext()) {
                int brokerId = BoxesRunTime.unboxToInt(replicaIds.next());
                if (brokerId != controllerId) {
                    destCluster().shutdownBroker(brokerId);
                    shutdownCount++;
                }
            }
            // Value comparison: assertNotSame on boxed integers would compare object identity.
            Assertions.assertNotEquals(0, shutdownCount, "No broker is shutdown with replicaIds " + buffer + " and controllerId " + controllerId);

            produceRecords(producer, topic(), 1, produceRecords$default$4(), produceRecords$default$5(), produceRecords$default$6());
        }
    }

    /**
     * Recreates the mirror topic: deletes it on the destination cluster and links it again
     * using the harness defaults for the remaining {@code linkTopic} arguments.
     */
    private final void restartMirrorTopic$1() {
        destCluster().deleteTopic(topic(), true);

        final ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(
                topic(),
                replicationFactor(),
                linkName(),
                dest.linkTopic$default$4(),
                dest.linkTopic$default$5());
    }

    /**
     * Recreates the mirror topic: deletes it on the destination cluster and links it again
     * using the harness defaults for the remaining {@code linkTopic} arguments.
     */
    private final void restartMirrorTopic$2() {
        destCluster().deleteTopic(topic(), true);

        final ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(
                topic(),
                replicationFactor(),
                linkName(),
                dest.linkTopic$default$4(),
                dest.linkTopic$default$5());
    }

    /**
     * Assertion body: describing the mirror topic with the second argument {@code true} must
     * yield exactly one state-transition error equal to the expected INTERNAL_ERROR, while
     * describing it with the default second argument must yield none.
     *
     * @param clusterLinkDRIntegrationTest the test instance whose destination cluster is queried
     */
    public static final /* synthetic */ void $anonfun$testStopMirrorTaskStateDuringError$1(ClusterLinkDRIntegrationTest clusterLinkDRIntegrationTest) {
        // NOTE(review): `true` presumably asks for state-transition errors to be included - confirm.
        final MirrorTopicDescription detailed = clusterLinkDRIntegrationTest.destCluster().describeMirrorTopic(clusterLinkDRIntegrationTest.topic(), true);
        Assertions.assertNotNull(detailed);
        final List transitionErrors = detailed.mirrorStateTransitionErrors();
        Assertions.assertEquals(1, transitionErrors.size());
        final ClusterLinkTaskError expectedError = new ClusterLinkTaskError(
                ClusterLinkTaskError.ClusterLinkTaskErrorCode.INTERNAL_ERROR,
                "Failed to describe topic configs for an unknown reason.");
        Assertions.assertEquals(expectedError, transitionErrors.get(0));

        final ClusterLinkTestHarness dest = clusterLinkDRIntegrationTest.destCluster();
        final MirrorTopicDescription byDefault = dest.describeMirrorTopic(clusterLinkDRIntegrationTest.topic(), dest.describeMirrorTopic$default$2());
        Assertions.assertNotNull(byDefault);
        Assertions.assertEquals(0, byDefault.mirrorStateTransitionErrors().size());
    }

    /**
     * Assertion body: describing the mirror topic with the second argument {@code true} must
     * return a non-null description that carries no state-transition errors.
     *
     * @param clusterLinkDRIntegrationTest the test instance whose destination cluster is queried
     */
    public static final /* synthetic */ void $anonfun$testStopMirrorTaskStateDuringError$2(ClusterLinkDRIntegrationTest clusterLinkDRIntegrationTest) {
        final MirrorTopicDescription detailed = clusterLinkDRIntegrationTest.destCluster().describeMirrorTopic(clusterLinkDRIntegrationTest.topic(), true);
        Assertions.assertNotNull(detailed);
        final int errorCount = detailed.mirrorStateTransitionErrors().size();
        Assertions.assertEquals(0, errorCount);
    }

    /**
     * Polling predicate: reports whether the destination leader epoch of partition 0 of the
     * mirror topic has reached the given minimum.
     *
     * <p>The mirror topic is addressed as {@code clusterLinkPrefix() + topic()}, matching how
     * the calling test links and queries it on the destination cluster; with an empty prefix
     * this is the plain topic name.
     *
     * @param clusterLinkDRIntegrationTest the test instance whose destination cluster is queried
     * @param i                            minimum leader epoch to wait for
     * @return {@code true} once the leader epoch is at least {@code i}
     */
    public static final /* synthetic */ boolean $anonfun$testMirrorFailoverWhenSourceIsUnavailable$1(ClusterLinkDRIntegrationTest clusterLinkDRIntegrationTest, int i) {
        return clusterLinkDRIntegrationTest.destCluster().leaderEpoch(new TopicPartition(clusterLinkDRIntegrationTest.clusterLinkPrefix() + clusterLinkDRIntegrationTest.topic(), 0)) >= i;
    }

    /**
     * Supplies the failure message for the leader-epoch wait.
     *
     * @return the fixed message text
     */
    public static final /* synthetic */ String $anonfun$testMirrorFailoverWhenSourceIsUnavailable$2() {
        final String timeoutMessage = "Destination leader epoch not updated";
        return timeoutMessage;
    }

    /**
     * Polling predicate: reports whether the destination leader epoch of partition 0 of the
     * prefixed mirror topic has reached the given minimum.
     *
     * @param clusterLinkDRIntegrationTest the test instance whose destination cluster is queried
     * @param i                            minimum leader epoch to wait for
     * @return {@code true} once the leader epoch is at least {@code i}
     */
    public static final /* synthetic */ boolean $anonfun$testLastFetchedOffsetStoppedMirrorTopicDescription$1(ClusterLinkDRIntegrationTest clusterLinkDRIntegrationTest, int i) {
        final ClusterLinkTestHarness dest = clusterLinkDRIntegrationTest.destCluster();
        final String mirrorTopic = clusterLinkDRIntegrationTest.clusterLinkPrefix() + clusterLinkDRIntegrationTest.topic();
        return i <= dest.leaderEpoch(new TopicPartition(mirrorTopic, 0));
    }

    /**
     * Supplies the failure message for the leader-epoch wait.
     *
     * @return the fixed message text
     */
    public static final /* synthetic */ String $anonfun$testLastFetchedOffsetStoppedMirrorTopicDescription$2() {
        final String timeoutMessage = "Destination leader epoch not updated";
        return timeoutMessage;
    }

    /**
     * Filter predicate: accepts a replica status that is the leader and has no link name.
     *
     * @param replicaStatus the replica status to test
     * @return {@code true} for a leader replica whose {@code linkName()} is absent
     */
    public static final /* synthetic */ boolean $anonfun$testLastFetchedOffsetStoppedMirrorTopicDescription$3(ReplicaStatus replicaStatus) {
        if (!replicaStatus.isLeader()) {
            return false;
        }
        return !replicaStatus.linkName().isPresent();
    }

    /**
     * Filter predicate: accepts a replica status that is the leader and has no link name.
     *
     * @param replicaStatus the replica status to test
     * @return {@code true} for a leader replica whose {@code linkName()} is absent
     */
    public static final /* synthetic */ boolean $anonfun$testLastFetchedOffsetStoppedMirrorTopicDescription$4(ReplicaStatus replicaStatus) {
        if (!replicaStatus.isLeader()) {
            return false;
        }
        return !replicaStatus.linkName().isPresent();
    }

    /**
     * Filter predicate: accepts a replica manager that has a log for the partition.
     *
     * @param topicPartition the partition to look up
     * @param replicaManager the broker's replica manager
     * @return {@code true} when {@code getLog} yields a non-empty result
     */
    public static final /* synthetic */ boolean $anonfun$testStoppedMirrorTopicsWithControllerChange$2(TopicPartition topicPartition, ReplicaManager replicaManager) {
        // Option.nonEmpty is the negation of Option.isEmpty.
        return !replicaManager.getLog(topicPartition).isEmpty();
    }

    /**
     * Filter predicate: accepts a replica manager that has an online partition for the topic partition.
     *
     * @param topicPartition the partition to look up
     * @param replicaManager the broker's replica manager
     * @return {@code true} when {@code onlinePartition} yields a non-empty result
     */
    public static final /* synthetic */ boolean $anonfun$testStoppedMirrorTopicsWithControllerChange$3(TopicPartition topicPartition, ReplicaManager replicaManager) {
        // Option.nonEmpty is the negation of Option.isEmpty.
        return !replicaManager.onlinePartition(topicPartition).isEmpty();
    }

    /**
     * Mapping function: extracts the broker id from a replica manager's configuration.
     *
     * @param replicaManager the broker's replica manager
     * @return the value of {@code config().brokerId()}
     */
    public static final /* synthetic */ int $anonfun$testStoppedMirrorTopicsWithControllerChange$4(ReplicaManager replicaManager) {
        final int brokerId = replicaManager.config().brokerId();
        return brokerId;
    }
}
