/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import kafka.cluster.BrokerEndPoint;
import kafka.server.BrokerBlockingSender;
import kafka.server.FailedPartitions;
import kafka.server.KafkaConfig;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConnectionManager;
import kafka.server.link.ClusterLinkFactory$;
import kafka.server.link.ClusterLinkFetcherManager;
import kafka.server.link.ClusterLinkFetcherThread;
import kafka.server.link.ClusterLinkLeaderEndPoint;
import kafka.server.link.ClusterLinkLeaderEndPoint$;
import kafka.server.link.ClusterLinkMetadata;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.ClusterLinkMetrics$;
import kafka.server.link.ClusterLinkNetworkClient;
import kafka.server.link.FetchResponseSize;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.MetadataUpdater;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.common.Reconfigurable;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;

public final class ClusterLinkFetcherThread$ {
    public static ClusterLinkFetcherThread$ MODULE$;
    private final String mirrorTopicMessageRateMetricName;
    private final String mirrorTopicMessageRateMetricDescription;
    private final String mirrorTopicRpoMetricName;
    private final String mirrorTopicRpoMetricDescription;

    static {
        new ClusterLinkFetcherThread$();
    }

    public Option<LogContext> $lessinit$greater$default$16() {
        return None$.MODULE$;
    }

    public String mirrorTopicMessageRateMetricName() {
        return this.mirrorTopicMessageRateMetricName;
    }

    public String mirrorTopicMessageRateMetricDescription() {
        return this.mirrorTopicMessageRateMetricDescription;
    }

    public String mirrorTopicRpoMetricName() {
        return this.mirrorTopicRpoMetricName;
    }

    public String mirrorTopicRpoMetricDescription() {
        return this.mirrorTopicRpoMetricDescription;
    }

    public ClusterLinkFetcherThread apply(String name, int fetcherId, KafkaConfig brokerConfig, ClusterLinkConfig clusterLinkConfig, ClusterLinkMetadata clusterLinkMetadata, ClusterLinkFetcherManager fetcherManager, ClusterLinkConnectionManager connectionManager, BrokerEndPoint sourceBroker, FailedPartitions failedPartitions, ReplicaManager replicaMgr, ReplicaQuota quota, ClusterLinkMetrics clusterLinkMetrics, Time time, Option<String> tenant, Function0<FetchResponseSize> dynamicFetchSize) {
        int brokerId = brokerConfig.brokerId();
        LogContext logContext = new LogContext(new StringBuilder(68).append("[ClusterLinkFetcher brokerId=").append(brokerId).append(" ").append("fetcherId=").append(fetcherId).append("] source(link=").append(clusterLinkMetadata.linkName()).append(", leaderId=").append(sourceBroker.id()).append(")] ").toString(), connectionManager.maxLogLevel());
        ClusterLinkNetworkClient clusterLinkClient = new ClusterLinkNetworkClient(brokerConfig, clusterLinkConfig, ClusterLinkMetrics$.MODULE$.throttleTimeSensorName(clusterLinkMetadata.linkName()), (Option<ClusterLinkMetadata>)None$.MODULE$, (Option<MetadataUpdater>)new Some((Object)new ManualMetadataUpdater()), clusterLinkMetrics.metrics(), (Map<String, String>)ClusterLinkFactory$.MODULE$.linkMetricTags(clusterLinkMetadata.linkName()).$plus$plus((GenTraversableOnce)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"broker-id"), (Object)Integer.toString(sourceBroker.id())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"fetcher-id"), (Object)Integer.toString(fetcherId))}))), time, new StringBuilder(22).append("link-").append(clusterLinkMetadata.linkName()).append("-broker-").append(brokerId).append("-fetcher-").append(fetcherId).toString(), "fetcher", logContext);
        connectionManager.enableClusterLink(clusterLinkClient, (Option<AdminMetadataManager>)None$.MODULE$);
        BrokerBlockingSender blockingSender = new BrokerBlockingSender(sourceBroker, brokerConfig, Predef$.MODULE$.Integer2int(clusterLinkConfig.replicaSocketTimeoutMs()), time, fetcherId, clusterLinkClient.networkClient(), (Option<Reconfigurable>)None$.MODULE$);
        ClusterLinkLeaderEndPoint leader = ClusterLinkLeaderEndPoint$.MODULE$.apply(logContext, time, blockingSender, brokerConfig, clusterLinkConfig, replicaMgr, quota, clusterLinkMetrics);
        ExponentialBackoff exponentialBackoff = new ExponentialBackoff(clusterLinkConfig.replicaFetchBackoffMs().longValue(), 2, clusterLinkConfig.replicaFetchBackoffMaxMs().longValue(), 0.0);
        return new ClusterLinkFetcherThread(name, fetcherId, leader, brokerConfig, clusterLinkConfig, clusterLinkMetadata, fetcherManager, failedPartitions, exponentialBackoff, replicaMgr, quota, clusterLinkMetrics, time, dynamicFetchSize, clusterLinkClient, (Option<LogContext>)new Some((Object)logContext), tenant);
    }

    private ClusterLinkFetcherThread$() {
        MODULE$ = this;
        this.mirrorTopicMessageRateMetricName = "mirror-topic-message-rate-per-second";
        this.mirrorTopicMessageRateMetricDescription = "Rate of messages being written to the mirror topic per second";
        this.mirrorTopicRpoMetricName = "mirror-topic-rpo-seconds";
        this.mirrorTopicRpoMetricDescription = "RPO for mirror topic in seconds";
    }
}

