/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import kafka.server.link.ClusterLinkDestClientManager;
import kafka.server.link.ClusterLinkMetadataManager;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.ClusterLinkScheduler;
import kafka.server.link.ClusterLinkUtils$;
import kafka.server.link.LocalClusterLinkAdminClient;
import kafka.server.link.MirrorTopicConfigSyncRules;
import kafka.server.link.MirrorTopicConfigsDelegate;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.PolicyViolationException;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenSet;
import scala.collection.IterableLike;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Set;
import scala.collection.Set$;
import scala.collection.SetLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Map$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001\u0005-d\u0001B\n\u0015\u0001mA\u0001\u0002\n\u0001\u0003\u0002\u0003\u0006I!\n\u0005\tQ\u0001\u0011\t\u0011)A\u0005S!AA\u0006\u0001BC\u0002\u0013\u0005Q\u0006\u0003\u00052\u0001\t\u0005\t\u0015!\u0003/\u0011%\u0011\u0004A!A!\u0002\u0013\u0019\u0014\b\u0003\u0005<\u0001\t\u0005\t\u0015!\u0003=\u0011\u0015y\u0004\u0001\"\u0001A\u0011\u001d9\u0005\u00011A\u0005\n!Cq!\u0013\u0001A\u0002\u0013%!\n\u0003\u0004Q\u0001\u0001\u0006Ka\r\u0005\b#\u0002\u0001\r\u0011\"\u0001S\u0011\u001d1\u0007\u00011A\u0005\u0002\u001dDa!\u001b\u0001!B\u0013\u0019\u0006\"\u00026\u0001\t#Z\u0007\"B8\u0001\t\u0013\u0001\bbBA\u0010\u0001\u0011%\u0011\u0011\u0005\u0005\b\u0003\u000b\u0002A\u0011BA$\u0011\u001d\t\u0019\u0006\u0001C\u0005\u0003+\u0012Ad\u00117vgR,'\u000fT5oWNKhn\u0019+pa&\u001c7oQ8oM&<7O\u0003\u0002\u0016-\u0005!A.\u001b8l\u0015\t9\u0002$\u0001\u0004tKJ4XM\u001d\u0006\u00023\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\u001d!\ti\u0012E\u0004\u0002\u001f?5\tA#\u0003\u0002!)\u0005!2\t\\;ti\u0016\u0014H*\u001b8l'\u000eDW\rZ;mKJL!AI\u0012\u0003\u0019A+'/[8eS\u000e$\u0016m]6\u000b\u0005\u0001\"\u0012!D2mS\u0016tG/T1oC\u001e,'\u000f\u0005\u0002\u001fM%\u0011q\u0005\u0006\u0002\u001d\u00072,8\u000f^3s\u0019&t7\u000eR3ti\u000ec\u0017.\u001a8u\u001b\u0006t\u0017mZ3s\u0003=iW\r^1eCR\fW*\u00198bO\u0016\u0014\bC\u0001\u0010+\u0013\tYCC\u0001\u000eDYV\u001cH/\u001a:MS:\\W*\u001a;bI\u0006$\u0018-T1oC\u001e,'/A\u0005eKN$\u0018\tZ7j]V\ta\u0006\u0005\u0002\u001f_%\u0011\u0001\u0007\u0006\u0002\u001c\u0019>\u001c\u0017\r\\\"mkN$XM\u001d'j].\fE-\\5o\u00072LWM\u001c;\u0002\u0015\u0011,7\u000f^!e[&t\u0007%\u0001\bts:\u001c\u0017J\u001c;feZ\fG.T:\u0011\u0005Q:T\"A\u001b\u000b\u0003Y\nQa]2bY\u0006L!\u0001O\u001b\u0003\u0007%sG/\u0003\u0002;C\u0005\t\"/Z:dQ\u0016$W\u000f\\3EK2\f\u00170T:\u0002\u000f5,GO]5dgB\u0011a$P\u0005\u0003}Q\u0011!c\u00117vgR,'\u000fT5oW6+GO]5dg\u00061A(\u001b8jiz\"b!\u0011\"D\t\u00163\u0005C\u0001\u0010\u0001\u0011\u0015!s\u00011\u0001&\u0011\u0015
As\u00011\u0001*\u0011\u0015as\u00011\u0001/\u0011\u0015\u0011t\u00011\u00014\u0011\u0015Yt\u00011\u0001=\u0003A!\u0018m]6t\u001fV$8\u000f^1oI&tw-F\u00014\u0003Q!\u0018m]6t\u001fV$8\u000f^1oI&twm\u0018\u0013fcR\u00111J\u0014\t\u0003i1K!!T\u001b\u0003\tUs\u0017\u000e\u001e\u0005\b\u001f&\t\t\u00111\u00014\u0003\rAH%M\u0001\u0012i\u0006\u001c8n](viN$\u0018M\u001c3j]\u001e\u0004\u0013\u0001F;oSF,X\rV8qS\u000e,\u0005pY3qi&|g.F\u0001T!\u0011!\u0016lW.\u000e\u0003US!AV,\u0002\u000f5,H/\u00192mK*\u0011\u0001,N\u0001\u000bG>dG.Z2uS>t\u0017B\u0001.V\u0005\ri\u0015\r\u001d\t\u00039\u000et!!X1\u0011\u0005y+T\"A0\u000b\u0005\u0001T\u0012A\u0002\u001fs_>$h(\u0003\u0002ck\u00051\u0001K]3eK\u001aL!\u0001Z3\u0003\rM#(/\u001b8h\u0015\t\u0011W'\u0001\rv]&\fX/\u001a+pa&\u001cW\t_2faRLwN\\0%KF$\"a\u00135\t\u000f=c\u0011\u0011!a\u0001'\u0006)RO\\5rk\u0016$v\u000e]5d\u000bb\u001cW\r\u001d;j_:\u0004\u0013a\u0001:v]R\tA\u000e\u0005\u00025[&\u0011a.\u000e\u0002\b\u0005>|G.Z1o\u0003iA\u0017M\u001c3mK\u0012+7o\u0019:jE\u0016$v\u000e]5d\u0007>tg-[4t)\ta\u0017\u000fC\u0003s\u001f\u0001\u00071/\u0001\u0004sKN,H\u000e\u001e\t\u0006iV4\u0018qA\u0007\u0002/&\u0011!l\u0016\t\u0004o\u0006\rQ\"\u0001=\u000b\u0005eT\u0018AB2p]\u001aLwM\u0003\u0002|y\u000611m\\7n_:T!!G?\u000b\u0005y|\u0018AB1qC\u000eDWM\u0003\u0002\u0002\u0002\u0005\u0019qN]4\n\u0007\u0005\u0015\u0001P\u0001\bD_:4\u0017n\u001a*fg>,(oY3\u0011\r\u0005%\u00111BA\b\u001b\u0005Q\u0018bAA\u0007u\nY1*\u00194lC\u001a+H/\u001e:f!\u0011\t\t\"a\u0007\u000e\u0005\u0005M!\u0002BA\u000b\u0003/\tQ!\u00193nS:T1!!\u0007}\u0003\u001d\u0019G.[3oiNLA!!\b\u0002\u0014\t11i\u001c8gS\u001e\f!\u0002\\8h\u001b\u0016\u001c8/Y4f)%Y\u00151EA\u0014\u0003{\t\t\u0005\u0003\u0004\u0002&A\u0001\raW\u0001\u0007G\u0006dG.\u001a:\t\u000f\u0005%\u0002\u00031\u0001\u0002,\u0005IA\u000f\u001b:po\u0006\u0014G.\u001a\t\u0005\u0003[\t9D\u0004\u0003\u00020\u0005Mbb\u00010\u00022%\ta'C\u0002\u00026U\nq\u0001]1dW\u0006<W-\u0003\u0003\u0002:\u0005m\"!\u0003+ie><\u0018M\u00197f\u0015
\r\t)$\u000e\u0005\u0007\u0003\u007f\u0001\u0002\u0019A.\u0002\u000f5,7o]1hK\"1\u00111\t\tA\u0002m\u000bQ\u0001^8qS\u000e\f\u0011\u0002[1oI2,W\t\u001f9\u0015\u0013-\u000bI%a\u0013\u0002P\u0005E\u0003BBA\"#\u0001\u00071\f\u0003\u0004\u0002NE\u0001\raW\u0001\fKb\u001cW\r\u001d;j_:LE\r\u0003\u0004\u0002@E\u0001\ra\u0017\u0005\b\u0003S\t\u0002\u0019AA\u0016\u0003]A\u0017M\u001c3mK\u0006cG/\u001a:U_BL7mQ8oM&<7\u000fF\u0003m\u0003/\ny\u0006\u0003\u0004s%\u0001\u0007\u0011\u0011\f\t\u0005\u0003#\tY&\u0003\u0003\u0002^\u0005M!AE!mi\u0016\u00148i\u001c8gS\u001e\u001c(+Z:vYRDq!!\u0019\u0013\u0001\u0004\t\u0019'\u0001\bva\u0012\fG/\u001a3D_:4\u0017nZ:\u0011\u000bQK6,!\u001a\u0011\u000bQ\n9'a\u0004\n\u0007\u0005%TG\u0001\u0004PaRLwN\u001c")
/*
 * Periodic cluster-link task that keeps mirror-topic configuration in step
 * with the corresponding source topics.
 *
 * Each run() describes the configs of every topic reported by the client
 * manager, resolves them against the locally stored config through
 * MirrorTopicConfigsDelegate and the alter-config policy, and, for topics whose
 * resolved config differs from the stored one, submits an incremental alter
 * request through the destination admin client.
 *
 * The describe and alter steps are asynchronous: each is registered with
 * scheduleWhenComplete and tracked by the tasksOutstanding counter.
 *
 * uniqueTopicException remembers the last failure identifier logged per topic
 * so that a repeated, identical failure is logged at DEBUG instead of WARN.
 */
public class ClusterLinkSyncTopicsConfigs
extends ClusterLinkScheduler.PeriodicTask {
    private final ClusterLinkDestClientManager clientManager;
    private final ClusterLinkMetadataManager metadataManager;
    private final LocalClusterLinkAdminClient destAdmin;
    private final ClusterLinkMetrics metrics;
    /** Number of asynchronous describe/alter callbacks that have been scheduled but have not yet run. */
    private int tasksOutstanding;
    /** Topic name mapped to the identifier of the last failure logged for that topic. */
    private scala.collection.mutable.Map<String, String> uniqueTopicException;

    /** Returns the admin client used to apply config changes to the mirror topics. */
    public LocalClusterLinkAdminClient destAdmin() {
        return this.destAdmin;
    }

    /** Returns the current count of outstanding asynchronous callbacks. */
    private int tasksOutstanding() {
        return this.tasksOutstanding;
    }

    /** Scala-style setter for the outstanding-callback counter. */
    private void tasksOutstanding_$eq(int x$1) {
        this.tasksOutstanding = x$1;
    }

    /** Returns the per-topic map of last-logged failure identifiers. */
    public scala.collection.mutable.Map<String, String> uniqueTopicException() {
        return this.uniqueTopicException;
    }

    /** Scala-style setter replacing the per-topic failure-identifier map. */
    public void uniqueTopicException_$eq(scala.collection.mutable.Map<String, String> x$1) {
        this.uniqueTopicException = x$1;
    }

    /**
     * Starts one synchronisation pass.
     *
     * <p>Drops remembered failures for topics that are no longer reported by the client
     * manager, then, if any topics remain, issues a describe-configs request and schedules
     * {@link #handleDescribeTopicConfigs} to run once it completes.
     *
     * @return {@code true} when no asynchronous callback is outstanding after this call
     *         (i.e. there were no topics to describe); {@code false} otherwise.
     *         NOTE(review): presumably the scheduler treats {@code true} as "pass finished" - confirm
     *         against {@code ClusterLinkScheduler.PeriodicTask}.
     */
    @Override
    public boolean run() {
        if (this.tasksOutstanding() != 0) {
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Number of outstanding tasks was not 0 at the beginning of run. Resetting to 0 and continuing on.");
            this.tasksOutstanding_$eq(0);
        }
        Set<String> topics = this.clientManager.getTopics();
        // Forget failures recorded for topics that are no longer part of the current topic set.
        this.uniqueTopicException_$eq((scala.collection.mutable.Map<String, String>)((scala.collection.mutable.Map)this.uniqueTopicException().filter((Function1 & Serializable & scala.Serializable)x0$1 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkSyncTopicsConfigs.$anonfun$run$2(topics, x0$1)))));
        scala.collection.immutable.Set resources = ((TraversableOnce)topics.map((Function1 & Serializable & scala.Serializable)name -> new ConfigResource(ConfigResource.Type.TOPIC, name), Set$.MODULE$.canBuildFrom())).toSet();
        if (resources.nonEmpty()) {
            this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Attempting to retrieve topic configs from source cluster for following mirror topics: " + topics);
            DescribeConfigsResult describeConfigsResult = this.clientManager.getAdmin().describeConfigs((Collection)CollectionConverters$.MODULE$.setAsJavaSetConverter((Set)resources).asJava());
            this.scheduleWhenComplete(describeConfigsResult.all(), (Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> this.handleDescribeTopicConfigs((Map<ConfigResource, KafkaFuture<Config>>)((TraversableOnce)CollectionConverters$.MODULE$.mapAsScalaMapConverter(describeConfigsResult.values()).asScala()).toMap(Predef$.MODULE$.$conforms())));
            this.tasksOutstanding_$eq(this.tasksOutstanding() + 1);
        }
        return this.tasksOutstanding() == 0;
    }

    /**
     * Callback for the describe-configs request issued by {@link #run()}.
     *
     * <p>For every described topic the resolved mirror config is computed and validated; topics
     * whose resolved config differs from the locally stored one are collected into a single
     * incremental alter request. Per-topic failures are logged and recorded on the
     * failed-update sensor without aborting the remaining topics.
     *
     * @param result per-resource futures returned by the describe-configs call
     * @return {@code true} when no asynchronous callback remains outstanding after this call
     */
    private boolean handleDescribeTopicConfigs(Map<ConfigResource, KafkaFuture<Config>> result) {
        this.tasksOutstanding_$eq(this.tasksOutstanding() - 1);
        HashMap<ConfigResource, Collection<AlterConfigOp>> alterConfigRequestMap = new HashMap<ConfigResource, Collection<AlterConfigOp>>();
        scala.collection.mutable.Map updatedConfigs = (scala.collection.mutable.Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
        result.foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
            if (x0$1 == null) {
                throw new MatchError(null);
            }
            ConfigResource resource = (ConfigResource)x0$1._1();
            KafkaFuture future = (KafkaFuture)x0$1._2();
            String topic = resource.name();
            try {
                Config newSourceTopicConfig = (Config)future.get();
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Topic configuration for source topic " + topic + " is: " + newSourceTopicConfig);
                MirrorTopicConfigSyncRules topicConfigSyncRules = this.clientManager.topicConfigSyncRules();
                Properties curProps = this.metadataManager.getTopicConfig(topic);
                Properties resolvedProps = new MirrorTopicConfigsDelegate(curProps, newSourceTopicConfig, topicConfigSyncRules).updateMirrorProps(topic);
                Properties newProps = ClusterLinkUtils$.MODULE$.restrictValidateTopicConfigPolicy(topic, resolvedProps, this.clientManager.alterConfigPolicy());
                // Null-safe inequality (Scala's `!=`): only alter when the resolved config actually changed.
                if (newProps == null ? curProps != null : !((Object)newProps).equals(curProps)) {
                    this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Updating local configuration for mirror topic '" + topic + "' on cluster link '" + this.clientManager.linkData().linkName() + "'");
                    // The diff is computed for the trace message only; the alter request below SETs every resolved entry.
                    scala.collection.mutable.Set newTopicConfigs = (scala.collection.mutable.Set)((SetLike)CollectionConverters$.MODULE$.asScalaSetConverter(newProps.entrySet()).asScala()).diff((GenSet)CollectionConverters$.MODULE$.asScalaSetConverter(curProps.entrySet()).asScala());
                    this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Adding configs " + newTopicConfigs + " for topic " + topic + " on target cluster");
                    HashSet newConfigs = new HashSet();
                    ((IterableLike)CollectionConverters$.MODULE$.propertiesAsScalaMapConverter(newProps).asScala()).foreach((Function1 & Serializable & scala.Serializable)entry -> BoxesRunTime.boxToBoolean((boolean)newConfigs.add(new AlterConfigOp(new ConfigEntry((String)entry._1(), (String)entry._2()), AlterConfigOp.OpType.SET))));
                    alterConfigRequestMap.put(new ConfigResource(ConfigResource.Type.TOPIC, topic), newConfigs);
                    updatedConfigs.put((Object)topic, (Object)new Some((Object)newSourceTopicConfig));
                } else {
                    // Config is already in sync: clear any failure remembered for this topic.
                    this.uniqueTopicException().remove((Object)topic);
                }
            }
            catch (PolicyViolationException e) {
                String message = "Could not update mirror topic '" + topic + "' configuration due to policy violation on " + "cluster link '" + this.clientManager.linkData().linkName() + "'";
                this.logMessage("handleDescribeTopicConfigs", e, message, topic);
                this.metrics.topicConfigUpdateFailedSensor().record();
            }
            catch (Throwable e) {
                String message = "Error encountered while processing remote configuration for mirror topic '" + topic + "' " + "on cluster link '" + this.clientManager.linkData().linkName() + "'";
                this.logMessage("handleDescribeTopicConfigs", e, message, topic);
                this.metrics.topicConfigUpdateFailedSensor().record();
            }
            // foreach ignores the callback result; return Unit uniformly.
            return BoxedUnit.UNIT;
        });
        if (!alterConfigRequestMap.isEmpty()) {
            AlterConfigsResult alterConfigResult = this.destAdmin().incrementalAlterMirrorTopicConfigs(alterConfigRequestMap);
            this.scheduleWhenComplete(alterConfigResult.all(), (Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> this.handleAlterTopicConfigs(alterConfigResult, (scala.collection.mutable.Map<String, Option<Config>>)updatedConfigs));
            this.tasksOutstanding_$eq(this.tasksOutstanding() + 1);
        }
        return this.tasksOutstanding() == 0;
    }

    /**
     * Logs a per-topic failure, de-duplicating repeated identical failures.
     *
     * <p>A {@link PolicyViolationException}, or an {@link ExecutionException} that carries a
     * cause, is reduced to an identifier of the form
     * {@code "<SimpleClassName> <message> <caller>"} and routed through {@link #handleExp}.
     * Any other throwable is always logged at WARN.
     *
     * @param caller    name of the calling handler, made part of the failure identifier
     * @param throwable the failure to log
     * @param message   human-readable log message
     * @param topic     topic the failure relates to
     */
    private void logMessage(String caller, Throwable throwable, String message, String topic) {
        // Pick the exception whose class and message identify this failure, if any.
        Throwable origin;
        if (throwable instanceof PolicyViolationException) {
            origin = throwable;
        } else if (throwable instanceof ExecutionException && throwable.getCause() != null) {
            origin = throwable.getCause();
        } else {
            origin = null;
        }
        if (origin == null) {
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> message, (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> throwable);
            return;
        }
        String exceptionId = origin.getClass().getSimpleName() + " " + origin.getMessage() + " " + caller;
        this.handleExp(topic, exceptionId, message, throwable);
    }

    /**
     * Logs {@code message} at DEBUG when {@code exceptionId} equals the identifier last recorded
     * for {@code topic}, otherwise at WARN, and then records {@code exceptionId} for the topic.
     *
     * @param topic       topic the failure relates to
     * @param exceptionId identifier of this failure
     * @param message     human-readable log message
     * @param throwable   the failure, passed to the logger
     */
    private void handleExp(String topic, String exceptionId, String message, Throwable throwable) {
        if (this.uniqueTopicException().get((Object)topic).contains((Object)exceptionId)) {
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> message, (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> throwable);
        } else {
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> message, (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> throwable);
        }
        this.uniqueTopicException().put((Object)topic, (Object)exceptionId);
    }

    /**
     * Callback for the alter request issued by {@link #handleDescribeTopicConfigs}.
     *
     * <p>Waits on each per-topic future; successes are recorded on the update sensor and clear
     * any remembered failure for the topic, failures are logged and recorded on the
     * failed-update sensor.
     *
     * @param result         result of the incremental alter request
     * @param updatedConfigs source configs of the altered topics, keyed by topic name; not read by this method
     * @return {@code true} when no asynchronous callback remains outstanding after this call
     */
    private boolean handleAlterTopicConfigs(AlterConfigsResult result, scala.collection.mutable.Map<String, Option<Config>> updatedConfigs) {
        this.tasksOutstanding_$eq(this.tasksOutstanding() - 1);
        ((IterableLike)CollectionConverters$.MODULE$.asScalaSetConverter(result.values().entrySet()).asScala()).foreach((Function1 & Serializable & scala.Serializable)entry -> {
            String topic = ((ConfigResource)entry.getKey()).name();
            try {
                ((KafkaFuture)entry.getValue()).get();
                this.metrics.topicConfigUpdateSensor().record();
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Successfully updated configuration for mirror topic " + topic);
                this.uniqueTopicException().remove((Object)topic);
            }
            catch (Throwable e) {
                String message = "Error encountered while altering configuration on mirror topic " + topic;
                this.logMessage("handleAlterTopicConfigs", e, message, topic);
                this.metrics.topicConfigUpdateFailedSensor().record();
            }
            // foreach ignores the callback result; return Unit uniformly.
            return BoxedUnit.UNIT;
        });
        return this.tasksOutstanding() == 0;
    }

    /**
     * Compiler-generated predicate used by {@link #run()}: keeps a remembered-failure entry
     * only if its topic (the tuple's first element) is contained in {@code topics$1}.
     *
     * @throws MatchError if the entry is {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$run$2(Set topics$1, Tuple2 x0$1) {
        if (x0$1 == null) {
            throw new MatchError(null);
        }
        String t = (String)x0$1._1();
        return topics$1.contains((Object)t);
    }

    /**
     * Creates the task and registers it with the client manager's scheduler under the name
     * {@code "ClusterLinkSyncTopicsConfigs"}.
     *
     * @param clientManager   supplies the topics, the admin client used for describe-configs,
     *                        the sync rules, the alter-config policy and the link data
     * @param metadataManager supplies the locally stored topic configuration
     * @param destAdmin       admin client used to apply the altered mirror-topic configs
     * @param syncIntervalMs  interval passed to the periodic-task superclass
     * @param metrics         sensors for successful and failed config updates
     */
    public ClusterLinkSyncTopicsConfigs(ClusterLinkDestClientManager clientManager, ClusterLinkMetadataManager metadataManager, LocalClusterLinkAdminClient destAdmin, int syncIntervalMs, ClusterLinkMetrics metrics) {
        // Java source requires the superclass constructor call to come first; the decompiled
        // bytecode assigned the fields before it.
        // NOTE(review): confirm the PeriodicTask constructor does not read these fields through an overridden method.
        super(clientManager.scheduler(), "ClusterLinkSyncTopicsConfigs", syncIntervalMs);
        this.clientManager = clientManager;
        this.metadataManager = metadataManager;
        this.destAdmin = destAdmin;
        this.metrics = metrics;
        this.tasksOutstanding = 0;
        this.uniqueTopicException = (scala.collection.mutable.Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
    }
}

