/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import kafka.server.link.ClusterLinkClientManager;
import kafka.server.link.ClusterLinkMetadataManager;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.ClusterLinkScheduler;
import kafka.server.link.ClusterLinkUtils$;
import kafka.server.link.LocalClusterLinkAdminClient;
import kafka.server.link.MirrorTopicConfigSyncRules;
import kafka.server.link.MirrorTopicConfigsDelegate;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.PolicyViolationException;
import scala.;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Some;
import scala.collection.IterableOnceOps;
import scala.collection.Map;
import scala.collection.Set;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.mutable.Map$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0005\u0005}a\u0001\u0002\b\u0010\u0001YA\u0001b\b\u0001\u0003\u0002\u0003\u0006I\u0001\t\u0005\tG\u0001\u0011\t\u0011)A\u0005I!Aq\u0005\u0001BC\u0002\u0013\u0005\u0001\u0006\u0003\u0005-\u0001\t\u0005\t\u0015!\u0003*\u0011%i\u0003A!A!\u0002\u0013qC\u0007\u0003\u00057\u0001\t\u0005\t\u0015!\u00038\u0011\u0015Q\u0004\u0001\"\u0001<\u0011\u001d\u0011\u0005\u00011A\u0005\n\rCq\u0001\u0012\u0001A\u0002\u0013%Q\t\u0003\u0004L\u0001\u0001\u0006KA\f\u0005\u0006\u0019\u0002!\t&\u0014\u0005\u0006#\u0002!IA\u0015\u0005\u0006i\u0002!I!\u001e\u0002\u001d\u00072,8\u000f^3s\u0019&t7nU=oGR{\u0007/[2t\u0007>tg-[4t\u0015\t\u0001\u0012#\u0001\u0003mS:\\'B\u0001\n\u0014\u0003\u0019\u0019XM\u001d<fe*\tA#A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u00019\u0002C\u0001\r\u001d\u001d\tI\"$D\u0001\u0010\u0013\tYr\"\u0001\u000bDYV\u001cH/\u001a:MS:\\7k\u00195fIVdWM]\u0005\u0003;y\u0011A\u0002U3sS>$\u0017n\u0019+bg.T!aG\b\u0002\u001b\rd\u0017.\u001a8u\u001b\u0006t\u0017mZ3s!\tI\u0012%\u0003\u0002#\u001f\tA2\t\\;ti\u0016\u0014H*\u001b8l\u00072LWM\u001c;NC:\fw-\u001a:\u0002\u001f5,G/\u00193bi\u0006l\u0015M\\1hKJ\u0004\"!G\u0013\n\u0005\u0019z!AG\"mkN$XM\u001d'j].lU\r^1eCR\fW*\u00198bO\u0016\u0014\u0018!\u00033fgR\fE-\\5o+\u0005I\u0003CA\r+\u0013\tYsBA\u000eM_\u000e\fGn\u00117vgR,'\u000fT5oW\u0006#W.\u001b8DY&,g\u000e^\u0001\u000bI\u0016\u001cH/\u00113nS:\u0004\u0013AD:z]\u000eLe\u000e^3sm\u0006dWj\u001d\t\u0003_Ij\u0011\u0001\r\u0006\u0002c\u0005)1oY1mC&\u00111\u0007\r\u0002\u0004\u0013:$\u0018BA\u001b\u001d\u0003E\u0011Xm]2iK\u0012,H.\u001a#fY\u0006LXj]\u0001\b[\u0016$(/[2t!\tI\u0002(\u0003\u0002:\u001f\t\u00112\t\\;ti\u0016\u0014H*\u001b8l\u001b\u0016$(/[2t\u0003\u0019a\u0014N\\5u}Q1A(\u0010 @\u0001\u0006\u0003\"!\u0007\u0001\t\u000b}9\u0001\u0019\u0001\u0011\t\u000b\r:\u0001\u0019\u0001\u0013\t\u000b\u001d:\u0001\u0019A\u0015\t\u000b5:\u0001\u0019\u0001\u0018\t\u000bY:\u0001\u0019A\u001c\u0002!Q\f7o[:PkR\u001cH/\u00198eS:<W#\u0001\u0018\u0002)Q\f7o[:PkR\u001cH/\u00198eS:<w\fJ3r)\t1\u0015\n\u0005\u00020\u000f&\u0011\u0001\n\r\u0002\u0005+:LG\u000fC\u0004K\u0013\u0005\u0005\t\u0019\u0001\u0018\u0002\u0007a$\u0013'A\tuCN\\7oT;ugR\fg\u000eZ5oO\u0002\n1A];o)\u0005q\u0005CA\u0018P\u0013\t\u0001\u0006GA\u0004C_>dW-\u00198\u00025!\fg\u000e\u001a7f\t\u0016\u001c8M]5cKR{\u0007/[2D_:4\u0017nZ:\u0015\u00059\u001b\u0006\"\u0002+\r\u0001\u0004)\u0016A\u0002:fgVdG\u000f\u0005\u0003W3nCW\"A,\u000b\u0005a\u0003\u0014AC2pY2,7\r^5p]&\u0011!l\u0016\u0002\u0004\u001b\u0006\u0004\bC\u0001/g\u001b\u0005i&B\u00010`\u0003\u0019\u0019wN\u001c4jO*\u0011\u0001-Y\u0001\u0007G>lWn\u001c8\u000b\u0005Q\u0011'BA2e\u0003\u0019\t\u0007/Y2iK*\tQ-A\u0002pe\u001eL!aZ/\u0003\u001d\r{gNZ5h%\u0016\u001cx.\u001e:dKB\u0019\u0011N\u001b7\u000e\u0003}K!a[0\u0003\u0017-\u000bgm[1GkR,(/\u001a\t\u0003[Jl\u0011A\u001c\u0006\u0003_B\fQ!\u00193nS:T!!]1\u0002\u000f\rd\u0017.\u001a8ug&\u00111O\u001c\u0002\u0007\u0007>tg-[4\u0002/!\fg\u000e\u001a7f\u00032$XM\u001d+pa&\u001c7i\u001c8gS\u001e\u001cHc\u0001(wu\")A+\u0004a\u0001oB\u0011Q\u000e_\u0005\u0003s:\u0014!#\u00117uKJ\u001cuN\u001c4jON\u0014Vm];mi\")10\u0004a\u0001y\u0006qQ\u000f\u001d3bi\u0016$7i\u001c8gS\u001e\u001c\bcB?\u0002\u0002\u0005\r\u0011\u0011D\u0007\u0002}*\u0011qpV\u0001\b[V$\u0018M\u00197f\u0013\tQf\u0010\u0005\u0003\u0002\u0006\u0005Ma\u0002BA\u0004\u0003\u001f\u00012!!\u00031\u001b\t\tYAC\u0002\u0002\u000eU\ta\u0001\u0010:p_Rt\u0014bAA\ta\u00051\u0001K]3eK\u001aLA!!\u0006\u0002\u0018\t11\u000b\u001e:j]\u001eT1!!\u00051!\u0011y\u00131\u00047\n\u0007\u0005u\u0001G\u0001\u0004PaRLwN\u001c")
public class ClusterLinkSyncTopicsConfigs
extends ClusterLinkScheduler.PeriodicTask {
    private final ClusterLinkClientManager clientManager;
    private final ClusterLinkMetadataManager metadataManager;
    private final LocalClusterLinkAdminClient destAdmin;
    private final ClusterLinkMetrics metrics;
    private int tasksOutstanding;

    public LocalClusterLinkAdminClient destAdmin() {
        return this.destAdmin;
    }

    private int tasksOutstanding() {
        return this.tasksOutstanding;
    }

    private void tasksOutstanding_$eq(int x$1) {
        this.tasksOutstanding = x$1;
    }

    @Override
    public boolean run() {
        Set<String> topics;
        scala.collection.immutable.Set resources;
        if (this.tasksOutstanding() != 0) {
            this.warn((Function0<String>)(Function0 & Serializable)() -> "Number of outstanding tasks was not 0 at the beginning of run. Resetting to 0 and continuing on.");
            this.tasksOutstanding_$eq(0);
        }
        if ((resources = ((IterableOnceOps)(topics = this.clientManager.getTopics()).map((Function1 & Serializable)name -> new ConfigResource(ConfigResource.Type.TOPIC, name))).toSet()).nonEmpty()) {
            this.trace((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(86).append("Attempting to retrieve topic configs from source cluster for following mirror topics: ").append(topics).toString());
            DescribeConfigsResult describeConfigsResult = this.clientManager.getAdmin().describeConfigs((Collection)CollectionConverters$.MODULE$.SetHasAsJava((Set)resources).asJava());
            this.scheduleWhenComplete(describeConfigsResult.all(), (Function0<Object>)(JFunction0.mcZ.sp & Serializable)() -> this.handleDescribeTopicConfigs((Map<ConfigResource, KafkaFuture<Config>>)CollectionConverters$.MODULE$.MapHasAsScala(describeConfigsResult.values()).asScala().toMap((.less.colon.less)$less$colon$less$.MODULE$.refl())));
            this.tasksOutstanding_$eq(this.tasksOutstanding() + 1);
        }
        return this.tasksOutstanding() == 0;
    }

    private boolean handleDescribeTopicConfigs(Map<ConfigResource, KafkaFuture<Config>> result) {
        this.tasksOutstanding_$eq(this.tasksOutstanding() - 1);
        HashMap<ConfigResource, Collection<AlterConfigOp>> alterConfigRequestMap = new HashMap<ConfigResource, Collection<AlterConfigOp>>();
        scala.collection.mutable.Map updatedConfigs = (scala.collection.mutable.Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
        result.foreach((Function1 & Serializable)x0$1 -> {
            if (x0$1 != null) {
                BoxedUnit boxedUnit;
                ConfigResource resource = (ConfigResource)x0$1._1();
                KafkaFuture future = (KafkaFuture)x0$1._2();
                String topic = resource.name();
                try {
                    Properties newProps;
                    Config newSourceTopicConfig = (Config)future.get();
                    this.trace((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(42).append("Topic configuration for source topic ").append(topic).append(" is: ").append(newSourceTopicConfig).toString());
                    MirrorTopicConfigSyncRules topicConfigSyncRules = $this.clientManager.topicConfigSyncRules();
                    Properties curProps = $this.metadataManager.getTopicConfig(topic);
                    Properties resolvedProps = new MirrorTopicConfigsDelegate(curProps, newSourceTopicConfig, topicConfigSyncRules).updateMirrorProps(topic);
                    Properties properties = newProps = ClusterLinkUtils$.MODULE$.restrictValidateTopicConfigPolicy(topic, resolvedProps, $this.clientManager.alterConfigPolicy());
                    if (!(properties == null ? curProps != null : !((Object)properties).equals(curProps))) {
                        return BoxedUnit.UNIT;
                    }
                    this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(67).append("Updating local configuration for mirror topic '").append(topic).append("' on cluster link '").append($this.clientManager.linkData().linkName()).append("'").toString());
                    scala.collection.mutable.Set newTopicConfigs = (scala.collection.mutable.Set)CollectionConverters$.MODULE$.SetHasAsScala(newProps.entrySet()).asScala().diff((Set)CollectionConverters$.MODULE$.SetHasAsScala(curProps.entrySet()).asScala());
                    this.trace((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(44).append("Adding configs ").append(newTopicConfigs).append(" for topic ").append(topic).append(" on target cluster").toString());
                    HashSet newConfigs = new HashSet();
                    CollectionConverters$.MODULE$.PropertiesHasAsScala(newProps).asScala().foreach((Function1 & Serializable)entry -> BoxesRunTime.boxToBoolean((boolean)newConfigs.add(new AlterConfigOp(new ConfigEntry((String)entry._1(), (String)entry._2()), AlterConfigOp.OpType.SET))));
                    alterConfigRequestMap.put(new ConfigResource(ConfigResource.Type.TOPIC, topic), newConfigs);
                    boxedUnit = updatedConfigs.put((Object)topic, (Object)new Some((Object)newSourceTopicConfig));
                }
                catch (PolicyViolationException e) {
                    this.warn((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(88).append("Could not update mirror topic '").append(topic).append("' configuration due to policy violation on ").append("cluster link ").append($this.clientManager.linkData().linkName()).append("'").toString(), (Function0<Throwable>)(Function0 & Serializable)() -> e);
                    $this.metrics.topicConfigUpdateFailedSensor().record();
                    return BoxedUnit.UNIT;
                }
                catch (Throwable e) {
                    this.warn((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(93).append("Error encountered while processing remote configuration for mirror topic '").append(topic).append("' ").append("on cluster link ").append($this.clientManager.linkData().linkName()).append("'").toString(), (Function0<Throwable>)(Function0 & Serializable)() -> e);
                    $this.metrics.topicConfigUpdateFailedSensor().record();
                    boxedUnit = BoxedUnit.UNIT;
                }
                return boxedUnit;
            }
            throw new MatchError(null);
        });
        if (!alterConfigRequestMap.isEmpty()) {
            AlterConfigsResult alterConfigResult = this.destAdmin().incrementalAlterMirrorTopicConfigs(alterConfigRequestMap);
            this.scheduleWhenComplete(alterConfigResult.all(), (Function0<Object>)(JFunction0.mcZ.sp & Serializable)() -> this.handleAlterTopicConfigs(alterConfigResult, (scala.collection.mutable.Map<String, Option<Config>>)updatedConfigs));
            this.tasksOutstanding_$eq(this.tasksOutstanding() + 1);
        }
        return this.tasksOutstanding() == 0;
    }

    private boolean handleAlterTopicConfigs(AlterConfigsResult result, scala.collection.mutable.Map<String, Option<Config>> updatedConfigs) {
        this.tasksOutstanding_$eq(this.tasksOutstanding() - 1);
        CollectionConverters$.MODULE$.SetHasAsScala(result.values().entrySet()).asScala().foreach((Function1 & Serializable)entry -> {
            ClusterLinkSyncTopicsConfigs.$anonfun$handleAlterTopicConfigs$1(this, entry);
            return BoxedUnit.UNIT;
        });
        return this.tasksOutstanding() == 0;
    }

    public static final /* synthetic */ void $anonfun$handleAlterTopicConfigs$1(ClusterLinkSyncTopicsConfigs $this, Map.Entry entry) {
        String topic = ((ConfigResource)entry.getKey()).name();
        try {
            ((KafkaFuture)entry.getValue()).get();
            $this.metrics.topicConfigUpdateSensor().record();
            $this.trace((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(52).append("Successfully updated configuration for mirror topic ").append(topic).toString());
            return;
        }
        catch (Throwable e) {
            $this.warn((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(65).append("Error encountered while altering configuration on mirror topic ").append(topic).append(": ").append(e).toString());
            $this.metrics.topicConfigUpdateFailedSensor().record();
            return;
        }
    }

    public ClusterLinkSyncTopicsConfigs(ClusterLinkClientManager clientManager, ClusterLinkMetadataManager metadataManager, LocalClusterLinkAdminClient destAdmin, int syncIntervalMs, ClusterLinkMetrics metrics) {
        this.clientManager = clientManager;
        this.metadataManager = metadataManager;
        this.destAdmin = destAdmin;
        this.metrics = metrics;
        super(clientManager.scheduler(), "ClusterLinkSyncTopicsConfigs", syncIntervalMs);
        this.tasksOutstanding = 0;
    }
}

