/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import io.confluent.kafka.link.ClusterLinkConfig;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import kafka.server.ClusterLinkRequestQuota;
import kafka.server.UnboundedClusterLinkRequestQuota$;
import kafka.server.link.AuthorizationTaskErrorCode$;
import kafka.server.link.ClusterLinkAlterConfigPolicy;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkDestClientManager;
import kafka.server.link.ClusterLinkLocalAdmin;
import kafka.server.link.ClusterLinkManager;
import kafka.server.link.ClusterLinkMetadataManager;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.ClusterLinkScheduler;
import kafka.server.link.ClusterLinkSyncTopicsConfigs;
import kafka.server.link.ConnectionMode;
import kafka.server.link.InternalTaskErrorCode$;
import kafka.server.link.MirrorTopicConfigSyncRules;
import kafka.server.link.PolicyViolationTaskErrorCode$;
import kafka.server.link.TaskDescription;
import kafka.server.link.TaskErrorCode;
import kafka.server.link.TaskErrorCodeAndMsg;
import kafka.server.link.UnknownTopicOrPartitionErrorCode$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.util.MockTime;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import scala.;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Map;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

@ScalaSignature(bytes="\u0006\u0005\t\u0005d\u0001B\u0017/\u0001UBQ\u0001\u0010\u0001\u0005\u0002uBq\u0001\u0011\u0001C\u0002\u0013%\u0011\t\u0003\u0004F\u0001\u0001\u0006IA\u0011\u0005\b\r\u0002\u0011\r\u0011\"\u0003H\u0011\u0019)\u0006\u0001)A\u0005\u0011\"9a\u000b\u0001b\u0001\n\u00139\u0006BB.\u0001A\u0003%\u0001\fC\u0004]\u0001\t\u0007I\u0011B/\t\r\u0005\u0004\u0001\u0015!\u0003_\u0011\u001d\u0011\u0007A1A\u0005\n\rDaa\u001a\u0001!\u0002\u0013!\u0007b\u00025\u0001\u0005\u0004%I!\u001b\u0005\u0007[\u0002\u0001\u000b\u0011\u00026\t\u000f9\u0004!\u0019!C\u0005_\"11\u000f\u0001Q\u0001\nADq\u0001\u001e\u0001C\u0002\u0013%Q\u000f\u0003\u0004z\u0001\u0001\u0006IA\u001e\u0005\bu\u0002\u0011\r\u0011\"\u0003|\u0011\u001d\tI\u0001\u0001Q\u0001\nqD\u0011\"a\u0003\u0001\u0005\u0004%I!!\u0004\t\u0011\u0005]\u0001\u0001)A\u0005\u0003\u001fAq!!\u0007\u0001\t\u0003\tY\u0002C\u0004\u0002:\u0001!\t!a\u0007\t\u000f\u0005\r\u0003\u0001\"\u0001\u0002\u001c!9\u0011Q\n\u0001\u0005\u0002\u0005m\u0001bBA)\u0001\u0011\u0005\u00111\u0004\u0005\b\u0003+\u0002A\u0011AA\u000e\u0011\u001d\tI\u0006\u0001C\u0001\u00037Aq!!\u0018\u0001\t\u0003\tY\u0002C\u0004\u0002b\u0001!\t!a\u0007\t\u000f\u0005\u0015\u0004\u0001\"\u0001\u0002\u001c!9\u0011\u0011\u000e\u0001\u0005\u0002\u0005m\u0001bBA7\u0001\u0011\u0005\u00111\u0004\u0005\b\u0003c\u0002A\u0011AA\u000e\u0011\u001d\t)\b\u0001C\u0001\u00037Aq!!\u001f\u0001\t\u0003\tY\u0002C\u0004\u0002~\u0001!I!a 
\t\u000f\u0005\u001d\u0006\u0001\"\u0003\u0002*\"9\u0011Q\u001b\u0001\u0005\n\u0005]\u0007\"\u0003B\u0003\u0001E\u0005I\u0011\u0002B\u0004\u0011\u001d\u0011i\u0002\u0001C\u0005\u0005?AqAa\u000e\u0001\t\u0013\u0011I\u0004C\u0005\u0003V\u0001\t\n\u0011\"\u0003\u0003X!I!1\f\u0001\u0012\u0002\u0013%!Q\f\u0002!\u00072,8\u000f^3s\u0019&t7nU=oGR{\u0007/[2t\u0007>tg-[4t)\u0016\u001cHO\u0003\u00020a\u0005!A.\u001b8l\u0015\t\t$'\u0001\u0004tKJ4XM\u001d\u0006\u0002g\u0005)1.\u00194lC\u000e\u00011C\u0001\u00017!\t9$(D\u00019\u0015\u0005I\u0014!B:dC2\f\u0017BA\u001e9\u0005\u0019\te.\u001f*fM\u00061A(\u001b8jiz\"\u0012A\u0010\t\u0003\u007f\u0001i\u0011AL\u0001\ng\u000eDW\rZ;mKJ,\u0012A\u0011\t\u0003\u007f\rK!\u0001\u0012\u0018\u0003)\rcWo\u001d;fe2Kgn[*dQ\u0016$W\u000f\\3s\u0003)\u00198\r[3ek2,'\u000fI\u0001\fg>,(oY3BI6Lg.F\u0001I!\tI5+D\u0001K\u0015\tYE*A\u0003bI6LgN\u0003\u0002N\u001d\u000691\r\\5f]R\u001c(BA\u001aP\u0015\t\u0001\u0016+\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002%\u0006\u0019qN]4\n\u0005QS%AD\"p]\u001adW/\u001a8u\u0003\u0012l\u0017N\\\u0001\rg>,(oY3BI6Lg\u000eI\u0001\u001cY>\u001c\u0017\r\\\"mkN$XM\u001d'j].\fE-\\5o\u00072LWM\u001c;\u0016\u0003a\u0003\"aP-\n\u0005is#!F\"mkN$XM\u001d'j].dunY1m\u0003\u0012l\u0017N\\\u0001\u001dY>\u001c\u0017\r\\\"mkN$XM\u001d'j].\fE-\\5o\u00072LWM\u001c;!\u0003=iW\r^1eCR\fW*\u00198bO\u0016\u0014X#\u00010\u0011\u0005}z\u0016B\u00011/\u0005i\u0019E.^:uKJd\u0015N\\6NKR\fG-\u0019;b\u001b\u0006t\u0017mZ3s\u0003AiW\r^1eCR\fW*\u00198bO\u0016\u0014\b%A\u0007dY&,g\u000e^'b]\u0006<WM]\u000b\u0002IB\u0011q(Z\u0005\u0003M:\u0012Ad\u00117vgR,'\u000fT5oW\u0012+7\u000f^\"mS\u0016tG/T1oC\u001e,'/\u0001\bdY&,g\u000e^'b]\u0006<WM\u001d\u0011\u0002\u000f5,GO]5dgV\t!\u000e\u0005\u0002@W&\u0011AN\f\u0002\u0013\u00072,8\u000f^3s\u0019&t7.T3ue&\u001c7/\u0001\u0005nKR\u0014\u0018nY:!\u0003)a\u0017N\\6D_:4\u0017nZ\u000b\u0002aB\u0011q(]\u0005\u0003e:\u0012\u0011c\u00117vgR,'\u000fT5oW\u000e{gNZ5h\u0003-a\u0017N\\6D_:4\u0017n\u001a\u0011\u0002)Q|\u
0007/[2D_:4\u0017nZ*z]\u000e\u0014V\u000f\\3t+\u00051\bCA x\u0013\tAhF\u0001\u000eNSJ\u0014xN\u001d+pa&\u001c7i\u001c8gS\u001e\u001c\u0016P\\2Sk2,7/A\u000bu_BL7mQ8oM&<7+\u001f8d%VdWm\u001d\u0011\u0002\tQLW.Z\u000b\u0002yB\u0019Q0!\u0002\u000e\u0003yT1a`A\u0001\u0003\u0015)H/\u001b7t\u0015\r\t\u0019AT\u0001\u0007G>lWn\u001c8\n\u0007\u0005\u001daP\u0001\u0003US6,\u0017!\u0002;j[\u0016\u0004\u0013!B9v_R\fWCAA\b!\u0011\t\t\"a\u0005\u000e\u0003AJ1!!\u00061\u0005]\u0019E.^:uKJd\u0015N\\6SKF,Xm\u001d;Rk>$\u0018-\u0001\u0004rk>$\u0018\rI\u0001\u0006g\u0016$X\u000b\u001d\u000b\u0003\u0003;\u00012aNA\u0010\u0013\r\t\t\u0003\u000f\u0002\u0005+:LG\u000fK\u0002\u0017\u0003K\u0001B!a\n\u000265\u0011\u0011\u0011\u0006\u0006\u0005\u0003W\ti#A\u0002ba&TA!a\f\u00022\u00059!.\u001e9ji\u0016\u0014(bAA\u001a#\u0006)!.\u001e8ji&!\u0011qGA\u0015\u0005)\u0011UMZ8sK\u0016\u000b7\r[\u0001\ti\u0016\f'\u000fR8x]\"\u001aq#!\u0010\u0011\t\u0005\u001d\u0012qH\u0005\u0005\u0003\u0003\nICA\u0005BMR,'/R1dQ\u0006\tB/Z:u+B$\u0017\r^3D_:4\u0017nZ:)\u0007a\t9\u0005\u0005\u0003\u0002(\u0005%\u0013\u0002BA&\u0003S\u0011A\u0001V3ti\u0006\u0019B/Z:u\u001d>,\u0006\u000fZ1uK\u000e{gNZ5hg\"\u001a\u0011$a\u0012\u00029Q,7\u000f^#yG\u0016\u0004H/[8o\r\u0016$8\r[5oO\u000e{gNZ5hg\"\u001a!$a\u0012\u0002gQ,7\u000f^!vi\"|'/\u001b>bi&|g.\u0012=dKB$\u0018n\u001c8Ge>lG)Z:de&\u0014WmQ8oM&<7OR;ukJ,\u0007fA\u000e\u0002H\u0005iD/Z:u+:\\gn\\<o)>\u0004\u0018nY(s!\u0006\u0014H/\u001b;j_:,\u0005pY3qi&|gN\u0012:p[\u0012+7o\u0019:jE\u0016\u001cuN\u001c4jON4U\u000f^;sK\"\u001aA$a\u0012\u0002aQ,7\u000f^!vi\"|'/\u001b>bi&|g.\u0012=dKB$\u0018n\u001c8Ge>l\u0017\t\u001c;fe\u000e{gNZ5hg\u001a+H/\u001e:fQ\ri\u0012qI\u0001\u001di\u0016\u001cH/\u0012=dKB$\u0018n\u001c8BYR,'/\u001b8h\u0007>tg-[4tQ\rq\u0012qI\u0001\u001bi\u0016\u001cH/\u0012=dKB$\u0018n\u001c8D_:4\u0017nZ:SKN,H\u000e\u001e\u0015\u0004?\u0005\u001d\u0013a\b;fgR,\u0005pY3qi&|g.\u00117uKJ\u001cuN\u001c4jON\u0014Vm];mi\"\u001a\u0001%a\u0012\u0002!Q,7\u000f^\"iC:<W\rV8qS\u000e\u00
1c\bfA\u0011\u0002H\u0005\u0001C/Z:u\u00072,8\u000f^3s\u0019&t7.\u00117uKJ\u001cuN\u001c4jOB{G.[2zQ\r\u0011\u0013qI\u0001\u001fi\u0016\u001cH/\u00117uKJ\u001cuN\u001c4jOB{G.[2z-&|G.\u0019;j_:D3aIA$\u0003A!Xm\u001d;Fq\u000e,7o]5wK2{w\rK\u0002%\u0003\u000f\n\u0011C\\3x\u0007>tg-[4SKN|WO]2f)\u0011\t\t)!$\u0011\t\u0005\r\u0015\u0011R\u0007\u0003\u0003\u000bSA!a\"\u0002\u0002\u000511m\u001c8gS\u001eLA!a#\u0002\u0006\nq1i\u001c8gS\u001e\u0014Vm]8ve\u000e,\u0007bBAHK\u0001\u0007\u0011\u0011S\u0001\u0006i>\u0004\u0018n\u0019\t\u0005\u0003'\u000b\tK\u0004\u0003\u0002\u0016\u0006u\u0005cAALq5\u0011\u0011\u0011\u0014\u0006\u0004\u00037#\u0014A\u0002\u001fs_>$h(C\u0002\u0002 b\na\u0001\u0015:fI\u00164\u0017\u0002BAR\u0003K\u0013aa\u0015;sS:<'bAAPq\u0005)\u0012\r\u001c;fe\u000e{gNZ5h%\u0016\fX/Z:u\u001b\u0006\u0004H\u0003BAV\u0003\u000f\u0004\u0002\"!,\u00028\u0006\u0005\u00151X\u0007\u0003\u0003_SA!!-\u00024\u0006!Q\u000f^5m\u0015\t\t),\u0001\u0003kCZ\f\u0017\u0002BA]\u0003_\u00131!T1q!\u0019\ti+!0\u0002B&!\u0011qXAX\u0005)\u0019u\u000e\u001c7fGRLwN\u001c\t\u0004\u0013\u0006\r\u0017bAAc\u0015\ni\u0011\t\u001c;fe\u000e{gNZ5h\u001fBDq!!3'\u0001\u0004\tY-A\u0005d_:4\u0017nZ'baBA\u00111SAg\u0003#\u000by-\u0003\u0003\u0002:\u0006\u0015\u0006\u0003BAW\u0003#LA!a5\u00020\nQ\u0001K]8qKJ$\u0018.Z:\u0002-5|7m[!mi\u0016\u00148i\u001c8gS\u001e\u001c(+Z:vYR$b!!7\u0002`\u0006U\bcA%\u0002\\&\u0019\u0011Q\u001c&\u0003%\u0005cG/\u001a:D_:4\u0017nZ:SKN,H\u000e\u001e\u0005\b\u0003C<\u0003\u0019AAr\u0003\u0019!x\u000e]5dgB1\u0011Q]Ax\u0003#sA!a:\u0002l:!\u0011qSAu\u0013\u0005I\u0014bAAwq\u00059\u0001/Y2lC\u001e,\u0017\u0002BAy\u0003g\u0014A\u0001T5ti*\u0019\u0011Q\u001e\u001d\t\u0013\u0005]x\u0005%AA\u0002\u0005e\u0018!C3yG\u0016\u0004H/[8o!\u00159\u00141`A\u0000\u0013\r\ti\u0010\u000f\u0002\u0007\u001fB$\u0018n\u001c8\u0011\t\u0005\u0015(\u0011A\u0005\u0005\u0005\u0007\t\u0019PA\u0005UQJ|w/\u00192mK\u0006\u0001Sn\\2l\u00032$XM]\"p]\u001aLwm\u001d*fgVdG\u000f\n3fM\u0006,H\u000e\u001e\u00133+\t\u00
11IA\u000b\u0003\u0002z\n-1F\u0001B\u0007!\u0011\u0011yA!\u0007\u000e\u0005\tE!\u0002\u0002B\n\u0005+\t\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\t]\u0001(\u0001\u0006b]:|G/\u0019;j_:LAAa\u0007\u0003\u0012\t\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3\u0002?9,wo\u00117vgR,'\u000fT5oWNKhn\u0019+pa&\u001c7oQ8oM&<7\u000f\u0006\u0003\u0003\"\t\u001d\u0002cA \u0003$%\u0019!Q\u0005\u0018\u00039\rcWo\u001d;fe2Kgn[*z]\u000e$v\u000e]5dg\u000e{gNZ5hg\"9!\u0011F\u0015A\u0002\t-\u0012\u0001\u00037j].$\u0015\r^1\u0011\t\t5\"1G\u0007\u0003\u0005_Q1A!\r3\u0003\tQ8.\u0003\u0003\u00036\t=\"aD\"mkN$XM\u001d'j].$\u0015\r^1\u0002\u0013M,G/\u001e9N_\u000e\\GC\u0002B\u0016\u0005w\u0011y\u0005C\u0005\u0003>)\u0002\n\u00111\u0001\u0003@\u0005\t\u0012\r\u001c;fe\u000e{gNZ5h!>d\u0017nY=\u0011\u000b]\nYP!\u0011\u0011\t\t\r#1J\u0007\u0003\u0005\u000bRAAa\u0012\u0003J\u00051\u0001o\u001c7jGfT!!\r(\n\t\t5#Q\t\u0002\u0012\u00032$XM]\"p]\u001aLw\rU8mS\u000eL\b\"\u0003B)UA\u0005\t\u0019\u0001B*\u00031!XM\\1oiB\u0013XMZ5y!\u00159\u00141`AI\u0003M\u0019X\r^;q\u001b>\u001c7\u000e\n3fM\u0006,H\u000e\u001e\u00132+\t\u0011IF\u000b\u0003\u0003@\t-\u0011aE:fiV\u0004Xj\\2lI\u0011,g-Y;mi\u0012\u0012TC\u0001B0U\u0011\u0011\u0019Fa\u0003")
public class ClusterLinkSyncTopicsConfigsTest {
    // Real scheduler instance; started in setUp() and shut down in tearDown().
    private final ClusterLinkScheduler scheduler = new ClusterLinkScheduler(0, 100);
    // Mockito mock; the tests stub and verify describeConfigs(...) on it.
    private final ConfluentAdmin sourceAdmin = (ConfluentAdmin)Mockito.mock(ConfluentAdmin.class);
    // Mockito mock; the tests stub and verify incrementalAlterConfigs(...) on it.
    private final ClusterLinkLocalAdmin localClusterLinkAdminClient = (ClusterLinkLocalAdmin)Mockito.mock(ClusterLinkLocalAdmin.class);
    // Mockito mock; the tests stub and verify getTopicConfig(topic) on it.
    private final ClusterLinkMetadataManager metadataManager = (ClusterLinkMetadataManager)Mockito.mock(ClusterLinkMetadataManager.class);
    // Mockito mock; the tests stub and verify getTopics() and topicConfigSyncRules() on it.
    private final ClusterLinkDestClientManager clientManager = (ClusterLinkDestClientManager)Mockito.mock(ClusterLinkDestClientManager.class);
    // Real metrics object for a link named "test-link" with a random link id and a fresh Metrics registry;
    // started in setUp() and shut down in tearDown().
    private final ClusterLinkMetrics metrics = new ClusterLinkMetrics("test-link", Uuid.randomUuid(), ClusterLinkConfig.LinkMode.DESTINATION, (ConnectionMode)ConnectionMode.Outbound$.MODULE$, (ConnectionMode)ConnectionMode.Inbound$.MODULE$, false, (ClusterLinkManager)Mockito.mock(ClusterLinkManager.class), (Option)None$.MODULE$, new Metrics(), (Option)None$.MODULE$, false);
    // Link config built from an anonymous Properties subclass whose instance initializer
    // sets only "bootstrap.servers".
    private final ClusterLinkConfig linkConfig = ClusterLinkConfig$.MODULE$.create((java.util.Map)new Properties(null){
        {
            this.put("bootstrap.servers", "localhost:2345");
        }
    }, (Option)None$.MODULE$, true);
    // Sync rules taken from linkConfig above; must be initialized after linkConfig.
    private final MirrorTopicConfigSyncRules topicConfigSyncRules = this.linkConfig().topicConfigSyncRules();
    // Mock clock rather than the system clock.
    private final Time time = new MockTime();
    // The UnboundedClusterLinkRequestQuota singleton.
    private final ClusterLinkRequestQuota quota = UnboundedClusterLinkRequestQuota$.MODULE$;

    /** Accessor for the {@code scheduler} field. */
    private ClusterLinkScheduler scheduler() {
        return scheduler;
    }

    /** Accessor for the mocked {@code sourceAdmin} field. */
    private ConfluentAdmin sourceAdmin() {
        return sourceAdmin;
    }

    /** Accessor for the mocked {@code localClusterLinkAdminClient} field. */
    private ClusterLinkLocalAdmin localClusterLinkAdminClient() {
        return localClusterLinkAdminClient;
    }

    /** Accessor for the mocked {@code metadataManager} field. */
    private ClusterLinkMetadataManager metadataManager() {
        return metadataManager;
    }

    /** Accessor for the mocked {@code clientManager} field. */
    private ClusterLinkDestClientManager clientManager() {
        return clientManager;
    }

    /** Accessor for the {@code metrics} field. */
    private ClusterLinkMetrics metrics() {
        return metrics;
    }

    /** Accessor for the {@code linkConfig} field. */
    private ClusterLinkConfig linkConfig() {
        return linkConfig;
    }

    /** Accessor for the {@code topicConfigSyncRules} field. */
    private MirrorTopicConfigSyncRules topicConfigSyncRules() {
        return topicConfigSyncRules;
    }

    /** Accessor for the {@code time} field. */
    private Time time() {
        return time;
    }

    /** Accessor for the {@code quota} field. */
    private ClusterLinkRequestQuota quota() {
        return quota;
    }

    /** Starts the scheduler first, then the link metrics, before each test. */
    @BeforeEach
    public void setUp() {
        ClusterLinkScheduler taskScheduler = this.scheduler();
        taskScheduler.startup();
        ClusterLinkMetrics linkMetrics = this.metrics();
        linkMetrics.startup();
    }

    /**
     * Shuts down the scheduler and then the link metrics after each test.
     *
     * <p>The metrics shutdown sits in a {@code finally} block so that it still runs
     * when the scheduler shutdown throws; the scheduler's exception then propagates
     * as before.
     */
    @AfterEach
    public void tearDown() {
        try {
            this.scheduler().shutdown();
        } finally {
            this.metrics().shutdown();
        }
    }

    @Test
    public void testUpdateConfigs() {
        ClusterLinkData linkData = this.setupMock((Option<AlterConfigPolicy>)None$.MODULE$, (Option<String>)None$.MODULE$);
        String topic = "test-topic";
        Mockito.when((Object)this.clientManager().getTopics()).thenReturn(Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{topic})));
        Mockito.when((Object)this.clientManager().topicConfigSyncRules()).thenReturn((Object)this.topicConfigSyncRules());
        KafkaFutureImpl config = new KafkaFutureImpl();
        config.complete((Object)new Config(CollectionConverters$.MODULE$.IterableHasAsJava((Iterable)new .colon.colon((Object)new ConfigEntry("cleanup.policy", "compact"), (List)Nil$.MODULE$)).asJavaCollection()));
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        Set<ConfigResource> describeConfigsArg = Collections.singleton(resource);
        DescribeConfigsResult describeConfigsResult = new DescribeConfigsResult(Collections.unmodifiableMap(CollectionConverters$.MODULE$.MapHasAsJava((Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)resource), (Object)config)}))).asJava()));
        Mockito.when((Object)this.sourceAdmin().describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg))).thenReturn((Object)describeConfigsResult);
        Properties curProps = new Properties();
        curProps.put("cleanup.policy", "delete");
        curProps.put("unclean.leader.election.enable", "true");
        Mockito.when((Object)this.metadataManager().getTopicConfig(topic)).thenReturn((Object)curProps);
        Properties newProps = new Properties();
        newProps.put("cleanup.policy", "compact");
        newProps.put("unclean.leader.election.enable", "true");
        AlterConfigsResult alterConfigsResult = this.mockAlterConfigsResult((List<String>)new .colon.colon((Object)topic, (List)Nil$.MODULE$), (Option<Throwable>)None$.MODULE$);
        Mockito.when((Object)this.localClusterLinkAdminClient().incrementalAlterConfigs(this.alterConfigRequestMap((scala.collection.immutable.Map<String, Properties>)((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topic), (Object)newProps)})))))).thenReturn((Object)alterConfigsResult);
        ClusterLinkScheduler.TaskResult result = (ClusterLinkScheduler.TaskResult)this.newClusterLinkSyncTopicsConfigs(linkData).runOnce().get(5L, TimeUnit.SECONDS);
        Assertions.assertTrue((boolean)result.completed());
        Assertions.assertTrue((boolean)result.errs().isEmpty());
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager())).getTopics();
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager())).topicConfigSyncRules();
        ((Admin)Mockito.verify((Object)this.sourceAdmin())).describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg));
        ((ClusterLinkMetadataManager)Mockito.verify((Object)this.metadataManager())).getTopicConfig(topic);
        ((Admin)Mockito.verify((Object)this.localClusterLinkAdminClient())).incrementalAlterConfigs(this.alterConfigRequestMap((scala.collection.immutable.Map<String, Properties>)((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topic), (Object)newProps)})))));
        newProps.remove("unclean.leader.election.enable");
    }

    @Test
    public void testNoUpdateConfigs() {
        ClusterLinkData linkData = this.setupMock((Option<AlterConfigPolicy>)None$.MODULE$, (Option<String>)None$.MODULE$);
        String topic = "test-topic";
        Mockito.when((Object)this.clientManager().getTopics()).thenReturn(Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{topic})));
        Mockito.when((Object)this.clientManager().topicConfigSyncRules()).thenReturn((Object)this.topicConfigSyncRules());
        KafkaFutureImpl config = new KafkaFutureImpl();
        config.complete((Object)new Config(CollectionConverters$.MODULE$.IterableHasAsJava((Iterable)new .colon.colon((Object)new ConfigEntry("cleanup.policy", "compact"), (List)Nil$.MODULE$)).asJavaCollection()));
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        Set<ConfigResource> describeConfigsArg = Collections.singleton(resource);
        DescribeConfigsResult describeConfigsResult = new DescribeConfigsResult(Collections.unmodifiableMap(CollectionConverters$.MODULE$.MapHasAsJava((Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)resource), (Object)config)}))).asJava()));
        Mockito.when((Object)this.sourceAdmin().describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg))).thenReturn((Object)describeConfigsResult);
        Properties curProps = new Properties();
        curProps.put("cleanup.policy", "compact");
        Mockito.when((Object)this.metadataManager().getTopicConfig(topic)).thenReturn((Object)curProps);
        ClusterLinkScheduler.TaskResult result = (ClusterLinkScheduler.TaskResult)this.newClusterLinkSyncTopicsConfigs(linkData).runOnce().get(5L, TimeUnit.SECONDS);
        Assertions.assertTrue((boolean)result.completed());
        Assertions.assertTrue((boolean)result.errs().isEmpty());
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager())).getTopics();
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager())).topicConfigSyncRules();
        ((Admin)Mockito.verify((Object)this.sourceAdmin())).describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg));
        ((ClusterLinkMetadataManager)Mockito.verify((Object)this.metadataManager())).getTopicConfig(topic);
    }

    @Test
    public void testExceptionFetchingConfigs() {
        ClusterLinkData linkData = this.setupMock((Option<AlterConfigPolicy>)None$.MODULE$, (Option<String>)None$.MODULE$);
        String topic = "test-topic";
        Mockito.when((Object)this.clientManager().getTopics()).thenReturn(Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{topic})));
        Set<ConfigResource> describeConfigsArg = Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, topic));
        Mockito.when((Object)this.sourceAdmin().describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg))).thenThrow(new Throwable[]{new IllegalStateException("")});
        ClusterLinkSyncTopicsConfigs syncTopicsConfigs = this.newClusterLinkSyncTopicsConfigs(linkData);
        Assertions.assertTrue((boolean)(((ExecutionException)Assertions$.MODULE$.intercept((Function0 & Serializable)() -> (ClusterLinkScheduler.TaskResult)syncTopicsConfigs.runOnce().get(5L, TimeUnit.SECONDS), ClassTag$.MODULE$.apply(ExecutionException.class), new Position("ClusterLinkSyncTopicsConfigsTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 149))).getCause() instanceof IllegalStateException));
        Option taskDesc = syncTopicsConfigs.taskDescription();
        Assertions.assertTrue((boolean)taskDesc.isDefined());
        Assertions.assertEquals((Object)new .colon.colon((Object)new TaskErrorCodeAndMsg((TaskErrorCode)InternalTaskErrorCode$.MODULE$, "Failed to run the topic configs sync task for an unknown reason."), (List)Nil$.MODULE$), (Object)((TaskDescription)taskDesc.get()).errs());
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager())).getTopics();
        ((Admin)Mockito.verify((Object)this.sourceAdmin())).describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg));
    }

    @Test
    public void testAuthorizationExceptionFromDescribeConfigsFuture() {
        ClusterLinkData linkData = this.setupMock((Option<AlterConfigPolicy>)None$.MODULE$, (Option<String>)None$.MODULE$);
        String topic = "test-topic";
        Mockito.when((Object)this.clientManager().getTopics()).thenReturn(Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{topic})));
        KafkaFutureImpl describeFuture = new KafkaFutureImpl();
        describeFuture.completeExceptionally((Throwable)new TopicAuthorizationException("Unauthorized."));
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        Set<ConfigResource> describeConfigsArg = Collections.singleton(resource);
        DescribeConfigsResult describeResult = new DescribeConfigsResult(Collections.singletonMap(resource, describeFuture));
        Mockito.when((Object)this.sourceAdmin().describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg))).thenReturn((Object)describeResult);
        ClusterLinkScheduler.TaskResult result = (ClusterLinkScheduler.TaskResult)this.newClusterLinkSyncTopicsConfigs(linkData).runOnce().get(5L, TimeUnit.SECONDS);
        Assertions.assertTrue((boolean)result.completed());
        Assertions.assertEquals((Object)new .colon.colon((Object)new TaskErrorCodeAndMsg((TaskErrorCode)AuthorizationTaskErrorCode$.MODULE$, new StringBuilder(66).append("Unable to describe topic configs due to authorization issues for ").append(topic).append(".").toString()), (List)Nil$.MODULE$), (Object)result.errs());
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager())).getTopics();
        ((Admin)Mockito.verify((Object)this.sourceAdmin())).describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg));
    }

    @Test
    public void testUnknownTopicOrPartitionExceptionFromDescribeConfigsFuture() {
        ClusterLinkData linkData = this.setupMock((Option<AlterConfigPolicy>)None$.MODULE$, (Option<String>)None$.MODULE$);
        String topic = "test-topic";
        Mockito.when((Object)this.clientManager().getTopics()).thenReturn(Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{topic})));
        KafkaFutureImpl describeFuture = new KafkaFutureImpl();
        describeFuture.completeExceptionally((Throwable)new UnknownTopicOrPartitionException("Unknown topic or partition."));
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        Set<ConfigResource> describeConfigsArg = Collections.singleton(resource);
        DescribeConfigsResult describeResult = new DescribeConfigsResult(Collections.singletonMap(resource, describeFuture));
        Mockito.when((Object)this.sourceAdmin().describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg))).thenReturn((Object)describeResult);
        ClusterLinkScheduler.TaskResult result = (ClusterLinkScheduler.TaskResult)this.newClusterLinkSyncTopicsConfigs(linkData).runOnce().get(5L, TimeUnit.SECONDS);
        Assertions.assertTrue((boolean)result.completed());
        Assertions.assertEquals((Object)new .colon.colon((Object)new TaskErrorCodeAndMsg((TaskErrorCode)UnknownTopicOrPartitionErrorCode$.MODULE$, "Unable to describe topic configs due to unknown topic partition on the source for test-topic."), (List)Nil$.MODULE$), (Object)result.errs());
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager())).getTopics();
        ((Admin)Mockito.verify((Object)this.sourceAdmin())).describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg));
    }

    @Test
    public void testAuthorizationExceptionFromAlterConfigsFuture() {
        ClusterLinkData linkData = this.setupMock((Option<AlterConfigPolicy>)None$.MODULE$, (Option<String>)None$.MODULE$);
        String topic = "test-topic";
        Mockito.when((Object)this.clientManager().getTopics()).thenReturn(Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{topic})));
        Mockito.when((Object)this.clientManager().topicConfigSyncRules()).thenReturn((Object)this.topicConfigSyncRules());
        KafkaFutureImpl config = new KafkaFutureImpl();
        config.complete((Object)new Config(CollectionConverters$.MODULE$.IterableHasAsJava((Iterable)new .colon.colon((Object)new ConfigEntry("cleanup.policy", "compact"), (List)Nil$.MODULE$)).asJavaCollection()));
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        Set<ConfigResource> describeConfigsArg = Collections.singleton(resource);
        DescribeConfigsResult describeConfigsResult = new DescribeConfigsResult(Collections.unmodifiableMap(CollectionConverters$.MODULE$.MapHasAsJava((Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)resource), (Object)config)}))).asJava()));
        Mockito.when((Object)this.sourceAdmin().describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg))).thenReturn((Object)describeConfigsResult);
        Properties curProps = new Properties();
        curProps.put("cleanup.policy", "delete");
        curProps.put("unclean.leader.election.enable", "true");
        Mockito.when((Object)this.metadataManager().getTopicConfig(topic)).thenReturn((Object)curProps);
        Properties newProps = new Properties();
        newProps.put("cleanup.policy", "compact");
        newProps.put("unclean.leader.election.enable", "true");
        KafkaFutureImpl alterFuture = new KafkaFutureImpl();
        alterFuture.completeExceptionally((Throwable)new TopicAuthorizationException("Unauthorized."));
        AlterConfigsResult alterResult = (AlterConfigsResult)Mockito.mock(AlterConfigsResult.class);
        Mockito.when((Object)alterResult.all()).thenReturn((Object)alterFuture);
        java.util.Map<ConfigResource, KafkaFutureImpl> resourceMap = Collections.singletonMap(resource, alterFuture);
        Mockito.when((Object)alterResult.values()).thenReturn(resourceMap);
        Mockito.when((Object)this.localClusterLinkAdminClient().incrementalAlterConfigs(this.alterConfigRequestMap((scala.collection.immutable.Map<String, Properties>)((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topic), (Object)newProps)})))))).thenReturn((Object)alterResult);
        ClusterLinkScheduler.TaskResult result = (ClusterLinkScheduler.TaskResult)this.newClusterLinkSyncTopicsConfigs(linkData).runOnce().get(5L, TimeUnit.SECONDS);
        Assertions.assertTrue((boolean)result.completed());
        Assertions.assertEquals((Object)new .colon.colon((Object)new TaskErrorCodeAndMsg((TaskErrorCode)AuthorizationTaskErrorCode$.MODULE$, new StringBuilder(63).append("Unable to alter topic configs due to authorization issues for ").append(topic).append(".").toString()), (List)Nil$.MODULE$), (Object)result.errs());
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager())).getTopics();
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager())).topicConfigSyncRules();
        ((Admin)Mockito.verify((Object)this.sourceAdmin())).describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg));
        ((ClusterLinkMetadataManager)Mockito.verify((Object)this.metadataManager())).getTopicConfig(topic);
        ((Admin)Mockito.verify((Object)this.localClusterLinkAdminClient())).incrementalAlterConfigs(this.alterConfigRequestMap((scala.collection.immutable.Map<String, Properties>)((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topic), (Object)newProps)})))));
    }

    @Test
    public void testExceptionAlteringConfigs() {
        ClusterLinkData linkData = this.setupMock((Option<AlterConfigPolicy>)None$.MODULE$, (Option<String>)None$.MODULE$);
        String topic = "test-topic";
        Mockito.when((Object)this.clientManager().getTopics()).thenReturn(Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{topic})));
        Mockito.when((Object)this.clientManager().topicConfigSyncRules()).thenReturn((Object)this.topicConfigSyncRules());
        KafkaFutureImpl config = new KafkaFutureImpl();
        config.complete((Object)new Config(CollectionConverters$.MODULE$.IterableHasAsJava((Iterable)new .colon.colon((Object)new ConfigEntry("cleanup.policy", "compact"), (List)Nil$.MODULE$)).asJavaCollection()));
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        Set<ConfigResource> describeConfigsArg = Collections.singleton(resource);
        DescribeConfigsResult describeConfigsResult = new DescribeConfigsResult(Collections.unmodifiableMap(CollectionConverters$.MODULE$.MapHasAsJava((Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)resource), (Object)config)}))).asJava()));
        Mockito.when((Object)this.sourceAdmin().describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg))).thenReturn((Object)describeConfigsResult);
        Properties curProps = new Properties();
        curProps.put("cleanup.policy", "delete");
        curProps.put("unclean.leader.election.enable", "true");
        Mockito.when((Object)this.metadataManager().getTopicConfig(topic)).thenReturn((Object)curProps);
        Properties newProps = new Properties();
        newProps.put("cleanup.policy", "compact");
        newProps.put("unclean.leader.election.enable", "true");
        Mockito.when((Object)this.localClusterLinkAdminClient().incrementalAlterConfigs(this.alterConfigRequestMap((scala.collection.immutable.Map<String, Properties>)((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topic), (Object)newProps)})))))).thenThrow(new Throwable[]{new IllegalStateException("")});
        ClusterLinkSyncTopicsConfigs syncTopicsConfigs = this.newClusterLinkSyncTopicsConfigs(linkData);
        Assertions.assertTrue((boolean)(((ExecutionException)Assertions$.MODULE$.intercept((Function0 & Serializable)() -> (ClusterLinkScheduler.TaskResult)syncTopicsConfigs.runOnce().get(5L, TimeUnit.SECONDS), ClassTag$.MODULE$.apply(ExecutionException.class), new Position("ClusterLinkSyncTopicsConfigsTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 297))).getCause() instanceof IllegalStateException));
        Option taskDesc = syncTopicsConfigs.taskDescription();
        Assertions.assertTrue((boolean)taskDesc.isDefined());
        Assertions.assertEquals((Object)new .colon.colon((Object)new TaskErrorCodeAndMsg((TaskErrorCode)InternalTaskErrorCode$.MODULE$, "Failed to run the topic configs sync task for an unknown reason."), (List)Nil$.MODULE$), (Object)((TaskDescription)taskDesc.get()).errs());
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager())).getTopics();
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager())).topicConfigSyncRules();
        ((Admin)Mockito.verify((Object)this.sourceAdmin())).describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg));
        ((ClusterLinkMetadataManager)Mockito.verify((Object)this.metadataManager())).getTopicConfig(topic);
        ((Admin)Mockito.verify((Object)this.localClusterLinkAdminClient())).incrementalAlterConfigs(this.alterConfigRequestMap((scala.collection.immutable.Map<String, Properties>)((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topic), (Object)newProps)})))));
    }

    /**
     * One of three describe-config futures fails with a {@link TopicAuthorizationException}.
     *
     * The run is still expected to complete, report a single authorization error naming
     * {@code test-topic-2}, and issue one alter request covering only {@code test-topic-1} and
     * {@code test-topic-3}.
     */
    @Test
    public void testExceptionConfigsResult() {
        ClusterLinkData linkData = this.setupMock((Option<AlterConfigPolicy>)None$.MODULE$, (Option<String>)None$.MODULE$);
        .colon.colon mirrorTopics = new .colon.colon((Object)"test-topic-1", (List)new .colon.colon((Object)"test-topic-2", (List)new .colon.colon((Object)"test-topic-3", (List)Nil$.MODULE$)));
        String firstTopic = (String)mirrorTopics.head();
        String thirdTopic = (String)mirrorTopics.apply(2);
        Mockito.when((Object)this.clientManager().getTopics()).thenReturn((Object)mirrorTopics.toSet());
        Mockito.when((Object)this.clientManager().topicConfigSyncRules()).thenReturn((Object)this.topicConfigSyncRules());

        // Describe futures: success for topics 1 and 3, authorization failure for topic 2.
        KafkaFutureImpl describedOk = new KafkaFutureImpl();
        describedOk.complete((Object)new Config(CollectionConverters$.MODULE$.IterableHasAsJava((Iterable)new .colon.colon((Object)new ConfigEntry("cleanup.policy", "compact"), (List)Nil$.MODULE$)).asJavaCollection()));
        KafkaFutureImpl describeDenied = new KafkaFutureImpl();
        describeDenied.completeExceptionally((Throwable)new TopicAuthorizationException(""));
        List topicResources = mirrorTopics.map((Function1 & Serializable)topicName -> this.newConfigResource((String)topicName));
        HashSet expectedDescribeArg = new HashSet(3);
        topicResources.foreach((Function1 & Serializable)res -> BoxesRunTime.boxToBoolean((boolean)expectedDescribeArg.add(res)));
        Map futuresByResource = (Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicResources.head()), (Object)describedOk), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicResources.apply(1)), (Object)describeDenied), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicResources.apply(2)), (Object)describedOk)}));
        DescribeConfigsResult describeResult = new DescribeConfigsResult(Collections.unmodifiableMap(CollectionConverters$.MODULE$.MapHasAsJava(futuresByResource).asJava()));
        Mockito.when((Object)this.sourceAdmin().describeConfigs((Collection)ArgumentMatchers.eq(expectedDescribeArg))).thenReturn((Object)describeResult);

        // Local configs are stubbed only for the two topics whose describe succeeded.
        Properties localProps = new Properties();
        localProps.put("cleanup.policy", "delete");
        Mockito.when((Object)this.metadataManager().getTopicConfig(firstTopic)).thenReturn((Object)localProps);
        Mockito.when((Object)this.metadataManager().getTopicConfig(thirdTopic)).thenReturn((Object)localProps);

        // Expected alter request: both successfully described topics move to cleanup.policy=compact.
        Properties expectedProps = new Properties();
        expectedProps.put("cleanup.policy", "compact");
        .colon.colon alteredTopics = new .colon.colon((Object)"test-topic-1", (List)new .colon.colon((Object)"test-topic-3", (List)Nil$.MODULE$));
        scala.collection.immutable.Map<String, Properties> expectedAlterations = (scala.collection.immutable.Map<String, Properties>)alteredTopics.map((Function1 & Serializable)name -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(name), (Object)expectedProps)).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl());
        AlterConfigsResult alterResult = this.mockAlterConfigsResult((List<String>)alteredTopics, (Option<Throwable>)None$.MODULE$);
        Mockito.when((Object)this.localClusterLinkAdminClient().incrementalAlterConfigs(this.alterConfigRequestMap(expectedAlterations))).thenReturn((Object)alterResult);

        // Run once and check the reported outcome.
        ClusterLinkScheduler.TaskResult outcome = (ClusterLinkScheduler.TaskResult)this.newClusterLinkSyncTopicsConfigs(linkData).runOnce().get(5L, TimeUnit.SECONDS);
        Assertions.assertTrue((boolean)outcome.completed());
        .colon.colon expectedErrs = new .colon.colon((Object)new TaskErrorCodeAndMsg((TaskErrorCode)AuthorizationTaskErrorCode$.MODULE$, "Unable to describe topic configs due to authorization issues for test-topic-2."), (List)Nil$.MODULE$);
        Assertions.assertEquals((Object)expectedErrs, (Object)outcome.errs());

        // Interaction checks; topicConfigSyncRules is expected twice in this scenario.
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager())).getTopics();
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager(), (VerificationMode)Mockito.times((int)2))).topicConfigSyncRules();
        ((Admin)Mockito.verify((Object)this.sourceAdmin())).describeConfigs((Collection)ArgumentMatchers.eq(expectedDescribeArg));
        ((ClusterLinkMetadataManager)Mockito.verify((Object)this.metadataManager())).getTopicConfig(firstTopic);
        ((ClusterLinkMetadataManager)Mockito.verify((Object)this.metadataManager())).getTopicConfig(thirdTopic);
        ((Admin)Mockito.verify((Object)this.localClusterLinkAdminClient())).incrementalAlterConfigs(this.alterConfigRequestMap(expectedAlterations));
    }

    @Test
    public void testExceptionAlterConfigsResult() {
        ClusterLinkData linkData = this.setupMock((Option<AlterConfigPolicy>)None$.MODULE$, (Option<String>)None$.MODULE$);
        String topic = "test-topic";
        Mockito.when((Object)this.clientManager().getTopics()).thenReturn(Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{topic})));
        Mockito.when((Object)this.clientManager().topicConfigSyncRules()).thenReturn((Object)this.topicConfigSyncRules());
        KafkaFutureImpl config = new KafkaFutureImpl();
        config.complete((Object)new Config(CollectionConverters$.MODULE$.IterableHasAsJava((Iterable)new .colon.colon((Object)new ConfigEntry("cleanup.policy", "compact"), (List)Nil$.MODULE$)).asJavaCollection()));
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        Set<ConfigResource> describeConfigsArg = Collections.singleton(resource);
        DescribeConfigsResult describeConfigsResult = new DescribeConfigsResult(Collections.unmodifiableMap(CollectionConverters$.MODULE$.MapHasAsJava((Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)resource), (Object)config)}))).asJava()));
        Mockito.when((Object)this.sourceAdmin().describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg))).thenReturn((Object)describeConfigsResult);
        Properties curProps = new Properties();
        curProps.put("cleanup.policy", "delete");
        curProps.put("unclean.leader.election.enable", "true");
        Mockito.when((Object)this.metadataManager().getTopicConfig(topic)).thenReturn((Object)curProps);
        Properties newProps = new Properties();
        newProps.put("cleanup.policy", "compact");
        newProps.put("unclean.leader.election.enable", "true");
        AlterConfigsResult alterConfigResult = this.mockAlterConfigsResult((List<String>)new .colon.colon((Object)topic, (List)Nil$.MODULE$), (Option<Throwable>)new Some((Object)new TopicAuthorizationException("unauthorized")));
        Mockito.when((Object)this.localClusterLinkAdminClient().incrementalAlterConfigs(this.alterConfigRequestMap((scala.collection.immutable.Map<String, Properties>)((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topic), (Object)newProps)})))))).thenReturn((Object)alterConfigResult);
        ClusterLinkScheduler.TaskResult result = (ClusterLinkScheduler.TaskResult)this.newClusterLinkSyncTopicsConfigs(linkData).runOnce().get(5L, TimeUnit.SECONDS);
        Assertions.assertTrue((boolean)result.completed());
        Assertions.assertEquals((Object)new .colon.colon((Object)new TaskErrorCodeAndMsg((TaskErrorCode)AuthorizationTaskErrorCode$.MODULE$, "Unable to alter topic configs due to authorization issues for test-topic."), (List)Nil$.MODULE$), (Object)result.errs());
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager())).getTopics();
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager())).topicConfigSyncRules();
        ((Admin)Mockito.verify((Object)this.sourceAdmin())).describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg));
        ((ClusterLinkMetadataManager)Mockito.verify((Object)this.metadataManager())).getTopicConfig(topic);
        ((Admin)Mockito.verify((Object)this.localClusterLinkAdminClient())).incrementalAlterConfigs(this.alterConfigRequestMap((scala.collection.immutable.Map<String, Properties>)((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topic), (Object)newProps)})))));
    }

    /**
     * The set of topics returned by {@code getTopics()} changes between two consecutive runs.
     *
     * The first run sees only {@code test-topic-1}, the second only {@code test-topic-2}. Each
     * topic's local properties equal the single entry described by the source, and no
     * {@code incrementalAlterConfigs} stub is set up. Both runs must finish without errors, and
     * each topic must be described and looked up exactly once.
     */
    @Test
    public void testChangeTopics() {
        ClusterLinkData linkData = this.setupMock((Option<AlterConfigPolicy>)None$.MODULE$, (Option<String>)None$.MODULE$);
        .colon.colon mirrorTopics = new .colon.colon((Object)"test-topic-1", (List)new .colon.colon((Object)"test-topic-2", (List)Nil$.MODULE$));
        String firstTopic = (String)mirrorTopics.head();
        String secondTopic = (String)mirrorTopics.apply(1);

        // Consecutive getTopics() calls return a different single-topic set each time.
        Mockito.when((Object)this.clientManager().getTopics()).thenReturn(Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{firstTopic}))).thenReturn(Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{secondTopic})));
        Mockito.when((Object)this.clientManager().topicConfigSyncRules()).thenReturn((Object)this.topicConfigSyncRules());

        // Source-side configs: one entry per topic.
        KafkaFutureImpl firstSourceConfig = new KafkaFutureImpl();
        firstSourceConfig.complete((Object)new Config(CollectionConverters$.MODULE$.IterableHasAsJava((Iterable)new .colon.colon((Object)new ConfigEntry("cleanup.policy", "compact"), (List)Nil$.MODULE$)).asJavaCollection()));
        KafkaFutureImpl secondSourceConfig = new KafkaFutureImpl();
        secondSourceConfig.complete((Object)new Config(CollectionConverters$.MODULE$.IterableHasAsJava((Iterable)new .colon.colon((Object)new ConfigEntry("retention.ms", "1000000"), (List)Nil$.MODULE$)).asJavaCollection()));
        List topicResources = mirrorTopics.map((Function1 & Serializable)topicName -> this.newConfigResource((String)topicName));
        Object firstResource = topicResources.head();
        Object secondResource = topicResources.apply(1);
        Set<Object> firstDescribeArg = Collections.singleton(firstResource);
        Set<Object> secondDescribeArg = Collections.singleton(secondResource);
        Map firstFutures = (Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(firstResource), (Object)firstSourceConfig)}));
        Map secondFutures = (Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(secondResource), (Object)secondSourceConfig)}));
        DescribeConfigsResult firstDescribeResult = new DescribeConfigsResult(Collections.unmodifiableMap(CollectionConverters$.MODULE$.MapHasAsJava(firstFutures).asJava()));
        DescribeConfigsResult secondDescribeResult = new DescribeConfigsResult(Collections.unmodifiableMap(CollectionConverters$.MODULE$.MapHasAsJava(secondFutures).asJava()));
        Mockito.when((Object)this.sourceAdmin().describeConfigs((Collection)ArgumentMatchers.eq(firstDescribeArg))).thenReturn((Object)firstDescribeResult);
        Mockito.when((Object)this.sourceAdmin().describeConfigs((Collection)ArgumentMatchers.eq(secondDescribeArg))).thenReturn((Object)secondDescribeResult);

        // Local configs carry the same single entry the source reports for each topic.
        Properties firstLocalProps = new Properties();
        firstLocalProps.put("cleanup.policy", "compact");
        Properties secondLocalProps = new Properties();
        secondLocalProps.put("retention.ms", "1000000");
        Mockito.when((Object)this.metadataManager().getTopicConfig(firstTopic)).thenReturn((Object)firstLocalProps);
        Mockito.when((Object)this.metadataManager().getTopicConfig(secondTopic)).thenReturn((Object)secondLocalProps);

        // Two runs on the same task instance.
        ClusterLinkSyncTopicsConfigs syncTask = this.newClusterLinkSyncTopicsConfigs(linkData);
        ClusterLinkScheduler.TaskResult firstRun = (ClusterLinkScheduler.TaskResult)syncTask.runOnce().get(5L, TimeUnit.SECONDS);
        Assertions.assertTrue((boolean)firstRun.completed());
        Assertions.assertTrue((boolean)firstRun.errs().isEmpty());
        ClusterLinkScheduler.TaskResult secondRun = (ClusterLinkScheduler.TaskResult)syncTask.runOnce().get(5L, TimeUnit.SECONDS);
        Assertions.assertTrue((boolean)secondRun.errs().isEmpty());

        // Two runs in total; each topic is described and looked up exactly once.
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager(), (VerificationMode)Mockito.times((int)2))).getTopics();
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager(), (VerificationMode)Mockito.times((int)2))).topicConfigSyncRules();
        ((Admin)Mockito.verify((Object)this.sourceAdmin())).describeConfigs((Collection)ArgumentMatchers.eq(firstDescribeArg));
        ((Admin)Mockito.verify((Object)this.sourceAdmin())).describeConfigs((Collection)ArgumentMatchers.eq(secondDescribeArg));
        ((ClusterLinkMetadataManager)Mockito.verify((Object)this.metadataManager())).getTopicConfig(firstTopic);
        ((ClusterLinkMetadataManager)Mockito.verify((Object)this.metadataManager())).getTopicConfig(secondTopic);
    }

    /**
     * Exercises the sync task with a mocked {@link ClusterLinkAlterConfigPolicy}.
     *
     * The policy is stubbed so that {@code clusterLinkRestrictTopicConfigs} maps
     * {@code preallocate=true} to an empty map, and {@code clusterLinkValidateTopicConfigs}
     * accepts that empty map. The run must complete without errors, the alter request must carry
     * empty properties for the topic, and both policy methods must have been called once.
     */
    @Test
    public void testClusterLinkAlterConfigPolicy() {
        // Policy input (as described by the source) and the restricted output it is stubbed to return.
        scala.collection.immutable.Map sourceSideConfigs = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"preallocate"), (Object)"true")}));
        scala.collection.immutable.Map restrictedConfigs = Predef$.MODULE$.Map().empty();
        ClusterLinkAlterConfigPolicy linkPolicy = (ClusterLinkAlterConfigPolicy)Mockito.mock(ClusterLinkAlterConfigPolicy.class);
        Mockito.when((Object)linkPolicy.clusterLinkRestrictTopicConfigs((java.util.Map)ArgumentMatchers.eq((Object)CollectionConverters$.MODULE$.MapHasAsJava((Map)sourceSideConfigs).asJava()))).thenReturn((Object)CollectionConverters$.MODULE$.MapHasAsJava((Map)restrictedConfigs).asJava());
        ((ClusterLinkAlterConfigPolicy)Mockito.doNothing().when((Object)linkPolicy)).clusterLinkValidateTopicConfigs((java.util.Map)ArgumentMatchers.eq((Object)CollectionConverters$.MODULE$.MapHasAsJava((Map)restrictedConfigs).asJava()));

        // The policy and the "tenant_" string are handed to setupMock as its two optional arguments.
        ClusterLinkData linkData = this.setupMock((Option<AlterConfigPolicy>)new Some((Object)linkPolicy), (Option<String>)new Some((Object)"tenant_"));
        String mirrorTopic = "test-topic";
        Mockito.when((Object)this.clientManager().getTopics()).thenReturn(Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{mirrorTopic})));
        Mockito.when((Object)this.clientManager().topicConfigSyncRules()).thenReturn((Object)this.topicConfigSyncRules());

        // Source cluster describes the topic with preallocate=true.
        KafkaFutureImpl sourceConfig = new KafkaFutureImpl();
        sourceConfig.complete((Object)new Config(CollectionConverters$.MODULE$.IterableHasAsJava((Iterable)new .colon.colon((Object)new ConfigEntry("preallocate", "true"), (List)Nil$.MODULE$)).asJavaCollection()));
        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, mirrorTopic);
        Set<ConfigResource> expectedDescribeArg = Collections.singleton(topicResource);
        Map futuresByResource = (Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topicResource), (Object)sourceConfig)}));
        DescribeConfigsResult describeResult = new DescribeConfigsResult(Collections.unmodifiableMap(CollectionConverters$.MODULE$.MapHasAsJava(futuresByResource).asJava()));
        Mockito.when((Object)this.sourceAdmin().describeConfigs((Collection)ArgumentMatchers.eq(expectedDescribeArg))).thenReturn((Object)describeResult);

        // The local topic also has preallocate=true.
        Properties localProps = new Properties();
        localProps.put("preallocate", "true");
        Mockito.when((Object)this.metadataManager().getTopicConfig(mirrorTopic)).thenReturn((Object)localProps);

        // Expected alter request: empty properties for the topic.
        Properties expectedProps = new Properties();
        scala.collection.immutable.Map expectedAlterations = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)mirrorTopic), (Object)expectedProps)}));
        AlterConfigsResult alterResult = this.mockAlterConfigsResult((List<String>)new .colon.colon((Object)mirrorTopic, (List)Nil$.MODULE$), (Option<Throwable>)None$.MODULE$);
        Mockito.when((Object)this.localClusterLinkAdminClient().incrementalAlterConfigs(this.alterConfigRequestMap((scala.collection.immutable.Map<String, Properties>)expectedAlterations))).thenReturn((Object)alterResult);

        // Run once and expect a clean result.
        ClusterLinkScheduler.TaskResult outcome = (ClusterLinkScheduler.TaskResult)this.newClusterLinkSyncTopicsConfigs(linkData).runOnce().get(5L, TimeUnit.SECONDS);
        Assertions.assertTrue((boolean)outcome.completed());
        Assertions.assertTrue((boolean)outcome.errs().isEmpty());

        // Collaborators and both policy hooks are expected exactly once each.
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager())).getTopics();
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager())).topicConfigSyncRules();
        ((Admin)Mockito.verify((Object)this.sourceAdmin())).describeConfigs((Collection)ArgumentMatchers.eq(expectedDescribeArg));
        ((ClusterLinkMetadataManager)Mockito.verify((Object)this.metadataManager())).getTopicConfig(mirrorTopic);
        ((Admin)Mockito.verify((Object)this.localClusterLinkAdminClient())).incrementalAlterConfigs(this.alterConfigRequestMap((scala.collection.immutable.Map<String, Properties>)expectedAlterations));
        ((ClusterLinkAlterConfigPolicy)Mockito.verify((Object)linkPolicy)).clusterLinkRestrictTopicConfigs((java.util.Map)ArgumentMatchers.eq((Object)CollectionConverters$.MODULE$.MapHasAsJava((Map)sourceSideConfigs).asJava()));
        ((ClusterLinkAlterConfigPolicy)Mockito.verify((Object)linkPolicy)).clusterLinkValidateTopicConfigs((java.util.Map)ArgumentMatchers.eq((Object)CollectionConverters$.MODULE$.MapHasAsJava((Map)restrictedConfigs).asJava()));
    }

    @Test
    public void testAlterConfigPolicyViolation() {
        AlterConfigPolicy policy = new AlterConfigPolicy(null){

            public void configure(java.util.Map<String, ?> configs) {
            }

            public void close() {
            }

            public void validate(AlterConfigPolicy.RequestMetadata requestMetadata) {
                throw new PolicyViolationException("Violated!");
            }
        };
        ClusterLinkData linkData = this.setupMock((Option<AlterConfigPolicy>)new Some((Object)policy), (Option<String>)None$.MODULE$);
        String topic = "test-topic";
        Mockito.when((Object)this.clientManager().getTopics()).thenReturn(Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{topic})));
        Mockito.when((Object)this.clientManager().topicConfigSyncRules()).thenReturn((Object)this.topicConfigSyncRules());
        KafkaFutureImpl config = new KafkaFutureImpl();
        config.complete((Object)new Config(CollectionConverters$.MODULE$.IterableHasAsJava((Iterable)new .colon.colon((Object)new ConfigEntry("cleanup.policy", "compact"), (List)Nil$.MODULE$)).asJavaCollection()));
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        Set<ConfigResource> describeConfigsArg = Collections.singleton(resource);
        DescribeConfigsResult describeConfigsResult = new DescribeConfigsResult(Collections.unmodifiableMap(CollectionConverters$.MODULE$.MapHasAsJava((Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)resource), (Object)config)}))).asJava()));
        Mockito.when((Object)this.sourceAdmin().describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg))).thenReturn((Object)describeConfigsResult);
        Properties curProps = new Properties();
        Mockito.when((Object)this.metadataManager().getTopicConfig(topic)).thenReturn((Object)curProps);
        ClusterLinkScheduler.TaskResult result = (ClusterLinkScheduler.TaskResult)this.newClusterLinkSyncTopicsConfigs(linkData).runOnce().get(5L, TimeUnit.SECONDS);
        Assertions.assertTrue((boolean)result.completed());
        Assertions.assertEquals((Object)new .colon.colon((Object)new TaskErrorCodeAndMsg((TaskErrorCode)PolicyViolationTaskErrorCode$.MODULE$, "Could not update mirror topic 'test-topic' configuration due to policy violation on cluster link link-name"), (List)Nil$.MODULE$), (Object)result.errs());
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager())).getTopics();
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager())).topicConfigSyncRules();
        ((Admin)Mockito.verify((Object)this.sourceAdmin())).describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg));
        ((ClusterLinkMetadataManager)Mockito.verify((Object)this.metadataManager())).getTopicConfig(topic);
    }

    @Test
    public void testExcessiveLog() {
        AlterConfigPolicy policy = new AlterConfigPolicy(null){

            public void configure(java.util.Map<String, ?> configs) {
            }

            public void close() {
            }

            public void validate(AlterConfigPolicy.RequestMetadata requestMetadata) {
                throw new PolicyViolationException("Violated!");
            }
        };
        ClusterLinkData linkData = this.setupMock((Option<AlterConfigPolicy>)new Some((Object)policy), (Option<String>)None$.MODULE$);
        String topic = "test-topic-1";
        Mockito.when((Object)this.clientManager().getTopics()).thenReturn(Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{topic})));
        Mockito.when((Object)this.clientManager().topicConfigSyncRules()).thenReturn((Object)this.topicConfigSyncRules());
        KafkaFutureImpl config = new KafkaFutureImpl();
        config.complete((Object)new Config(CollectionConverters$.MODULE$.IterableHasAsJava((Iterable)new .colon.colon((Object)new ConfigEntry("cleanup.policy", "compact"), (List)Nil$.MODULE$)).asJavaCollection()));
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        Set<ConfigResource> describeConfigsArg = Collections.singleton(resource);
        DescribeConfigsResult describeConfigsResult = new DescribeConfigsResult(Collections.unmodifiableMap(CollectionConverters$.MODULE$.MapHasAsJava((Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)resource), (Object)config)}))).asJava()));
        Mockito.when((Object)this.sourceAdmin().describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg))).thenReturn((Object)describeConfigsResult);
        Properties curProps = new Properties();
        curProps.put("cleanup.policy", "delete");
        curProps.put("unclean.leader.election.enable", "true");
        Mockito.when((Object)this.metadataManager().getTopicConfig(topic)).thenReturn((Object)curProps);
        ClusterLinkSyncTopicsConfigs syncTopicsConfigs = this.newClusterLinkSyncTopicsConfigs(linkData);
        ClusterLinkScheduler.TaskResult result = (ClusterLinkScheduler.TaskResult)syncTopicsConfigs.runOnce().get(5L, TimeUnit.SECONDS);
        Assertions.assertTrue((boolean)result.completed());
        Assertions.assertEquals((Object)new .colon.colon((Object)new TaskErrorCodeAndMsg((TaskErrorCode)PolicyViolationTaskErrorCode$.MODULE$, "Could not update mirror topic 'test-topic-1' configuration due to policy violation on cluster link link-name"), (List)Nil$.MODULE$), (Object)result.errs());
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager())).getTopics();
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager())).topicConfigSyncRules();
        ((Admin)Mockito.verify((Object)this.sourceAdmin())).describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg));
        ((ClusterLinkMetadataManager)Mockito.verify((Object)this.metadataManager())).getTopicConfig(topic);
        Mockito.when((Object)this.clientManager().alterConfigPolicy()).thenReturn((Object)None$.MODULE$);
        topic = "test-topic-2";
        Mockito.when((Object)this.clientManager().getTopics()).thenReturn(Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{topic})));
        KafkaFutureImpl exceptionRemoteConfig = new KafkaFutureImpl();
        exceptionRemoteConfig.completeExceptionally((Throwable)new TopicAuthorizationException(""));
        resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        describeConfigsArg = Collections.singleton(resource);
        describeConfigsResult = new DescribeConfigsResult(Collections.unmodifiableMap(CollectionConverters$.MODULE$.MapHasAsJava((Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)resource), (Object)exceptionRemoteConfig)}))).asJava()));
        Mockito.when((Object)this.sourceAdmin().describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg))).thenReturn((Object)describeConfigsResult);
        ClusterLinkScheduler.TaskResult resultTwo = (ClusterLinkScheduler.TaskResult)syncTopicsConfigs.runOnce().get(5L, TimeUnit.SECONDS);
        Assertions.assertTrue((boolean)resultTwo.completed());
        Assertions.assertEquals((Object)new .colon.colon((Object)new TaskErrorCodeAndMsg((TaskErrorCode)AuthorizationTaskErrorCode$.MODULE$, "Unable to describe topic configs due to authorization issues for test-topic-2."), (List)Nil$.MODULE$), (Object)resultTwo.errs());
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager(), (VerificationMode)Mockito.times((int)2))).getTopics();
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager())).topicConfigSyncRules();
        ((Admin)Mockito.verify((Object)this.sourceAdmin())).describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg));
        topic = "test-topic-3";
        Mockito.when((Object)this.clientManager().getTopics()).thenReturn(Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{topic})));
        resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        describeConfigsArg = Collections.singleton(resource);
        describeConfigsResult = new DescribeConfigsResult(Collections.unmodifiableMap(CollectionConverters$.MODULE$.MapHasAsJava((Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)resource), (Object)config)}))).asJava()));
        Mockito.when((Object)this.sourceAdmin().describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg))).thenReturn((Object)describeConfigsResult);
        Mockito.when((Object)this.metadataManager().getTopicConfig(topic)).thenReturn((Object)curProps);
        Properties newProps = new Properties();
        newProps.put("cleanup.policy", "compact");
        newProps.put("unclean.leader.election.enable", "true");
        AlterConfigsResult alterConfigResult = this.mockAlterConfigsResult((List<String>)new .colon.colon((Object)topic, (List)Nil$.MODULE$), (Option<Throwable>)new Some((Object)new TopicAuthorizationException("unauthorized")));
        Mockito.when((Object)this.localClusterLinkAdminClient().incrementalAlterConfigs(this.alterConfigRequestMap((scala.collection.immutable.Map<String, Properties>)((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topic), (Object)newProps)})))))).thenReturn((Object)alterConfigResult);
        ClusterLinkScheduler.TaskResult resultThree = (ClusterLinkScheduler.TaskResult)syncTopicsConfigs.runOnce().get(5L, TimeUnit.SECONDS);
        Assertions.assertTrue((boolean)resultThree.completed());
        Assertions.assertEquals((Object)new .colon.colon((Object)new TaskErrorCodeAndMsg((TaskErrorCode)AuthorizationTaskErrorCode$.MODULE$, "Unable to alter topic configs due to authorization issues for test-topic-3."), (List)Nil$.MODULE$), (Object)resultThree.errs());
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager(), (VerificationMode)Mockito.times((int)3))).getTopics();
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager(), (VerificationMode)Mockito.times((int)2))).topicConfigSyncRules();
        ((Admin)Mockito.verify((Object)this.sourceAdmin())).describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg));
        ((ClusterLinkMetadataManager)Mockito.verify((Object)this.metadataManager())).getTopicConfig(topic);
        ((Admin)Mockito.verify((Object)this.localClusterLinkAdminClient())).incrementalAlterConfigs(this.alterConfigRequestMap((scala.collection.immutable.Map<String, Properties>)((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topic), (Object)newProps)})))));
        alterConfigResult = this.mockAlterConfigsResult((List<String>)new .colon.colon((Object)topic, (List)Nil$.MODULE$), (Option<Throwable>)None$.MODULE$);
        Mockito.when((Object)this.localClusterLinkAdminClient().incrementalAlterConfigs(this.alterConfigRequestMap((scala.collection.immutable.Map<String, Properties>)((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topic), (Object)newProps)})))))).thenReturn((Object)alterConfigResult);
        ClusterLinkScheduler.TaskResult resultFour = (ClusterLinkScheduler.TaskResult)syncTopicsConfigs.runOnce().get(5L, TimeUnit.SECONDS);
        Assertions.assertTrue((boolean)resultFour.completed());
        Assertions.assertTrue((boolean)resultFour.errs().isEmpty());
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager(), (VerificationMode)Mockito.times((int)4))).getTopics();
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager(), (VerificationMode)Mockito.times((int)3))).topicConfigSyncRules();
        ((Admin)Mockito.verify((Object)this.sourceAdmin(), (VerificationMode)Mockito.times((int)2))).describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg));
        ((ClusterLinkMetadataManager)Mockito.verify((Object)this.metadataManager(), (VerificationMode)Mockito.times((int)2))).getTopicConfig(topic);
        ((Admin)Mockito.verify((Object)this.localClusterLinkAdminClient(), (VerificationMode)Mockito.times((int)2))).incrementalAlterConfigs(this.alterConfigRequestMap((scala.collection.immutable.Map<String, Properties>)((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topic), (Object)newProps)})))));
        topic = "test-topic-1";
        Mockito.when((Object)this.clientManager().getTopics()).thenReturn(Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{topic})));
        resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        describeConfigsArg = Collections.singleton(resource);
        describeConfigsResult = new DescribeConfigsResult(Collections.unmodifiableMap(CollectionConverters$.MODULE$.MapHasAsJava((Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)resource), (Object)config)}))).asJava()));
        Mockito.when((Object)this.sourceAdmin().describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg))).thenReturn((Object)describeConfigsResult);
        curProps.put("cleanup.policy", "compact");
        Mockito.when((Object)this.metadataManager().getTopicConfig(topic)).thenReturn((Object)curProps);
        ClusterLinkScheduler.TaskResult resultFive = (ClusterLinkScheduler.TaskResult)syncTopicsConfigs.runOnce().get(5L, TimeUnit.SECONDS);
        Assertions.assertTrue((boolean)resultFive.completed());
        Assertions.assertTrue((boolean)resultFive.errs().isEmpty());
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager(), (VerificationMode)Mockito.times((int)5))).getTopics();
        ((ClusterLinkDestClientManager)Mockito.verify((Object)this.clientManager(), (VerificationMode)Mockito.times((int)4))).topicConfigSyncRules();
        ((Admin)Mockito.verify((Object)this.sourceAdmin(), (VerificationMode)Mockito.times((int)2))).describeConfigs((Collection)ArgumentMatchers.eq(describeConfigsArg));
        ((ClusterLinkMetadataManager)Mockito.verify((Object)this.metadataManager(), (VerificationMode)Mockito.times((int)2))).getTopicConfig(topic);
    }

    /**
     * Builds the {@link ConfigResource} that identifies a topic by name.
     *
     * @param topic the topic name
     * @return a resource of type {@code ConfigResource.Type.TOPIC} named {@code topic}
     */
    private ConfigResource newConfigResource(String topic) {
        ConfigResource.Type resourceType = ConfigResource.Type.TOPIC;
        return new ConfigResource(resourceType, topic);
    }

    /**
     * Converts a map of topic name to properties into a map keyed by the topic's
     * {@link ConfigResource}, where every property becomes one {@code SET}
     * {@link AlterConfigOp}.
     *
     * @param configMap topic name mapped to the properties to set on that topic
     * @return a mutable map from topic resource to the set of {@code SET} operations
     */
    private java.util.Map<ConfigResource, Collection<AlterConfigOp>> alterConfigRequestMap(scala.collection.immutable.Map<String, Properties> configMap) {
        HashMap<ConfigResource, Collection<AlterConfigOp>> opsByResource = new HashMap<ConfigResource, Collection<AlterConfigOp>>();
        configMap.foreach((Function1 & Serializable)topicAndProps -> {
            String topicName = (String)topicAndProps._1();
            Properties topicProps = (Properties)topicAndProps._2();
            HashSet setOps = new HashSet();
            // One SET operation per key/value pair found in the topic's properties.
            CollectionConverters$.MODULE$.PropertiesHasAsScala(topicProps).asScala().foreach((Function1 & Serializable)keyAndValue -> {
                ConfigEntry configEntry = new ConfigEntry((String)keyAndValue._1(), (String)keyAndValue._2());
                return BoxesRunTime.boxToBoolean((boolean)setOps.add(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET)));
            });
            ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
            return opsByResource.put(topicResource, setOps);
        });
        return opsByResource;
    }

    /**
     * Creates a mocked {@link AlterConfigsResult} whose {@code all()} future and
     * per-topic {@code values()} futures are all the same future instance.
     *
     * @param topics    topic names that get an entry in {@code values()}
     * @param exception when defined, the shared future is completed exceptionally with
     *                  it; when empty, the shared future is completed with {@code null}
     * @return the stubbed mock
     */
    private AlterConfigsResult mockAlterConfigsResult(List<String> topics, Option<Throwable> exception) {
        KafkaFutureImpl sharedFuture = new KafkaFutureImpl();
        if (None$.MODULE$.equals(exception)) {
            sharedFuture.complete(null);
        } else if (exception instanceof Some) {
            Throwable failure = (Throwable)((Some)exception).value();
            sharedFuture.completeExceptionally(failure);
        } else {
            // Neither Some nor None (e.g. a null reference): same failure as a Scala match.
            throw new MatchError(exception);
        }
        // Every topic resource maps to the one shared future.
        scala.collection.immutable.Map futuresByResource = topics
            .map((Function1 & Serializable)topicName -> new Tuple2((Object)new ConfigResource(ConfigResource.Type.TOPIC, topicName), (Object)sharedFuture))
            .toMap((.less.colon.less)$less$colon$less$.MODULE$.refl());
        java.util.Map javaFuturesByResource = CollectionConverters$.MODULE$.MapHasAsJava((Map)futuresByResource).asJava();
        AlterConfigsResult mockedResult = (AlterConfigsResult)Mockito.mock(AlterConfigsResult.class);
        Mockito.when((Object)mockedResult.all()).thenReturn((Object)sharedFuture);
        Mockito.when((Object)mockedResult.values()).thenReturn((Object)javaFuturesByResource);
        return mockedResult;
    }

    /**
     * Returns the empty option.
     *
     * NOTE(review): judging by its name this is the compiler-generated default for the
     * second parameter of {@code mockAlterConfigsResult} - confirm against the Scala source.
     *
     * @return {@code None$.MODULE$}
     */
    private Option<Throwable> mockAlterConfigsResult$default$2() {
        Option<Throwable> noFailure = None$.MODULE$;
        return noFailure;
    }

    /**
     * Creates the {@link ClusterLinkSyncTopicsConfigs} under test, wired to this
     * fixture's client manager, metadata manager, local admin client, metrics, time
     * and quota.
     *
     * @param linkData the link data handed to the task
     * @return a new task instance constructed with the literal {@code 100} as its
     *         fourth argument
     */
    private ClusterLinkSyncTopicsConfigs newClusterLinkSyncTopicsConfigs(ClusterLinkData linkData) {
        ClusterLinkDestClientManager destClientManager = this.clientManager();
        ClusterLinkMetadataManager linkMetadataManager = this.metadataManager();
        ConfluentAdmin localAdmin = (ConfluentAdmin)this.localClusterLinkAdminClient();
        return new ClusterLinkSyncTopicsConfigs(
            destClientManager,
            linkMetadataManager,
            localAdmin,
            100,
            this.metrics(),
            linkData,
            this.time(),
            this.quota());
    }

    /**
     * Resets the source admin, local admin, metadata manager and client manager
     * mocks, then re-stubs the baseline behaviour and returns the link data that
     * the client manager is stubbed to expose.
     *
     * After this call the client manager returns this fixture's scheduler, the
     * source admin, the given policy and the new link data; the metadata manager
     * reports the link coordinator as enabled and as owning {@code "link-name"}.
     *
     * @param alterConfigPolicy value returned by the stubbed {@code alterConfigPolicy()}
     * @param tenantPrefix      tenant prefix passed to the {@link ClusterLinkData} constructor
     * @return the freshly created link data for {@code "link-name"}
     */
    private ClusterLinkData setupMock(Option<AlterConfigPolicy> alterConfigPolicy, Option<String> tenantPrefix) {
        String linkName = "link-name";
        // Reset each mock on its own, in the same order as before.
        Object[] mocksToReset = new Object[]{this.sourceAdmin(), this.localClusterLinkAdminClient(), this.metadataManager(), this.clientManager()};
        for (Object mock : mocksToReset) {
            Mockito.reset((Object[])new Object[]{mock});
        }
        ClusterLinkDestClientManager destClientManager = this.clientManager();
        ClusterLinkMetadataManager linkMetadataManager = this.metadataManager();
        Mockito.when((Object)destClientManager.scheduler()).thenReturn((Object)this.scheduler());
        Mockito.when((Object)destClientManager.getAdmin()).thenReturn((Object)this.sourceAdmin());
        Mockito.when((Object)destClientManager.alterConfigPolicy()).thenReturn(alterConfigPolicy);
        ClusterLinkData createdLinkData = new ClusterLinkData(linkName, Uuid.randomUuid(), (Option)None$.MODULE$, tenantPrefix, false);
        Mockito.when((Object)destClientManager.linkData()).thenReturn((Object)createdLinkData);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)linkMetadataManager.isLinkCoordinatorEnabled())).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)linkMetadataManager.isLinkCoordinator(linkName))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        return createdLinkData;
    }

    /**
     * Returns the empty option.
     *
     * NOTE(review): judging by its name this is the compiler-generated default for the
     * first parameter of {@code setupMock} - confirm against the Scala source.
     *
     * @return {@code None$.MODULE$}
     */
    private Option<AlterConfigPolicy> setupMock$default$1() {
        Option<AlterConfigPolicy> noPolicy = None$.MODULE$;
        return noPolicy;
    }

    /**
     * Returns the empty option.
     *
     * NOTE(review): judging by its name this is the compiler-generated default for the
     * second parameter of {@code setupMock} - confirm against the Scala source.
     *
     * @return {@code None$.MODULE$}
     */
    private Option<String> setupMock$default$2() {
        Option<String> noTenantPrefix = None$.MODULE$;
        return noTenantPrefix;
    }
}

