package kafka.server.link;

import com.typesafe.scalalogging.Logger;
import java.util.concurrent.atomic.AtomicReference;
import kafka.server.KafkaConfig;
import kafka.server.link.ClusterLinkFactory;
import kafka.utils.Log4jControllerRegistration$;
import kafka.utils.Logging;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ClusterLinkPausedException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.slf4j.event.Level;
import scala.Function0;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.Set;
import scala.collection.immutable.C$colon$colon;
import scala.collection.immutable.Nil$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: ClusterLinkConnectionManager.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005Mg!B\u0014)\u0003\u0003y\u0003\u0002\u0003#\u0001\u0005\u000b\u0007I\u0011A#\t\u00111\u0003!\u0011!Q\u0001\n\u0019C\u0001\"\u0014\u0001\u0003\u0002\u0003\u0006IA\u0014\u0005\t#\u0002\u0011)\u0019!C\u0001%\"Aa\f\u0001B\u0001B\u0003%1\u000b\u0003\u0005`\u0001\t\u0005\t\u0015!\u0003a\u0011!\u0019\u0007A!A!\u0002\u0013!\u0007\u0002C4\u0001\u0005\u0003\u0005\u000b\u0011\u00025\t\u000b1\u0004A\u0011A7\t\u000fU\u0004!\u0019!C\tm\"9\u0011Q\u0001\u0001!\u0002\u00139\b\"CA\u0004\u0001\t\u0007I\u0011CA\u0005\u0011!\tY\u0002\u0001Q\u0001\n\u0005-\u0001\"CA\u000f\u0001\u0001\u0007I\u0011BA\u0010\u0011%\t\t\u0003\u0001a\u0001\n\u0013\t\u0019\u0003C\u0004\u00020\u0001\u0001\u000b\u0015\u0002(\t\u0013\u0005e\u0002\u00011A\u0005\u0012\u0005m\u0002\"CA\"\u0001\u0001\u0007I\u0011CA#\u0011!\tI\u0005\u0001Q!\n\u0005u\u0002\"CA'\u0001\u0001\u0007I\u0011CA\u001e\u0011%\ty\u0005\u0001a\u0001\n#\t\t\u0006\u0003\u0005\u0002V\u0001\u0001\u000b\u0015BA\u001f\u0011%\tI\u0006\u0001b\u0001\n\u0003\tY\u0006\u0003\u0005\u0002\u0002\u0002\u0001\u000b\u0011BA/\u0011\u001d\t\u0019\t\u0001C!\u0003\u000bC\u0001\"a\"\u0001\t\u0003B\u0013\u0011\u0012\u0005\b\u0003?\u0003A\u0011IAQ\u0011\u001d\t9\u000b\u0001C!\u0003\u000bCq!!+\u0001\t\u0003\nY\u0004C\u0004\u0002,\u0002!\t%a\b\t\u000f\u00055\u0006\u0001\"\u0001\u00020\"9\u0011q\u0017\u0001\u0005\u0012\u0005\u0015\u0005bBA]\u0001\u0011E\u0011Q\u0011\u0005\b\u0003w\u0003a\u0011CAC\u0011\u001d\ti\f\u0001D\t\u0003\u000bCq!a0\u0001\t#\t)\tC\u0004\u0002B\u0002!\t%a\u000f\t\u000f\u0005\r\u0007\u0001\"\u0011\u0002F\na2\t\\;ti\u0016\u0014H*\u001b8l\u0007>tg.Z2uS>tW*\u00198bO\u0016\u0014(BA\u0015+\u0003\u0011a\u0017N\\6\u000b\u0005-b\u0013AB:feZ,'OC\u0001.\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019B\u0001\u0001\u00197}A\u0011\u0011\u0007N\u0007\u0002e)\t1'A\u0003tG\u0006d\u0017-\u0003\u00026e\t1\u0011I\\=SK\u001a\u0004\"aN\u001e\u000f\u0005aJT\"\u0001\u0015\n\u0005iB\u0013AE\"mkN$XM\u001d'j].4\u0015m\u0019;pefL!\u0001P\u001f\u
0003#\r{gN\\3di&|g.T1oC\u001e,'O\u0003\u0002;QA\u0011qHQ\u0007\u0002\u0001*\u0011\u0011\tL\u0001\u0006kRLGn]\u0005\u0003\u0007\u0002\u0013q\u0001T8hO&tw-\u0001\u0005mS:\\G)\u0019;b+\u00051\u0005CA$K\u001b\u0005A%BA%-\u0003\tQ8.\u0003\u0002L\u0011\ny1\t\\;ti\u0016\u0014H*\u001b8l\t\u0006$\u0018-A\u0005mS:\\G)\u0019;bA\u0005i\u0011N\\5uS\u0006d7i\u001c8gS\u001e\u0004\"\u0001O(\n\u0005AC#!E\"mkN$XM\u001d'j].\u001cuN\u001c4jO\u0006\u0019Bn\\2bY2{w-[2bY\u000ecWo\u001d;feV\t1\u000b\u0005\u0002U7:\u0011Q+\u0017\t\u0003-Jj\u0011a\u0016\u0006\u00031:\na\u0001\u0010:p_Rt\u0014B\u0001.3\u0003\u0019\u0001&/\u001a3fM&\u0011A,\u0018\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005i\u0013\u0014\u0001\u00067pG\u0006dGj\\4jG\u0006d7\t\\;ti\u0016\u0014\b%A\bnKR\fG-\u0019;b\u001b\u0006t\u0017mZ3s!\tA\u0014-\u0003\u0002cQ\tQ2\t\\;ti\u0016\u0014H*\u001b8l\u001b\u0016$\u0018\rZ1uC6\u000bg.Y4fe\u00069Q.\u001a;sS\u000e\u001c\bC\u0001\u001df\u0013\t1\u0007F\u0001\nDYV\u001cH/\u001a:MS:\\W*\u001a;sS\u000e\u001c\u0018\u0001\u00042s_.,'oQ8oM&<\u0007CA5k\u001b\u0005Q\u0013BA6+\u0005-Y\u0015MZ6b\u0007>tg-[4\u0002\rqJg.\u001b;?)\u001dqw\u000e]9sgR\u0004\"\u0001\u000f\u0001\t\u000b\u0011K\u0001\u0019\u0001$\t\u000b5K\u0001\u0019\u0001(\t\u000bEK\u0001\u0019A*\t\u000b}K\u0001\u0019\u00011\t\u000b\rL\u0001\u0019\u00013\t\u000b\u001dL\u0001\u0019\u00015\u0002\r1Lgn[%e+\u00059\bc\u0001=\u0002\u00025\t\u0011P\u0003\u0002{w\u000611m\\7n_:T!!\f?\u000b\u0005ut\u0018AB1qC\u000eDWMC\u0001��\u0003\ry'oZ\u0005\u0004\u0003\u0007I(\u0001B+vS\u0012\fq\u0001\\5oW&#\u0007%A\bti\u0006$Xm\u00115b]\u001e,Gj\\2l+\t\tY\u0001\u0005\u0003\u0002\u000e\u0005]QBAA\b\u0015\u0011\t\t\"a\u0005\u0002\t1\fgn\u001a\u0006\u0003\u0003+\tAA[1wC&!\u0011\u0011DA\b\u0005\u0019y%M[3di\u0006\u00012\u000f^1uK\u000eC\u0017M\\4f\u0019>\u001c7\u000eI\u0001\u0012G2,8\u000f^3s\u0019&t7nQ8oM&<W#\u0001(\u0002+\rdWo\u001d;fe2Kgn[\"p]\u001aLwm\u0018\u0013fcR!\u0011QEA\u0016!\r\t\u0014qE\u0005\u0004\u0003S\u0011$\u0001B+oSRD\u0001\"!\f\u0010\u0003\u0003\u
0005\rAT\u0001\u0004q\u0012\n\u0014AE2mkN$XM\u001d'j].\u001cuN\u001c4jO\u0002B3\u0001EA\u001a!\r\t\u0014QG\u0005\u0004\u0003o\u0011$\u0001\u0003<pY\u0006$\u0018\u000e\\3\u0002/M|WO]2f\u0007>tg.Z2uS>tWI\\1cY\u0016$WCAA\u001f!\r\t\u0014qH\u0005\u0004\u0003\u0003\u0012$a\u0002\"p_2,\u0017M\\\u0001\u001cg>,(oY3D_:tWm\u0019;j_:,e.\u00192mK\u0012|F%Z9\u0015\t\u0005\u0015\u0012q\t\u0005\n\u0003[\u0011\u0012\u0011!a\u0001\u0003{\t\u0001d]8ve\u000e,7i\u001c8oK\u000e$\u0018n\u001c8F]\u0006\u0014G.\u001a3!Q\r\u0019\u00121G\u0001\tSN\f5\r^5wK\u0006a\u0011n]!di&4Xm\u0018\u0013fcR!\u0011QEA*\u0011%\ti#FA\u0001\u0002\u0004\ti$A\u0005jg\u0006\u001bG/\u001b<fA!\u001aa#a\r\u0002\u00175\f\u0007\u0010T8h\u0019\u00164X\r\\\u000b\u0003\u0003;\u0002b!a\u0018\u0002n\u0005ETBAA1\u0015\u0011\t\u0019'!\u001a\u0002\r\u0005$x.\\5d\u0015\u0011\t9'!\u001b\u0002\u0015\r|gnY;se\u0016tGO\u0003\u0003\u0002l\u0005M\u0011\u0001B;uS2LA!a\u001c\u0002b\ty\u0011\t^8nS\u000e\u0014VMZ3sK:\u001cW\r\u0005\u0003\u0002t\u0005uTBAA;\u0015\u0011\t9(!\u001f\u0002\u000b\u00154XM\u001c;\u000b\u0007\u0005md0A\u0003tY\u001a$$.\u0003\u0003\u0002��\u0005U$!\u0002'fm\u0016d\u0017\u0001D7bq2{w\rT3wK2\u0004\u0013aB:uCJ$X\u000f\u001d\u000b\u0003\u0003K\t1B]3d_:4\u0017nZ;sKR1\u0011QEAF\u0003\u001fCa!!$\u001b\u0001\u0004q\u0015!\u00038fo\u000e{gNZ5h\u0011\u001d\t\tJ\u0007a\u0001\u0003'\u000b1\"\u001e9eCR,GmS3zgB)\u0011QSAN'6\u0011\u0011q\u0013\u0006\u0004\u00033\u0013\u0014AC2pY2,7\r^5p]&!\u0011QTAL\u0005\r\u0019V\r^\u0001\u0015_:\fe/Y5mC\nLG.\u001b;z\u0007\"\fgnZ3\u0015\t\u0005\u0015\u00121\u0015\u0005\b\u0003K[\u0002\u0019AA\u001f\u0003-I7/\u0011<bS2\f'\r\\3\u0002\u0011MDW\u000f\u001e3po:\fa!Y2uSZ,\u0017!D2veJ,g\u000e^\"p]\u001aLw-\u0001\u0007mS:\\7\t\\;ti\u0016\u00148/\u0006\u0002\u00022B)\u0011QSAZ'&!\u0011QWAL\u0005\r\u0019V-]\u0001 
K:\u001cXO]3SKZ,'o]3D_:tWm\u0019;j_:\u001cXI\\1cY\u0016$\u0017a\u0007:fg\u0016$(+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8BI6Lg.\u0001\u000fde\u0016\fG/\u001a*fm\u0016\u00148/Z\"p]:,7\r^5p]\u0006#W.\u001b8\u00027\rdwn]3SKZ,'o]3D_:tWm\u0019;j_:\fE-\\5o\u0003U)\b\u000fZ1uK\u0006\u001bG/\u001b<f\u0019&t7nQ8v]R\f\u0011#[:MS:\\7i\\8sI&t\u0017\r^8s\u0003Ea\u0017N\\6D_>\u0014H-\u001b8bi>\u0014\u0018\nZ\u000b\u0003\u0003\u000f\u0004R!MAe\u0003\u001bL1!a33\u0005\u0019y\u0005\u000f^5p]B\u0019\u0011'a4\n\u0007\u0005E'GA\u0002J]R\u0004")
/* loaded from: input_file:kafka/server/link/ClusterLinkConnectionManager.class */
public abstract class ClusterLinkConnectionManager implements ClusterLinkFactory.ConnectionManager, Logging {
    // Link description supplied at construction; exposed through linkData().
    private final ClusterLinkData linkData;
    // Logical cluster name of the local side; first element returned by linkClusters().
    private final String localLogicalCluster;
    // Used to answer isLinkCoordinator() / linkCoordinatorId().
    private final ClusterLinkMetadataManager metadataManager;
    // Owns the sensor written by updateActiveLinkCount().
    private final ClusterLinkMetrics metrics;
    // Broker configuration; only interBrokerListenerName() is read in this class.
    private final KafkaConfig brokerConfig;
    // Kafka Uuid rebuilt in the constructor from the two 64-bit halves of linkData.linkId().
    private final Uuid linkId;
    // Monitor guarding startup(), reconfigure(), shutdown() and resetReverseConnectionAdmin().
    private final Object stateChangeLock;
    // Current link configuration; replaced by reconfigure().
    private volatile ClusterLinkConfig clusterLinkConfig;
    // Initialised to false in the constructor; recomputed by resetReverseConnectionAdmin().
    private volatile boolean sourceConnectionEnabled;
    // Initialised to true in the constructor; cleared by shutdown() and by startup() when the link is paused.
    private volatile boolean isActive;
    // Holds null or Level.DEBUG, as set by onAvailabilityChange().
    private final AtomicReference<Level> maxLogLevel;
    // Backing storage for the lazily initialised logger(); valid once bitmap$0 is true.
    private Logger logger;
    // Backing storage for logIdent() / logIdent_$eq().
    private String logIdent;
    // Set to true by logger$lzycompute() after `logger` has been assigned.
    private volatile boolean bitmap$0;

    /*
     * Forwarders to the implementations inherited from kafka.utils.Logging.
     *
     * The decompiler rendered each of these as a call to the method itself,
     * which in Java source is unbounded recursion. They are written here as
     * explicit interface-super calls so that each one delegates to the
     * inherited implementation instead of re-entering itself.
     *
     * NOTE(review): assumes kafka.utils.Logging exposes these members as
     * interface default methods (Scala 2.12+ trait encoding) - confirm
     * against the compiled trait.
     */

    /** Returns the logger name supplied by {@code Logging}. */
    @Override // kafka.utils.Logging
    public String loggerName() {
        return Logging.super.loggerName();
    }

    /**
     * Delegates to {@code Logging} to decorate a message.
     *
     * @param str the raw message
     * @return the value produced by {@code Logging.msgWithLogIdent}
     */
    @Override // kafka.utils.Logging
    public String msgWithLogIdent(String str) {
        return Logging.super.msgWithLogIdent(str);
    }

    /** Forwards a lazily built message to {@code Logging.trace}. */
    @Override // kafka.utils.Logging
    public void trace(Function0<String> function0) {
        Logging.super.trace(function0);
    }

    /** Forwards a lazily built message and throwable to {@code Logging.trace}. */
    @Override // kafka.utils.Logging
    public void trace(Function0<String> function0, Function0<Throwable> function02) {
        Logging.super.trace(function0, function02);
    }

    /** Returns the result of {@code Logging.isDebugEnabled}. */
    @Override // kafka.utils.Logging
    public boolean isDebugEnabled() {
        return Logging.super.isDebugEnabled();
    }

    /** Returns the result of {@code Logging.isTraceEnabled}. */
    @Override // kafka.utils.Logging
    public boolean isTraceEnabled() {
        return Logging.super.isTraceEnabled();
    }

    /** Forwards a lazily built message to {@code Logging.debug}. */
    @Override // kafka.utils.Logging
    public void debug(Function0<String> function0) {
        Logging.super.debug(function0);
    }

    /** Forwards a lazily built message and throwable to {@code Logging.debug}. */
    @Override // kafka.utils.Logging
    public void debug(Function0<String> function0, Function0<Throwable> function02) {
        Logging.super.debug(function0, function02);
    }

    /** Forwards a lazily built message to {@code Logging.info}. */
    @Override // kafka.utils.Logging
    public void info(Function0<String> function0) {
        Logging.super.info(function0);
    }

    /** Forwards a lazily built message and throwable to {@code Logging.info}. */
    @Override // kafka.utils.Logging
    public void info(Function0<String> function0, Function0<Throwable> function02) {
        Logging.super.info(function0, function02);
    }

    /** Forwards a lazily built message to {@code Logging.warn}. */
    @Override // kafka.utils.Logging
    public void warn(Function0<String> function0) {
        Logging.super.warn(function0);
    }

    /** Forwards a lazily built message and throwable to {@code Logging.warn}. */
    @Override // kafka.utils.Logging
    public void warn(Function0<String> function0, Function0<Throwable> function02) {
        Logging.super.warn(function0, function02);
    }

    /** Forwards a lazily built message to {@code Logging.error}. */
    @Override // kafka.utils.Logging
    public void error(Function0<String> function0) {
        Logging.super.error(function0);
    }

    /** Forwards a lazily built message and throwable to {@code Logging.error}. */
    @Override // kafka.utils.Logging
    public void error(Function0<String> function0, Function0<Throwable> function02) {
        Logging.super.error(function0, function02);
    }

    /** Forwards a lazily built message to {@code Logging.fatal}. */
    @Override // kafka.utils.Logging
    public void fatal(Function0<String> function0) {
        Logging.super.fatal(function0);
    }

    /** Forwards a lazily built message and throwable to {@code Logging.fatal}. */
    @Override // kafka.utils.Logging
    public void fatal(Function0<String> function0, Function0<Throwable> function02) {
        Logging.super.fatal(function0, function02);
    }

    /*
     * Forwarders to the implementations inherited from
     * ClusterLinkFactory.ConnectionManager.
     *
     * The decompiled form called each method from inside itself (unbounded
     * recursion). They are expressed here as interface-super calls so that
     * they delegate to the inherited implementation.
     *
     * NOTE(review): assumes ConnectionManager provides these members as
     * interface default methods (Scala 2.12+ trait encoding) - confirm
     * against the compiled trait.
     */

    /**
     * Delegates to {@code ConnectionManager.onControllerChange}.
     *
     * @param z flag passed through unchanged to the inherited implementation
     */
    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public void onControllerChange(boolean z) {
        ClusterLinkFactory.ConnectionManager.super.onControllerChange(z);
    }

    /** Delegates to {@code ConnectionManager.onLinkMetadataPartitionLeaderChange}. */
    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public void onLinkMetadataPartitionLeaderChange() {
        ClusterLinkFactory.ConnectionManager.super.onLinkMetadataPartitionLeaderChange();
    }

    /** Returns the value reported by {@code ConnectionManager.persistentConnectionCount}. */
    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public int persistentConnectionCount() {
        return ClusterLinkFactory.ConnectionManager.super.persistentConnectionCount();
    }

    /** Returns the value reported by {@code ConnectionManager.reverseConnectionCount}. */
    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public int reverseConnectionCount() {
        return ClusterLinkFactory.ConnectionManager.super.reverseConnectionCount();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [kafka.server.link.ClusterLinkConnectionManager] */
    private Logger logger$lzycompute() {
        Logger logger;
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                logger = logger();
                this.logger = logger;
                r0 = this;
                r0.bitmap$0 = true;
            }
            return this.logger;
        }
    }

    /**
     * Returns the logger, initialising it on first use.
     *
     * @return the cached {@code logger} field once {@code bitmap$0} is set,
     *         otherwise the result of {@code logger$lzycompute()}
     */
    @Override // kafka.utils.Logging
    public Logger logger() {
        if (this.bitmap$0) {
            // Already initialised: plain field read.
            return this.logger;
        }
        return logger$lzycompute();
    }

    /** Returns the stored log identifier (may be null; no value is assigned in this class's constructor). */
    @Override // kafka.utils.Logging
    public String logIdent() {
        return this.logIdent;
    }

    /** Replaces the stored log identifier. */
    @Override // kafka.utils.Logging
    public void logIdent_$eq(String str) {
        this.logIdent = str;
    }

    /** Returns the link data passed to the constructor. */
    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public ClusterLinkData linkData() {
        return this.linkData;
    }

    /** Returns the local logical cluster name passed to the constructor. */
    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public String localLogicalCluster() {
        return this.localLogicalCluster;
    }

    /** Returns the {@code Uuid} built in the constructor from {@code linkData.linkId()}. */
    public Uuid linkId() {
        return this.linkId;
    }

    /** Returns the monitor object that the state-changing methods of this class synchronize on. */
    public Object stateChangeLock() {
        return this.stateChangeLock;
    }

    /** Reads the current (volatile) link configuration. */
    private ClusterLinkConfig clusterLinkConfig() {
        return this.clusterLinkConfig;
    }

    /** Writes the current (volatile) link configuration. */
    private void clusterLinkConfig_$eq(ClusterLinkConfig clusterLinkConfig) {
        this.clusterLinkConfig = clusterLinkConfig;
    }

    /** Reads the (volatile) source-connection flag. */
    public boolean sourceConnectionEnabled() {
        return this.sourceConnectionEnabled;
    }

    /** Writes the (volatile) source-connection flag. */
    public void sourceConnectionEnabled_$eq(boolean z) {
        this.sourceConnectionEnabled = z;
    }

    /** Reads the (volatile) active flag. */
    public boolean isActive() {
        return this.isActive;
    }

    /** Writes the (volatile) active flag. */
    public void isActive_$eq(boolean z) {
        this.isActive = z;
    }

    /** Returns the holder for the maximum log level; its content is null or {@code Level.DEBUG} as set by {@code onAvailabilityChange}. */
    public AtomicReference<Level> maxLogLevel() {
        return this.maxLogLevel;
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable, java.lang.Object] */
    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public void startup() {
        ?? stateChangeLock = stateChangeLock();
        synchronized (stateChangeLock) {
            info(() -> {
                return "Cluster link connection manager has started up.";
            });
            if (Predef$.MODULE$.Boolean2boolean(clusterLinkConfig().clusterLinkPaused())) {
                isActive_$eq(false);
            } else {
                isActive_$eq(true);
                updateActiveLinkCount();
            }
            resetReverseConnectionAdmin();
        }
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable, java.lang.Object] */
    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public void reconfigure(ClusterLinkConfig clusterLinkConfig, Set<String> set) {
        ?? stateChangeLock = stateChangeLock();
        synchronized (stateChangeLock) {
            debug(() -> {
                return new StringBuilder(74).append("Reconfiguring link connection manager with new configs updated=").append(set).append(" newConfig=").append(clusterLinkConfig.values()).toString();
            });
            clusterLinkConfig_$eq(clusterLinkConfig);
            if (Predef$.MODULE$.Boolean2boolean(clusterLinkConfig.clusterLinkPaused())) {
                if (isActive()) {
                    info(() -> {
                        return "Shutting down cluster link connection manager because link has been paused";
                    });
                }
                shutdown();
            } else {
                isActive_$eq(true);
                resetReverseConnectionAdmin();
            }
            updateActiveLinkCount();
            debug(() -> {
                return "Completed reconfiguration of cluster link";
            });
        }
    }

    /**
     * Adjusts the maximum log level held in {@link #maxLogLevel()}.
     *
     * @param z when true the stored level is cleared (set to null); when false
     *          it is set to {@code Level.DEBUG}. Presumably this flag is the
     *          link's availability, judging by the method name.
     */
    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public void onAvailabilityChange(boolean z) {
        Level newMaxLevel = z ? null : Level.DEBUG;
        maxLogLevel().set(newMaxLevel);
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable, java.lang.Object] */
    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public void shutdown() {
        ?? stateChangeLock = stateChangeLock();
        synchronized (stateChangeLock) {
            isActive_$eq(false);
            closeReverseConnectionAdmin();
            updateActiveLinkCount();
            info(() -> {
                return new StringBuilder(64).append("Shutdown of ClusterLinkConnectionManager with cluster link data ").append(this.linkData()).toString();
            });
        }
    }

    /** Returns the current value of {@link #isActive()}. */
    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public boolean active() {
        return isActive();
    }

    /** Returns the configuration most recently stored by the constructor or by {@code reconfigure}. */
    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public ClusterLinkConfig currentConfig() {
        return clusterLinkConfig();
    }

    /**
     * Builds a sequence that starts with {@link #localLogicalCluster()} and is
     * followed by the contents of {@code linkData().clusterId()} when that
     * option is non-empty.
     *
     * NOTE(review): {@code C$colon$colon} and {@code $plus$plus2} look like
     * decompiler-renamed forms of Scala's list cons class and {@code ++}
     * operator - confirm the real symbol names before compiling this source.
     */
    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public Seq<String> linkClusters() {
        return (Seq) new C$colon$colon(localLogicalCluster(), Nil$.MODULE$).$plus$plus2(Option$.MODULE$.option2Iterable(linkData().clusterId()).toSeq());
    }

    /**
     * Verifies that reverse connections may be used on this link.
     *
     * @throws ClusterLinkPausedException if the manager is not active or the
     *         current configuration reports the link as paused
     * @throws InvalidRequestException if {@link #sourceConnectionEnabled()} is false
     */
    public void ensureReverseConnectionsEnabled() {
        // Short-circuit is kept: the configuration is only consulted when the manager is active.
        boolean inactiveOrPaused = !isActive()
                || Predef$.MODULE$.Boolean2boolean(clusterLinkConfig().clusterLinkPaused());
        if (inactiveOrPaused) {
            String pausedMessage = "Cluster link " + linkData().linkName() + " is not active," + " paused="
                    + clusterLinkConfig().clusterLinkPaused();
            throw new ClusterLinkPausedException(pausedMessage);
        }
        if (sourceConnectionEnabled()) {
            return;
        }
        throw new InvalidRequestException(
                "Cluster link '" + linkData().linkName() + "' is not a source initiated link");
    }

    /**
     * Recomputes {@link #sourceConnectionEnabled()} from the current state and
     * recreates the reverse connection admin accordingly.
     *
     * <p>While holding {@link #stateChangeLock()}:
     * <ol>
     *   <li>the flag becomes true only when the manager is active, the current
     *       configuration does not report the link as paused, and the
     *       configuration is either (link mode Destination, connection mode
     *       Inbound) or (link mode Source, connection mode Outbound);</li>
     *   <li>any existing reverse connection admin is closed;</li>
     *   <li>if the flag is now true, a new reverse connection admin is created.</li>
     * </ol>
     *
     * <p>The decompiler could not emit this method and left a stub that always
     * threw {@code UnsupportedOperationException}, which made
     * {@link #startup()} and the non-paused path of {@code reconfigure} fail.
     * The body below is reconstructed from the decompiler's instruction dump.
     *
     * NOTE(review): the mode singletons are referenced by the names shown in
     * that dump ({@code LinkMode$Destination$} etc.); confirm they resolve from
     * Java source in this build.
     */
    public void resetReverseConnectionAdmin() {
        synchronized (stateChangeLock()) {
            ClusterLinkConfig config = currentConfig();
            boolean enabled = isActive()
                    && !Predef$.MODULE$.Boolean2boolean(config.clusterLinkPaused())
                    && ((sameMode(config.linkMode(), LinkMode$Destination$.MODULE$)
                            && sameMode(config.connectionMode(), ConnectionMode$Inbound$.MODULE$))
                        || (sameMode(config.linkMode(), LinkMode$Source$.MODULE$)
                            && sameMode(config.connectionMode(), ConnectionMode$Outbound$.MODULE$)));
            sourceConnectionEnabled_$eq(enabled);
            // Always close first so a stale admin never survives a mode change.
            closeReverseConnectionAdmin();
            if (sourceConnectionEnabled()) {
                // NOTE(review): the original debug text lives in a synthetic lambda that is
                // not part of the instruction dump; this message is a stand-in - restore
                // the original wording from the Scala source.
                debug(() -> "Creating reverse connection admin for cluster link " + linkData().linkName());
                createReverseConnectionAdmin();
            }
        }
    }

    /** Null-tolerant equality matching the dump: a null {@code actual} never matches. */
    private static boolean sameMode(Object actual, Object expected) {
        return actual != null && actual.equals(expected);
    }

    /**
     * Subclass hook invoked by {@code resetReverseConnectionAdmin()} after the
     * source-connection flag has been found to be true (per the instruction
     * dump of that method). Presumably creates the admin client used for
     * reverse connections - confirm against the implementing subclasses.
     */
    public abstract void createReverseConnectionAdmin();

    /**
     * Subclass hook invoked by {@code shutdown()} and by
     * {@code resetReverseConnectionAdmin()}. Presumably releases whatever
     * {@code createReverseConnectionAdmin()} created - confirm against the
     * implementing subclasses.
     */
    public abstract void closeReverseConnectionAdmin();

    /**
     * Records the current activity state on the active-link-count sensor:
     * 1 when {@link #isActive()} is true, 0 otherwise.
     */
    public void updateActiveLinkCount() {
        int activeLinks = isActive() ? 1 : 0;
        this.metrics.activeLinkCountSensor().record(activeLinks);
    }

    /**
     * Asks the metadata manager whether this broker is the coordinator for
     * this link, identified by {@code linkData().linkName()}.
     * The second argument is the Scala-generated default value of
     * {@code isLinkCoordinator}'s second parameter.
     */
    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public boolean isLinkCoordinator() {
        return this.metadataManager.isLinkCoordinator(linkData().linkName(), this.metadataManager.isLinkCoordinator$default$2());
    }

    /**
     * Looks up the coordinator node for this link on the inter-broker
     * listener and returns its id.
     *
     * @return the boxed node id wrapped in an {@code Option}, or an empty
     *         option when the metadata manager returns none
     */
    @Override // kafka.server.link.ClusterLinkFactory.ConnectionManager
    public Option<Object> linkCoordinatorId() {
        ClusterLinkMetadataManager manager = this.metadataManager;
        return manager
                .linkCoordinator(linkData().linkName(), this.brokerConfig.interBrokerListenerName())
                .map(node -> BoxesRunTime.boxToInteger(node.id()));
    }

    /**
     * Creates a connection manager in its initial state: active, with source
     * connections disabled and no maximum log level set.
     *
     * @param clusterLinkData            link description; also the source of {@code linkId}
     * @param clusterLinkConfig          initial link configuration
     * @param str                        local logical cluster name
     * @param clusterLinkMetadataManager metadata manager used for coordinator lookups
     * @param clusterLinkMetrics         metrics holder used by {@code updateActiveLinkCount()}
     * @param kafkaConfig                broker configuration
     */
    public ClusterLinkConnectionManager(ClusterLinkData clusterLinkData, ClusterLinkConfig clusterLinkConfig, String str, ClusterLinkMetadataManager clusterLinkMetadataManager, ClusterLinkMetrics clusterLinkMetrics, KafkaConfig kafkaConfig) {
        this.linkData = clusterLinkData;
        this.localLogicalCluster = str;
        this.metadataManager = clusterLinkMetadataManager;
        this.metrics = clusterLinkMetrics;
        this.brokerConfig = kafkaConfig;
        // The local is unused; reading MODULE$ forces the singleton's class to initialise.
        // NOTE(review): presumably part of the Logging trait's initialiser - confirm.
        Log4jControllerRegistration$ log4jControllerRegistration$ = Log4jControllerRegistration$.MODULE$;
        // Rebuild the link id as a Kafka Uuid from the two 64-bit halves of linkData's id.
        this.linkId = new Uuid(clusterLinkData.linkId().getMostSignificantBits(), clusterLinkData.linkId().getLeastSignificantBits());
        this.stateChangeLock = new Object();
        this.clusterLinkConfig = clusterLinkConfig;
        this.sourceConnectionEnabled = false;
        this.isActive = true;
        // Starts out holding null, i.e. no maximum level.
        this.maxLogLevel = new AtomicReference<>();
    }
}
