/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import com.typesafe.scalalogging.Logger;
import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import kafka.server.ControllerInformation;
import kafka.server.ControllerNodeProvider;
import kafka.server.KafkaConfig;
import kafka.server.MetadataCache;
import kafka.server.link.BrokerAdminMetadataManager;
import kafka.server.link.ClusterLinkAdminRequestInterceptorFactory;
import kafka.server.link.ClusterLinkClientType;
import kafka.server.link.ClusterLinkClientType$LocalAdmin$;
import kafka.server.link.ClusterLinkClientType$LocalControllerAdmin$;
import kafka.server.link.ClusterLinkForwardingLocalAdmin;
import kafka.server.link.ClusterLinkLocalAdmin;
import kafka.server.link.ClusterLinkLocalAdmin$;
import kafka.server.link.ClusterLinkManager;
import kafka.server.link.ClusterLinkMetadataManager;
import kafka.server.link.ClusterLinkScheduler;
import kafka.utils.Logging;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.MetadataUpdater;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.clients.admin.internals.AdminRequestInterceptor;
import org.apache.kafka.clients.admin.internals.ConfluentAdminUtils;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import scala.;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.immutable.List;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;

public final class ClusterLinkLocalAdmin$
implements Logging {
    /** Singleton instance of this class, created during class initialization. */
    public static final ClusterLinkLocalAdmin$ MODULE$ = new ClusterLinkLocalAdmin$();
    /** Metrics group prefix that the admin factory methods in this class pass to {@code createAdmin}. */
    private static final String MetricsGroupPrefix = "admin-client";
    /** Cached logger; assigned inside {@code logger$lzycompute()}. */
    private static Logger logger;
    /** Log identifier exposed through {@code logIdent()} and {@code logIdent_$eq(String)}. */
    private static String logIdent;
    /** Set to {@code true} once {@code logger} has been assigned; read without a lock by {@code logger()}. */
    private static volatile boolean bitmap$0;

    // Implementation of the kafka.utils.Logging interface.
    // Each method below forwards to the static Logging.<name>$ helper of the same name, passing this instance.
    /** Returns the result of {@code Logging.loggerName$(this)}. */
    @Override
    public String loggerName() {
        return Logging.loggerName$(this);
    }

    /** Returns the result of {@code Logging.msgWithLogIdent$(this, msg)}. */
    @Override
    public String msgWithLogIdent(String msg) {
        return Logging.msgWithLogIdent$(this, msg);
    }

    /** Forwards the message supplier to {@code Logging.trace$}. */
    @Override
    public void trace(Function0<String> msg) {
        Logging.trace$(this, msg);
    }

    /** Forwards the message supplier and throwable supplier to {@code Logging.trace$}. */
    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.trace$(this, msg, e);
    }

    /** Returns the result of {@code Logging.isDebugEnabled$(this)}. */
    @Override
    public boolean isDebugEnabled() {
        return Logging.isDebugEnabled$(this);
    }

    /** Returns the result of {@code Logging.isTraceEnabled$(this)}. */
    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    /** Forwards the message supplier to {@code Logging.debug$}. */
    @Override
    public void debug(Function0<String> msg) {
        Logging.debug$(this, msg);
    }

    /** Forwards the message supplier and throwable supplier to {@code Logging.debug$}. */
    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.debug$(this, msg, e);
    }

    /** Forwards the message supplier to {@code Logging.info$}. */
    @Override
    public void info(Function0<String> msg) {
        Logging.info$(this, msg);
    }

    /** Forwards the message supplier and throwable supplier to {@code Logging.info$}. */
    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.info$(this, msg, e);
    }

    /** Forwards the message supplier to {@code Logging.warn$}. */
    @Override
    public void warn(Function0<String> msg) {
        Logging.warn$(this, msg);
    }

    /** Forwards the message supplier and throwable supplier to {@code Logging.warn$}. */
    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.warn$(this, msg, e);
    }

    /** Forwards the message supplier to {@code Logging.error$}. */
    @Override
    public void error(Function0<String> msg) {
        Logging.error$(this, msg);
    }

    /** Forwards the message supplier and throwable supplier to {@code Logging.error$}. */
    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.error$(this, msg, e);
    }

    /** Forwards the message supplier to {@code Logging.fatal$}. */
    @Override
    public void fatal(Function0<String> msg) {
        Logging.fatal$(this, msg);
    }

    /** Forwards the message supplier and throwable supplier to {@code Logging.fatal$}. */
    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.fatal$(this, msg, e);
    }

    /**
     * Slow path of {@code logger()}: while holding this instance's monitor, assigns the static
     * {@code logger} field from {@code Logging.logger$(this)} if {@code bitmap$0} is still
     * {@code false}, then sets {@code bitmap$0} to {@code true}.
     *
     * @return the value of the static {@code logger} field read after the synchronized section
     */
    private Logger logger$lzycompute() {
        synchronized (this) {
            // Re-check under the lock so a thread that lost the race does not assign the field again.
            if (!bitmap$0) {
                logger = Logging.logger$(this);
                // The flag is written only after the field it guards has been assigned.
                bitmap$0 = true;
            }
        }
        return logger;
    }

    /**
     * Returns the cached logger, taking the synchronized initialization path in
     * {@code logger$lzycompute()} only while {@code bitmap$0} is still {@code false}.
     *
     * @return the logger held in the static {@code logger} field
     */
    @Override
    public Logger logger() {
        // bitmap$0 is read first; the cached field is only read once the flag is already set.
        return bitmap$0 ? logger : this.logger$lzycompute();
    }

    /** Returns the current value of the static {@code logIdent} field. */
    @Override
    public String logIdent() {
        return logIdent;
    }

    /**
     * Replaces the static {@code logIdent} field.
     *
     * @param x$1 the new log identifier
     */
    @Override
    public void logIdent_$eq(String x$1) {
        logIdent = x$1;
    }

    /** Returns the metrics group prefix constant ({@code "admin-client"}). */
    public String MetricsGroupPrefix() {
        return MetricsGroupPrefix;
    }

    /**
     * Creates the {@link ClusterLinkLocalAdmin} for this broker.
     *
     * <p>When {@code brokerConfig.clusterLinkEnableLocalAdmin()} is false the result of
     * {@code createForwardingLocalAdmin} is returned. Otherwise an inter-broker admin is created
     * and, if a controller node provider is supplied, a separate controller admin as well; without
     * a provider the inter-broker admin is also used as the controller admin.
     *
     * @param brokerConfig broker configuration
     * @param adminIndex index forwarded to the client-id computation of the created admins
     * @param serverInfo supplies the inter-broker endpoint used to derive client configs
     * @param linkMetadataManager forwarded to the created admins and wrapped into the result
     * @param clusterLinkManager forwarded to the created admins
     * @param groupCoordinator forwarded to the created admins
     * @param scheduler forwarded to the created admins
     * @param authorizer optional authorizer handed to the returned local admin
     * @param metrics metrics registry forwarded to the created admins
     * @param time clock forwarded to the created admins
     * @param interBrokerListenerName listener name forwarded to the created admins
     * @param kraftControllerNodeProvider optional controller node provider
     * @return the local admin wrapper
     * @throws MatchError if {@code kraftControllerNodeProvider} is neither {@code Some} nor {@code None}
     */
    public ClusterLinkLocalAdmin createLocalAdmin(KafkaConfig brokerConfig, int adminIndex, AuthorizerServerInfo serverInfo, ClusterLinkMetadataManager linkMetadataManager, ClusterLinkManager clusterLinkManager, GroupCoordinator groupCoordinator, ClusterLinkScheduler scheduler, Option<Authorizer> authorizer, Metrics metrics, Time time, ListenerName interBrokerListenerName, Option<ControllerNodeProvider> kraftControllerNodeProvider) {
        if (!Predef$.MODULE$.Boolean2boolean(brokerConfig.clusterLinkEnableLocalAdmin())) {
            return this.createForwardingLocalAdmin(brokerConfig, serverInfo, authorizer);
        }
        // No client-config map is built here: createInterBrokerAdmin and createControllerAdmin each
        // derive their own configs from the same broker config and inter-broker endpoint.
        // NOTE(review): createInterBrokerAdmin does not set "enable.metrics.push" to "false", whereas
        // createControllerAdmin and createForwardingLocalAdmin do - confirm that this is intended.
        KafkaAdminClient localAdmin = this.createInterBrokerAdmin((Option<String>)None$.MODULE$, brokerConfig, adminIndex, serverInfo, linkMetadataManager, clusterLinkManager, groupCoordinator, scheduler, metrics, time, interBrokerListenerName);
        // Typed as ConfluentAdmin because that is what createControllerAdmin is declared to return.
        ConfluentAdmin controllerAdmin;
        if (kraftControllerNodeProvider instanceof Some) {
            ControllerNodeProvider provider = (ControllerNodeProvider)((Some)kraftControllerNodeProvider).value();
            controllerAdmin = this.createControllerAdmin(brokerConfig, adminIndex, serverInfo, linkMetadataManager, clusterLinkManager, groupCoordinator, scheduler, metrics, time, interBrokerListenerName, provider);
        } else if (None$.MODULE$.equals(kraftControllerNodeProvider)) {
            // Without a controller node provider the inter-broker admin doubles as the controller admin.
            controllerAdmin = (ConfluentAdmin)localAdmin;
        } else {
            throw new MatchError(kraftControllerNodeProvider);
        }
        return new ClusterLinkLocalAdmin(brokerConfig, (Option<ClusterLinkMetadataManager>)new Some((Object)linkMetadataManager), authorizer, (ConfluentAdmin)localAdmin, controllerAdmin);
    }

    /**
     * Creates a {@link ClusterLinkForwardingLocalAdmin} backed by an admin client obtained from
     * {@code Admin.create}.
     *
     * <p>The client configs are derived from the broker config and the inter-broker endpoint, with
     * {@code metric.reporters} removed and {@code enable.metrics.push} set to {@code "false"}.
     *
     * @param brokerConfig broker configuration; also supplies the broker id used in the client id
     * @param serverInfo supplies the inter-broker endpoint
     * @param authorizer optional authorizer handed to the returned admin
     * @return the forwarding local admin
     */
    public ClusterLinkLocalAdmin createForwardingLocalAdmin(KafkaConfig brokerConfig, AuthorizerServerInfo serverInfo, Option<Authorizer> authorizer) {
        Map clientConfigs = ConfluentConfigs.clientConfigsForEndpoint((AbstractConfig)brokerConfig, (Endpoint)serverInfo.interBrokerEndpoint());
        clientConfigs.remove("metric.reporters");
        clientConfigs.put("enable.metrics.push", "false");
        // There is no link name on this path, so the link-name segment of the id is empty and the
        // id starts with "cluster-link--".
        String forwardingClientId = "cluster-link-" + "" + "-" + ClusterLinkClientType$LocalAdmin$.MODULE$.name() + "-" + brokerConfig.brokerId();
        clientConfigs.put("client.id", forwardingClientId);
        ConfluentAdmin forwardingAdmin = (ConfluentAdmin)Admin.create((Map)clientConfigs);
        return new ClusterLinkForwardingLocalAdmin(brokerConfig, authorizer, forwardingAdmin);
    }

    /**
     * Builds the client id {@code cluster-link-<link>-<type>[-<index>]-<brokerId>}.
     *
     * <p>{@code <link>} is empty when no link name is given. The {@code -<index>} part is present
     * only when {@code adminIndex} is non-negative and the broker is configured with more than one
     * cluster link background thread.
     *
     * @param linkName optional link name
     * @param brokerConfig supplies the background thread count and the broker id
     * @param adminIndex index of the admin; negative values suppress the index suffix
     * @param clientType client type whose {@code name()} is embedded in the id
     * @return the client id
     */
    private String adminClientId(Option<String> linkName, KafkaConfig brokerConfig, int adminIndex, ClusterLinkClientType clientType) {
        String indexSuffix = "";
        if (adminIndex >= 0 && Predef$.MODULE$.Integer2int(brokerConfig.clusterLinkNumBackgroundThreads()) > 1) {
            indexSuffix = "-" + adminIndex;
        }
        String linkSegment = linkName.isDefined() ? linkName.get() : "";
        return "cluster-link-" + linkSegment + "-" + clientType.name() + indexSuffix + "-" + brokerConfig.brokerId();
    }

    /**
     * Creates an admin client whose metadata manager resolves nodes from the metadata cache of
     * {@code linkMetadataManager}.
     *
     * <p>The client id comes from {@code adminClientId} with the {@code LocalAdmin} client type. The
     * client configs are derived from the broker config and the inter-broker endpoint, with
     * {@code metric.reporters} removed. The admin is built by {@code createAdmin} with
     * {@code isKraftControllerAdmin} set to {@code false}.
     *
     * <p>NOTE(review): unlike {@code createControllerAdmin} and {@code createForwardingLocalAdmin},
     * this method does not set {@code enable.metrics.push} to {@code "false"} - confirm that this is intended.
     *
     * @param linkName optional link name embedded in the client id
     * @param brokerConfig broker configuration
     * @param adminIndex index forwarded to the client-id computation
     * @param serverInfo supplies the inter-broker endpoint and its listener name
     * @param linkMetadataManager supplies the metadata cache; also forwarded to {@code createAdmin}
     * @param clusterLinkManager forwarded to {@code createAdmin}
     * @param groupCoordinator forwarded to {@code createAdmin}
     * @param scheduler forwarded to {@code createAdmin}
     * @param metrics forwarded to {@code createAdmin}
     * @param time clock used for the channel builder and forwarded to {@code createAdmin}
     * @param interBrokerListenerName forwarded to {@code createAdmin}; node lookups in this method use
     *        the listener name taken from {@code serverInfo} instead
     * @return the admin client returned by {@code createAdmin}
     */
    public KafkaAdminClient createInterBrokerAdmin(Option<String> linkName, KafkaConfig brokerConfig, int adminIndex, AuthorizerServerInfo serverInfo, ClusterLinkMetadataManager linkMetadataManager, ClusterLinkManager clusterLinkManager, GroupCoordinator groupCoordinator, ClusterLinkScheduler scheduler, Metrics metrics, Time time, ListenerName interBrokerListenerName) {
        String clientId = this.adminClientId(linkName, brokerConfig, adminIndex, ClusterLinkClientType$LocalAdmin$.MODULE$);
        Map adminConfigs = ConfluentConfigs.clientConfigsForEndpoint((AbstractConfig)brokerConfig, (Endpoint)serverInfo.interBrokerEndpoint());
        adminConfigs.remove("metric.reporters");
        adminConfigs.put("client.id", clientId);
        // NOTE(review): get() is called unconditionally - presumably the inter-broker endpoint always carries a listener name; confirm.
        ListenerName listenerName = new ListenerName((String)serverInfo.interBrokerEndpoint().listenerName().get());
        LogContext logContext = new LogContext(new StringBuilder(24).append("[AdminClient clientId=").append(clientId).append("] ").toString());
        AdminClientConfig adminConfig = new AdminClientConfig(adminConfigs);
        MetadataCache metadataCache = linkMetadataManager.metadataCache();
        // Anonymous BrokerAdminMetadataManager subclass; the body below is decompiler output for a Scala anonymous class.
        BrokerAdminMetadataManager adminMetadataManager = new BrokerAdminMetadataManager(logContext, adminConfig, metadataCache, listenerName){
            private final MetadataCache metadataCache$1;
            private final ListenerName listenerName$1;

            // Maps the controller id from the metadata cache through nodeById; null when the cache reports no controller id.
            public Node controller() {
                return (Node)this.metadataCache$1.getControllerId().map((Function1 & Serializable)controller -> this.nodeById(controller.id())).orNull((.less.colon.less)$less$colon$less$.MODULE$.refl());
            }

            // Looks up the alive broker node for the given id on the captured listener; null when the cache returns none.
            public Node nodeById(int nodeId) {
                return (Node)this.metadataCache$1.getAliveBrokerNode(nodeId, this.listenerName$1).orNull((.less.colon.less)$less$colon$less$.MODULE$.refl());
            }
            {
                this.metadataCache$1 = metadataCache$1;
                this.listenerName$1 = listenerName$1;
                // The third super argument is a function whose apply() returns the alive broker nodes of the metadata cache.
                super(logContext$1, adminConfig$1, (Function0<List<Node>>)new Serializable(metadataCache$1, listenerName$1){
                    private static final long serialVersionUID = 0L;
                    private final MetadataCache metadataCache$1;
                    private final ListenerName listenerName$1;

                    public final List<Node> apply() {
                        return ClusterLinkLocalAdmin$.kafka$server$link$ClusterLinkLocalAdmin$$aliveNodes$1(this.metadataCache$1, this.listenerName$1);
                    }
                    {
                        this.metadataCache$1 = metadataCache$1;
                        this.listenerName$1 = listenerName$1;
                    }
                });
            }

            // Synthetic lambda-deserialization hook; the body is decompiler residue.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$controller$1(kafka.server.link.ClusterLinkLocalAdmin$$anon$2 kafka.server.CachedControllerId )}, serializedLambda);
            }
        };
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder((AbstractConfig)adminConfig, (Time)time, (LogContext)logContext, null, null);
        // The trailing false is createAdmin's isKraftControllerAdmin flag.
        return this.createAdmin(this.MetricsGroupPrefix(), adminConfig, adminMetadataManager, linkMetadataManager, clusterLinkManager, groupCoordinator, scheduler, channelBuilder, metrics, time, logContext, interBrokerListenerName, false);
    }

    /**
     * Creates an admin client that talks to the controller reported by
     * {@code kraftControllerNodeProvider}.
     *
     * <p>The client id comes from {@code adminClientId} with the {@code LocalControllerAdmin} client
     * type and no link name. The client configs are derived from the broker config and the
     * inter-broker endpoint, with {@code metric.reporters} removed and {@code enable.metrics.push}
     * set to {@code "false"}. The channel builder is created from the controller's security
     * protocol, listener name and SASL mechanism. The admin is built by {@code createAdmin} with
     * {@code isKraftControllerAdmin} set to {@code true}.
     *
     * @param brokerConfig broker configuration; also supplies the SASL handshake setting
     * @param adminIndex index forwarded to the client-id computation
     * @param serverInfo supplies the inter-broker endpoint used to derive client configs
     * @param linkMetadataManager forwarded to {@code createAdmin}
     * @param clusterLinkManager forwarded to {@code createAdmin}
     * @param groupCoordinator forwarded to {@code createAdmin}
     * @param scheduler forwarded to {@code createAdmin}
     * @param metrics forwarded to {@code createAdmin}
     * @param time clock used for the channel builder and forwarded to {@code createAdmin}
     * @param interBrokerListenerName forwarded to {@code createAdmin}
     * @param kraftControllerNodeProvider source of the controller node and its connection settings
     * @return the admin client returned by {@code createAdmin}
     */
    private ConfluentAdmin createControllerAdmin(KafkaConfig brokerConfig, int adminIndex, AuthorizerServerInfo serverInfo, ClusterLinkMetadataManager linkMetadataManager, ClusterLinkManager clusterLinkManager, GroupCoordinator groupCoordinator, ClusterLinkScheduler scheduler, Metrics metrics, Time time, ListenerName interBrokerListenerName, ControllerNodeProvider kraftControllerNodeProvider) {
        String clientId = this.adminClientId((Option<String>)None$.MODULE$, brokerConfig, adminIndex, ClusterLinkClientType$LocalControllerAdmin$.MODULE$);
        Map adminConfigs = ConfluentConfigs.clientConfigsForEndpoint((AbstractConfig)brokerConfig, (Endpoint)serverInfo.interBrokerEndpoint());
        adminConfigs.remove("metric.reporters");
        adminConfigs.put("enable.metrics.push", "false");
        adminConfigs.put("client.id", clientId);
        LogContext logContext = new LogContext(new StringBuilder(24).append("[AdminClient clientId=").append(clientId).append("] ").toString());
        AdminClientConfig adminConfig = new AdminClientConfig(adminConfigs);
        // Anonymous BrokerAdminMetadataManager subclass; the body below is decompiler output for a Scala anonymous class and is kept verbatim.
        BrokerAdminMetadataManager adminMetadataManager = new BrokerAdminMetadataManager(logContext, adminConfig, kraftControllerNodeProvider){
            private final ControllerNodeProvider kraftControllerNodeProvider$1;

            // The controller node currently reported by the provider; null when it reports none.
            public Node controller() {
                return (Node)this.kraftControllerNodeProvider$1.getControllerInfo().node().orNull((.less.colon.less)$less$colon$less$.MODULE$.refl());
            }

            // Only the controller node is resolvable: any other id yields null.
            public Node nodeById(int nodeId) {
                return (Node)this.kraftControllerNodeProvider$1.getControllerInfo().node().filter((Function1 & Serializable)x$66 -> BoxesRunTime.boxToBoolean((boolean)anon.3.$anonfun$nodeById$1(nodeId, x$66))).orNull((.less.colon.less)$less$colon$less$.MODULE$.refl());
            }

            public static final /* synthetic */ boolean $anonfun$nodeById$1(int nodeId$1, Node x$66) {
                return x$66.id() == nodeId$1;
            }
            {
                this.kraftControllerNodeProvider$1 = kraftControllerNodeProvider$1;
                // The third super argument is a function whose apply() returns the provider's controller node as a list.
                super(logContext$2, adminConfig$2, (Function0<List<Node>>)new Serializable(kraftControllerNodeProvider$1){
                    private static final long serialVersionUID = 0L;
                    private final ControllerNodeProvider kraftControllerNodeProvider$1;

                    public final List<Node> apply() {
                        return ClusterLinkLocalAdmin$.kafka$server$link$ClusterLinkLocalAdmin$$aliveNodes$2(this.kraftControllerNodeProvider$1);
                    }
                    {
                        this.kraftControllerNodeProvider$1 = kraftControllerNodeProvider$1;
                    }
                });
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$nodeById$1$adapted(int org.apache.kafka.common.Node )}, serializedLambda);
            }
        };
        ControllerInformation controllerInfo = kraftControllerNodeProvider.getControllerInfo();
        // Read the channel-builder inputs into named locals and pass them straight through; they are
        // read in the same order as before (handshake flag, SASL mechanism, listener, protocol).
        boolean saslHandshakeRequestEnable = brokerConfig.saslInterBrokerHandshakeRequestEnable();
        String clientSaslMechanism = controllerInfo.saslMechanism();
        ListenerName controllerListenerName = controllerInfo.listenerName();
        SecurityProtocol securityProtocol = controllerInfo.securityProtocol();
        ChannelBuilder channelBuilder = ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.SERVER, (AbstractConfig)brokerConfig, controllerListenerName, clientSaslMechanism, (Time)time, saslHandshakeRequestEnable, (LogContext)logContext, null, null);
        // The trailing true is createAdmin's isKraftControllerAdmin flag.
        return this.createAdmin(this.MetricsGroupPrefix(), adminConfig, adminMetadataManager, linkMetadataManager, clusterLinkManager, groupCoordinator, scheduler, channelBuilder, metrics, time, logContext, interBrokerListenerName, true);
    }

    /**
     * Assembles a selector, a network client and a request-interceptor factory, and wraps them into
     * an admin client via {@code ConfluentAdminUtils.createConfluentAdmin}.
     *
     * <p>If any step throws, a warning is logged, the network client, selector and channel builder
     * are closed quietly, and the original throwable is rethrown.
     *
     * @param metricsGroupPrefix metric group prefix for the selector
     * @param adminConfig admin client config; supplies the client id, backoff, buffer and socket timeout settings
     * @param adminMetadataManager metadata manager whose updater is given to the network client
     * @param linkMetadataManager forwarded to the interceptor factory
     * @param clusterLinkManager forwarded to the interceptor factory
     * @param groupCoordinator forwarded to the interceptor factory
     * @param scheduler forwarded to the interceptor factory
     * @param channelBuilder channel builder used by the selector; closed on failure
     * @param metrics metrics registry for the selector
     * @param time clock shared by the selector, network client and admin
     * @param logContext log context shared by the created components
     * @param interBrokerListenerName forwarded to the interceptor factory
     * @param isKraftControllerAdmin forwarded to the interceptor factory
     * @return the created admin client
     */
    private KafkaAdminClient createAdmin(String metricsGroupPrefix, AdminClientConfig adminConfig, BrokerAdminMetadataManager adminMetadataManager, ClusterLinkMetadataManager linkMetadataManager, ClusterLinkManager clusterLinkManager, GroupCoordinator groupCoordinator, ClusterLinkScheduler scheduler, ChannelBuilder channelBuilder, Metrics metrics, Time time, LogContext logContext, ListenerName interBrokerListenerName, boolean isKraftControllerAdmin) {
        String clientId = adminConfig.getString("client.id");
        Selector selector = null;
        NetworkClient client = null;
        try {
            Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
            selector = new Selector(-1, -1L, metrics, time, metricsGroupPrefix, metricTags, false, channelBuilder, logContext);
            ApiVersions apiVersions = new ApiVersions();

            // Resolve the network client arguments in the order in which they are passed.
            MetadataUpdater metadataUpdater = (MetadataUpdater)adminMetadataManager.updater();
            long reconnectBackoffMs = Predef$.MODULE$.Long2long(adminConfig.getLong("reconnect.backoff.ms"));
            long reconnectBackoffMaxMs = Predef$.MODULE$.Long2long(adminConfig.getLong("reconnect.backoff.max.ms"));
            int sendBufferBytes = Predef$.MODULE$.Integer2int(adminConfig.getInt("send.buffer.bytes"));
            int receiveBufferBytes = Predef$.MODULE$.Integer2int(adminConfig.getInt("receive.buffer.bytes"));
            int oneHourMs = (int)TimeUnit.HOURS.toMillis(1L);
            long connectionSetupTimeoutMs = Predef$.MODULE$.Long2long(adminConfig.getLong("socket.connection.setup.timeout.ms"));
            long connectionSetupTimeoutMaxMs = Predef$.MODULE$.Long2long(adminConfig.getLong("socket.connection.setup.timeout.max.ms"));
            client = new NetworkClient((Selectable)selector, metadataUpdater, clientId, 1, reconnectBackoffMs, reconnectBackoffMaxMs, sendBufferBytes, receiveBufferBytes, oneHourMs, connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, time, true, apiVersions, logContext, MetadataRecoveryStrategy.NONE);

            ClusterLinkAdminRequestInterceptorFactory interceptors = new ClusterLinkAdminRequestInterceptorFactory(client, linkMetadataManager, clusterLinkManager, groupCoordinator, scheduler, adminConfig, logContext, apiVersions, interBrokerListenerName, isKraftControllerAdmin);
            return (KafkaAdminClient)ConfluentAdminUtils.createConfluentAdmin((AdminClientConfig)adminConfig, (AdminMetadataManager)adminMetadataManager, (KafkaClient)client, (Time)time, (AdminRequestInterceptor.Factory)interceptors);
        }
        catch (Throwable failure) {
            if (this.logger().underlying().isWarnEnabled()) {
                this.logger().underlying().warn(this.msgWithLogIdent("Failed to create cluster link admin client"), failure);
            }
            // Release whatever was created before the failure, then propagate the original throwable.
            Utils.closeQuietly(client, (String)"NetworkClient");
            Utils.closeQuietly(selector, (String)"Selector");
            Utils.closeQuietly((AutoCloseable)channelBuilder, (String)"ChannelBuilder");
            throw failure;
        }
    }

    /**
     * Node supplier used by the metadata manager built in {@code createInterBrokerAdmin}.
     *
     * @param metadataCache$1 metadata cache to query
     * @param listenerName$1 listener whose broker endpoints are requested
     * @return the cache's alive broker nodes for the listener, converted to a list
     */
    public static final List kafka$server$link$ClusterLinkLocalAdmin$$aliveNodes$1(MetadataCache metadataCache$1, ListenerName listenerName$1) {
        return metadataCache$1.getAliveBrokerNodes(listenerName$1).toList();
    }

    /**
     * Node supplier used by the metadata manager built in {@code createControllerAdmin}.
     *
     * @param kraftControllerNodeProvider$1 provider queried for the current controller info
     * @return the provider's optional controller node converted to a list
     */
    public static final List kafka$server$link$ClusterLinkLocalAdmin$$aliveNodes$2(ControllerNodeProvider kraftControllerNodeProvider$1) {
        return kraftControllerNodeProvider$1.getControllerInfo().node().toList();
    }

    // Synthetic lambda bodies. Neither is called from the code visible in this file; createAdmin
    // carries the same message text inline. NOTE(review): presumably compiler leftovers - confirm before removing.
    /** Returns the warning message text used when admin client creation fails. */
    public static final /* synthetic */ String $anonfun$createAdmin$1() {
        return "Failed to create cluster link admin client";
    }

    /** Returns its argument unchanged. */
    public static final /* synthetic */ Throwable $anonfun$createAdmin$2(Throwable e$1) {
        return e$1;
    }

    /** Private constructor; within this file the only instantiation is the static {@code MODULE$} field. */
    private ClusterLinkLocalAdmin$() {
    }
}

