/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import com.typesafe.scalalogging.Logger;
import java.io.Serializable;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import kafka.server.KafkaConfig;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConnectionManager;
import kafka.server.link.ClusterLinkDynamicLogger;
import kafka.server.link.ClusterLinkDynamicLogger$;
import kafka.server.link.ClusterLinkFactory$;
import kafka.server.link.ClusterLinkMetadata;
import kafka.server.link.ClusterLinkMetrics$;
import kafka.server.link.ClusterLinkNetworkClient;
import kafka.server.link.CoordinatorListener;
import kafka.server.link.MetadataListener;
import kafka.server.link.MetadataRefreshListener;
import kafka.utils.Logging;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.MetadataUpdater;
import org.apache.kafka.clients.admin.ClusterLinkDescription;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.message.DescribeClusterLinksResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.DescribeClusterLinksRequest;
import org.apache.kafka.common.requests.DescribeClusterLinksResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.ShutdownableThread;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

@ScalaSignature(bytes="\u0006\u0001\u0005=g\u0001B\u0013'\u00015B\u0001\u0002\u0012\u0001\u0003\u0002\u0003\u0006I!\u0012\u0005\t\u0013\u0002\u0011\t\u0011)A\u0005\u0015\"AQ\n\u0001B\u0001B\u0003%a\n\u0003\u0005X\u0001\t\u0015\r\u0011\"\u0001Y\u0011!a\u0006A!A!\u0002\u0013I\u0006\u0002C/\u0001\u0005\u0003\u0005\u000b\u0011\u00020\t\u0011\u0015\u0004!\u0011!Q\u0001\n\u0019D\u0001\"\u001d\u0001\u0003\u0002\u0003\u0006IA\u001d\u0005\u0006o\u0002!\t\u0001\u001f\u0005\n\u0003\u0007\u0001!\u0019!C\u0005\u0003\u000bA\u0001\"!\u0004\u0001A\u0003%\u0011q\u0001\u0005\u000b\u0003\u001f\u0001!\u0019!C\u0001M\u0005E\u0001\u0002CA\r\u0001\u0001\u0006I!a\u0005\t\u0013\u0005m\u0001A1A\u0005\n\u0005u\u0001\u0002CA\u0013\u0001\u0001\u0006I!a\b\t\u0013\u0005\u001d\u0002A1A\u0005\n\u0005%\u0002\u0002CA!\u0001\u0001\u0006I!a\u000b\t\u0013\u0005\r\u0003A1A\u0005\n\u0005\u0015\u0003\u0002CA(\u0001\u0001\u0006I!a\u0012\t\u0017\u0005E\u0003\u00011AA\u0002\u0013%\u00111\u000b\u0005\f\u0003;\u0002\u0001\u0019!a\u0001\n\u0013\ty\u0006C\u0006\u0002l\u0001\u0001\r\u0011!Q!\n\u0005U\u0003bCA;\u0001\u0001\u0007\t\u0019!C\u0005\u0003oB1\"a \u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0002\u0002\"Y\u0011Q\u0011\u0001A\u0002\u0003\u0005\u000b\u0015BA=\u0011\u001d\tI\t\u0001C!\u0003\u0017Cq!!$\u0001\t\u0003\ny\tC\u0004\u0002\u0018\u0002!\t%a#\t\u000f\u0005e\u0005\u0001\"\u0001\u0002\u001c\"9\u0011\u0011\u0015\u0001\u0005\u0002\u0005\r\u0006bBAT\u0001\u0011\u0005\u0011\u0011\u0016\u0005\b\u0003[\u0003A\u0011CAX\u0011\u001d\t)\f\u0001C!\u0003\u0017Cq!a.\u0001\t\u0013\tY\tC\u0004\u0002:\u0002!I!a/\t\u000f\u00055\u0007\u0001\"\u0003\u0002\f\nI2\t\\;ti\u0016\u0014H*\u001b8l\u001b\u0016$\u0018\rZ1uCRC'/Z1e\u0015\t9\u0003&\u0001\u0003mS:\\'BA\u0015+\u0003\u0019\u0019XM\u001d<fe*\t1&A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\t\u0001q#H\u0010\t\u0003_aj\u0011\u0001\r\u0006\u0003cI\nA!\u001e;jY*\u0011\u0011f\r\u0006\u0003WQR!!\u000e\u001c\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u00059\u0014aA8sO&\u0011\u0011\b\r\u0002\u0013'\",H\u000fZ8x]\u0006\u0014G.\u001a+ie\u0016\fG\r\u0005\u0002<y5\ta%\u0003\u0002>M\t9R*\u001a;bI\u0006$\u0018MU3ge\u0016\u001c\b\u000eT5ti\u0016tWM\u001d\t\u0003\u007f\tk\u0011\u0001\u0011\u0006\u0003\u0003*\nQ!\u001e;jYNL!a\u0011!\u0003\u000f1{wmZ5oO\u0006a!M]8lKJ\u001cuN\u001c4jOB\u0011aiR\u0007\u0002Q%\u0011\u0001\n\u000b\u0002\f\u0017\u000647.Y\"p]\u001aLw-A\tdYV\u001cH/\u001a:MS:\\7i\u001c8gS\u001e\u0004\"aO&\n\u000513#!E\"mkN$XM\u001d'j].\u001cuN\u001c4jO\u0006\t2m\u001c8oK\u000e$\u0018n\u001c8NC:\fw-\u001a:\u0011\u0007=\u0013F+D\u0001Q\u0015\u0005\t\u0016!B:dC2\f\u0017BA*Q\u0005\u0019y\u0005\u000f^5p]B\u00111(V\u0005\u0003-\u001a\u0012Ad\u00117vgR,'\u000fT5oW\u000e{gN\\3di&|g.T1oC\u001e,'/A\ndYV\u001cH/\u001a:MS:\\W*\u001a;bI\u0006$\u0018-F\u0001Z!\tY$,\u0003\u0002\\M\t\u00192\t\\;ti\u0016\u0014H*\u001b8l\u001b\u0016$\u0018\rZ1uC\u0006!2\r\\;ti\u0016\u0014H*\u001b8l\u001b\u0016$\u0018\rZ1uC\u0002\nq!\\3ue&\u001c7\u000f\u0005\u0002`G6\t\u0001M\u0003\u0002^C*\u0011!mM\u0001\u0007G>lWn\u001c8\n\u0005\u0011\u0004'aB'fiJL7m]\u0001\u000bG2LWM\u001c;UsB,\u0007CA4o\u001d\tAG\u000e\u0005\u0002j!6\t!N\u0003\u0002lY\u00051AH]8pizJ!!\u001c)\u0002\rA\u0013X\rZ3g\u0013\ty\u0007O\u0001\u0004TiJLgn\u001a\u0006\u0003[B\u000bA\u0001^5nKB\u00111/^\u0007\u0002i*\u0011\u0011)Y\u0005\u0003mR\u0014A\u0001V5nK\u00061A(\u001b8jiz\"\u0012\"\u001f>|yvtx0!\u0001\u0011\u0005m\u0002\u0001\"\u0002#\n\u0001\u0004)\u0005\"B%\n\u0001\u0004Q\u0005\"B'\n\u0001\u0004q\u0005\"B,\n\u0001\u0004I\u0006\"B/\n\u0001\u0004q\u0006\"B3\n\u0001\u00041\u0007\"B9\n\u0001\u0004\u0011\u0018A\u00037pO\u000e{g\u000e^3yiV\u0011\u0011q\u0001\t\u0004g\u0006%\u0011bAA\u0006i\nQAj\\4D_:$X\r\u001f;\u0002\u00171|wmQ8oi\u0016DH\u000fI\u0001\u0012G2,8\u000f^3s\u0019&t7n\u00117jK:$XCAA\n!\rY\u0014QC\u0005\u0004\u0003/1#\u0001G\"mkN$XM\u001d'j].tU\r^<pe.\u001cE.[3oi\u0006\u00112\r\\;ti\u0016\u0014H*\u001b8l\u00072LWM\u001c;!\u00035!\u0017P\\1nS\u000edunZ4feV\u0011\u0011q\u0004\t\u0004w\u0005\u0005\u0012bAA\u0012M\tA2\t\\;ti\u0016\u0014H*\u001b8l\tft\u0017-\\5d\u0019><w-\u001a:\u0002\u001d\u0011Lh.Y7jG2{wmZ3sA\u0005\tR.\u001a;bI\u0006$\u0018\rT5ti\u0016tWM]:\u0016\u0005\u0005-\u0002CBA\u0017\u0003o\tY$\u0004\u0002\u00020)!\u0011\u0011GA\u001a\u0003\u001diW\u000f^1cY\u0016T1!!\u000eQ\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u0005\u0003s\tyC\u0001\u0004Ck\u001a4WM\u001d\t\u0004w\u0005u\u0012bAA M\t\u0001R*\u001a;bI\u0006$\u0018\rT5ti\u0016tWM]\u0001\u0013[\u0016$\u0018\rZ1uC2K7\u000f^3oKJ\u001c\b%\u0001\u000bd_>\u0014H-\u001b8bi>\u0014H*[:uK:,'o]\u000b\u0003\u0003\u000f\u0002b!!\f\u00028\u0005%\u0003cA\u001e\u0002L%\u0019\u0011Q\n\u0014\u0003'\r{wN\u001d3j]\u0006$xN\u001d'jgR,g.\u001a:\u0002+\r|wN\u001d3j]\u0006$xN\u001d'jgR,g.\u001a:tA\u000512-\u001e:sK:$X*\u001a;bI\u0006$\u0018m\u00117vgR,'/\u0006\u0002\u0002VA!\u0011qKA-\u001b\u0005\t\u0017bAA.C\n91\t\\;ti\u0016\u0014\u0018AG2veJ,g\u000e^'fi\u0006$\u0017\r^1DYV\u001cH/\u001a:`I\u0015\fH\u0003BA1\u0003O\u00022aTA2\u0013\r\t)\u0007\u0015\u0002\u0005+:LG\u000fC\u0005\u0002jU\t\t\u00111\u0001\u0002V\u0005\u0019\u0001\u0010J\u0019\u0002/\r,(O]3oi6+G/\u00193bi\u0006\u001cE.^:uKJ\u0004\u0003f\u0001\f\u0002pA\u0019q*!\u001d\n\u0007\u0005M\u0004K\u0001\u0005w_2\fG/\u001b7f\u0003I\u0019WO\u001d:f]R\u001cun\u001c:eS:\fGo\u001c:\u0016\u0005\u0005e\u0004\u0003BA,\u0003wJ1!! b\u0005\u0011qu\u000eZ3\u0002-\r,(O]3oi\u000e{wN\u001d3j]\u0006$xN]0%KF$B!!\u0019\u0002\u0004\"I\u0011\u0011\u000e\r\u0002\u0002\u0003\u0007\u0011\u0011P\u0001\u0014GV\u0014(/\u001a8u\u0007>|'\u000fZ5oCR|'\u000f\t\u0015\u00043\u0005=\u0014A\u00023p/>\u00148\u000e\u0006\u0002\u0002b\u0005\u0001\u0012N\\5uS\u0006$Xm\u00155vi\u0012|wO\u001c\u000b\u0003\u0003#\u00032aTAJ\u0013\r\t)\n\u0015\u0002\b\u0005>|G.Z1o\u00035\tw/Y5u'\",H\u000fZ8x]\u0006Y\u0011\r\u001a3MSN$XM\\3s)\u0011\t\t'!(\t\u000f\u0005}U\u00041\u0001\u0002<\u0005AA.[:uK:,'/\u0001\fbI\u0012\u001cun\u001c:eS:\fGo\u001c:MSN$XM\\3s)\u0011\t\t'!*\t\u000f\u0005}e\u00041\u0001\u0002J\u0005)\"/Z7pi\u0016d\u0015N\\6D_>\u0014H-\u001b8bi>\u0014XCAAV!\u0011y%+!\u001f\u0002'\r\u0014X-\u0019;f\u001d\u0016$xo\u001c:l\u00072LWM\u001c;\u0015\r\u0005M\u0011\u0011WAZ\u0011\u0015I\u0005\u00051\u0001K\u0011\u00159\u0006\u00051\u0001Z\u0003]yg.T3uC\u0012\fG/\u0019*fcV,7\u000f^+qI\u0006$X-\u0001\u000bnCf\u0014WMR5oI\u000e{wN\u001d3j]\u0006$xN]\u0001#aJ|7-Z:t\t\u0016\u001c8M]5cK\u000ecWo\u001d;fe2Kgn\u001b*fgB|gn]3\u0015\t\u0005\u0005\u0014Q\u0018\u0005\b\u0003\u007f\u001b\u0003\u0019AAa\u0003!\u0011Xm\u001d9p]N,\u0007\u0003BAb\u0003\u0013l!!!2\u000b\u0007\u0005\u001d7'A\u0004dY&,g\u000e^:\n\t\u0005-\u0017Q\u0019\u0002\u000f\u00072LWM\u001c;SKN\u0004xN\\:f\u0003y)8/Z\"p]R\u0014x\u000e\u001c7fe\u0006\u001bH*\u001b8l\u0007>|'\u000fZ5oCR|'\u000f")
public class ClusterLinkMetadataThread
extends ShutdownableThread
implements MetadataRefreshListener,
Logging {
    private final KafkaConfig brokerConfig;
    private final ClusterLinkConfig clusterLinkConfig;
    private final Option<ClusterLinkConnectionManager> connectionManager;
    private final ClusterLinkMetadata clusterLinkMetadata;
    private final Metrics metrics;
    private final String clientType;
    private final Time time;
    private final LogContext logContext;
    private final ClusterLinkNetworkClient clusterLinkClient;
    private final ClusterLinkDynamicLogger dynamicLogger;
    private final Buffer<MetadataListener> metadataListeners;
    private final Buffer<CoordinatorListener> coordinatorListeners;
    private volatile Cluster currentMetadataCluster;
    private volatile Node currentCoordinator;
    private Logger logger;
    private String logIdent;
    private volatile boolean bitmap$0;

    @Override
    public String loggerName() {
        return Logging.loggerName$(this);
    }

    @Override
    public String msgWithLogIdent(String msg) {
        return Logging.msgWithLogIdent$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg) {
        Logging.trace$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.trace$(this, msg, e);
    }

    @Override
    public boolean isDebugEnabled() {
        return Logging.isDebugEnabled$(this);
    }

    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    @Override
    public void debug(Function0<String> msg) {
        Logging.debug$(this, msg);
    }

    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.debug$(this, msg, e);
    }

    @Override
    public void info(Function0<String> msg) {
        Logging.info$(this, msg);
    }

    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.info$(this, msg, e);
    }

    @Override
    public void warn(Function0<String> msg) {
        Logging.warn$(this, msg);
    }

    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.warn$(this, msg, e);
    }

    @Override
    public void error(Function0<String> msg) {
        Logging.error$(this, msg);
    }

    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.error$(this, msg, e);
    }

    @Override
    public void fatal(Function0<String> msg) {
        Logging.fatal$(this, msg);
    }

    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.fatal$(this, msg, e);
    }

    private Logger logger$lzycompute() {
        synchronized (this) {
            if (!this.bitmap$0) {
                this.logger = Logging.logger$(this);
                this.bitmap$0 = true;
            }
        }
        return this.logger;
    }

    @Override
    public Logger logger() {
        if (!this.bitmap$0) {
            return this.logger$lzycompute();
        }
        return this.logger;
    }

    @Override
    public String logIdent() {
        return this.logIdent;
    }

    @Override
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    public ClusterLinkMetadata clusterLinkMetadata() {
        return this.clusterLinkMetadata;
    }

    private LogContext logContext() {
        return this.logContext;
    }

    public ClusterLinkNetworkClient clusterLinkClient() {
        return this.clusterLinkClient;
    }

    private ClusterLinkDynamicLogger dynamicLogger() {
        return this.dynamicLogger;
    }

    private Buffer<MetadataListener> metadataListeners() {
        return this.metadataListeners;
    }

    private Buffer<CoordinatorListener> coordinatorListeners() {
        return this.coordinatorListeners;
    }

    private Cluster currentMetadataCluster() {
        return this.currentMetadataCluster;
    }

    private void currentMetadataCluster_$eq(Cluster x$1) {
        this.currentMetadataCluster = x$1;
    }

    private Node currentCoordinator() {
        return this.currentCoordinator;
    }

    private void currentCoordinator_$eq(Node x$1) {
        this.currentCoordinator = x$1;
    }

    public void doWork() {
        try {
            this.clusterLinkClient().networkClient().poll(Long.MAX_VALUE, this.time.milliseconds());
            this.maybeFindCoordinator();
            Cluster newMetadataCluster = this.clusterLinkMetadata().fetch();
            if (newMetadataCluster == this.currentMetadataCluster()) {
                this.clusterLinkMetadata().maybeThrowFatalException();
                return;
            }
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(30).append("Process cluster link metadata ").append(newMetadataCluster).toString());
            this.metadataListeners().foreach((Function1 & Serializable & scala.Serializable)x$2 -> {
                x$2.onNewMetadata(newMetadataCluster);
                return BoxedUnit.UNIT;
            });
            this.currentMetadataCluster_$eq(newMetadataCluster);
            this.currentCoordinator_$eq(null);
            this.maybeFindCoordinator();
            this.dynamicLogger().resetErrors();
        }
        catch (Throwable throwable) {
            if (throwable instanceof DisconnectException && !this.clusterLinkClient().networkClient().active()) {
                return;
            }
            if (throwable instanceof Exception) {
                Exception exception = (Exception)throwable;
                this.dynamicLogger().error("Failed to refresh metadata", exception);
                this.metadataListeners().foreach((Function1 & Serializable & scala.Serializable)x$3 -> {
                    x$3.onMetadataFailure(exception);
                    return BoxedUnit.UNIT;
                });
                return;
            }
            throw throwable;
        }
    }

    public boolean initiateShutdown() {
        if (super.initiateShutdown()) {
            this.clusterLinkClient().initiateClose();
            return true;
        }
        return false;
    }

    public void awaitShutdown() {
        try {
            super.awaitShutdown();
        }
        catch (InterruptedException interruptedException) {
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Interrupted while awaiting metadata shutdown");
            super.awaitShutdown();
        }
        try {
            this.clusterLinkClient().close();
            return;
        }
        catch (Throwable e) {
            this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Failed to close metadata client after shutting down thread", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
            return;
        }
    }

    public void addListener(MetadataListener listener) {
        this.metadataListeners().$plus$eq((Object)listener);
    }

    public void addCoordinatorListener(CoordinatorListener listener) {
        this.coordinatorListeners().$plus$eq((Object)listener);
    }

    public Option<Node> remoteLinkCoordinator() {
        return Option$.MODULE$.apply((Object)this.currentCoordinator());
    }

    /*
     * WARNING - void declaration
     */
    public ClusterLinkNetworkClient createNetworkClient(ClusterLinkConfig clusterLinkConfig, ClusterLinkMetadata clusterLinkMetadata) {
        void var3_3;
        ClusterLinkNetworkClient clusterLinkClient = new ClusterLinkNetworkClient(this.brokerConfig, clusterLinkConfig, ClusterLinkMetrics$.MODULE$.throttleTimeSensorName(clusterLinkMetadata.linkName()), (Option<ClusterLinkMetadata>)new Some((Object)clusterLinkMetadata), (Option<MetadataUpdater>)None$.MODULE$, this.metrics, ClusterLinkFactory$.MODULE$.linkMetricTags(clusterLinkMetadata.linkName()), this.time, new StringBuilder(22).append("cluster-link-metadata-").append(clusterLinkMetadata.linkDescription()).toString(), this.clientType, this.logContext());
        this.connectionManager.foreach((Function1 & Serializable & scala.Serializable)x$4 -> {
            x$4.enableClusterLink(clusterLinkClient, (Option<AdminMetadataManager>)None$.MODULE$);
            return BoxedUnit.UNIT;
        });
        return var3_3;
    }

    @Override
    public void onMetadataRequestUpdate() {
        this.clusterLinkClient().networkClient().wakeup();
    }

    private void maybeFindCoordinator() {
        Node controller;
        Node node = controller = this.currentMetadataCluster() == null ? null : this.currentMetadataCluster().controller();
        if (this.currentCoordinator() == null && this.coordinatorListeners().nonEmpty() && controller != null && controller.id() >= 0) {
            if (this.clusterLinkClient().networkClient().ready(controller, this.time.milliseconds())) {
                this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(67).append("Determine link coordinator using DescribeClusterLinks controllerId=").append(controller).toString());
                DescribeClusterLinksRequest.Builder requestBuilder = new DescribeClusterLinksRequest.Builder(Optional.of(Collections.singleton(this.clusterLinkMetadata().linkName())), false, Predef$.MODULE$.Integer2int(this.clusterLinkConfig.requestTimeoutMs()));
                long now = this.time.milliseconds();
                ClientRequest request = this.clusterLinkClient().networkClient().newClientRequest(controller.idString(), (AbstractRequest.Builder)requestBuilder, now, true, Predef$.MODULE$.Integer2int(this.clusterLinkConfig.requestTimeoutMs()), response -> this.processDescribeClusterLinkResponse(response));
                this.clusterLinkClient().networkClient().send(request, now);
                return;
            }
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Controller connection not yet ready to send DescribeClusterLinks request");
        }
    }

    private void processDescribeClusterLinkResponse(ClientResponse response) {
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(38).append("Process DescribeClusterLinks response ").append(response).toString());
        if (response.wasDisconnected()) {
            this.clusterLinkMetadata().requestUpdate();
            return;
        }
        if (response.versionMismatch() != null || response.requestHeader().apiVersion() < 1) {
            this.useControllerAsLinkCoordinator();
            return;
        }
        DescribeClusterLinksResponseData describeResponse = ((DescribeClusterLinksResponse)response.responseBody()).data();
        if (describeResponse.errorCode() == Errors.NONE.code() && !describeResponse.entries().isEmpty()) {
            DescribeClusterLinksResponseData.EntryData entry = (DescribeClusterLinksResponseData.EntryData)describeResponse.entries().get(0);
            if (entry.linkCoordinatorId() != -1) {
                Node coordinator = new Node(entry.linkCoordinatorId(), entry.linkCoordinatorHost(), entry.linkCoordinatorPort());
                this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(27).append("Remote link coordinator is ").append(coordinator).toString());
                this.currentCoordinator_$eq(coordinator);
                this.coordinatorListeners().foreach((Function1 & Serializable & scala.Serializable)x$5 -> {
                    x$5.onNewRemoteLinkCoordinator(coordinator);
                    return BoxedUnit.UNIT;
                });
                return;
            }
            if (entry.linkState() == ClusterLinkDescription.LinkState.ACTIVE.getValue()) {
                this.useControllerAsLinkCoordinator();
                return;
            }
        } else {
            this.useControllerAsLinkCoordinator();
        }
    }

    private void useControllerAsLinkCoordinator() {
        Node controller;
        Node node = controller = this.currentMetadataCluster() == null ? null : this.currentMetadataCluster().controller();
        if (controller != null && controller.id() != -1) {
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(61).append("Cluster link coordinator not known, using controller ").append(controller).append(" instead").toString());
            this.currentCoordinator_$eq(controller);
            this.coordinatorListeners().foreach((Function1 & Serializable & scala.Serializable)x$6 -> {
                x$6.onNewRemoteLinkCoordinator(controller);
                return BoxedUnit.UNIT;
            });
            return;
        }
        this.clusterLinkMetadata().requestUpdate();
    }

    public ClusterLinkMetadataThread(KafkaConfig brokerConfig, ClusterLinkConfig clusterLinkConfig, Option<ClusterLinkConnectionManager> connectionManager, ClusterLinkMetadata clusterLinkMetadata, Metrics metrics, String clientType, Time time) {
        this.brokerConfig = brokerConfig;
        this.clusterLinkConfig = clusterLinkConfig;
        this.connectionManager = connectionManager;
        this.clusterLinkMetadata = clusterLinkMetadata;
        this.metrics = metrics;
        this.clientType = clientType;
        this.time = time;
        super(new StringBuilder(13).append("LinkMetadata-").append(clusterLinkMetadata.linkDescription()).toString());
        Logging.$init$(this);
        this.logContext = new LogContext(new StringBuilder(29).append("[ClusterLinkMetadataClient ").append(clusterLinkMetadata.linkDescription()).append("] ").toString(), (AtomicReference)connectionManager.map((Function1 & Serializable & scala.Serializable)x$1 -> x$1.maxLogLevel()).orNull(Predef$.MODULE$.$conforms()));
        this.clusterLinkClient = this.createNetworkClient(clusterLinkConfig, clusterLinkMetadata);
        this.dynamicLogger = new ClusterLinkDynamicLogger(this, ClusterLinkDynamicLogger$.MODULE$.$lessinit$greater$default$2(), ClusterLinkDynamicLogger$.MODULE$.$lessinit$greater$default$3(), ClusterLinkDynamicLogger$.MODULE$.$lessinit$greater$default$4());
        this.metadataListeners = (Buffer)Buffer$.MODULE$.apply((Seq)Nil$.MODULE$);
        this.coordinatorListeners = (Buffer)Buffer$.MODULE$.apply((Seq)Nil$.MODULE$);
        clusterLinkMetadata.setRefreshListener(this);
    }
}

