/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import java.io.Serializable;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import kafka.controller.KafkaController;
import kafka.server.link.AclFiltersJson;
import kafka.server.link.AclJson$;
import kafka.server.link.ClusterLinkClientManager;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.ClusterLinkScheduler;
import org.apache.kafka.clients.admin.DescribeAclsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.authorizer.Authorizer;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenSet;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.ListBuffer;
import scala.collection.mutable.ListBuffer$;
import scala.collection.mutable.Set;
import scala.collection.mutable.Set$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001\u0005mc\u0001B\t\u0013\u0001eA\u0001B\t\u0001\u0003\u0006\u0004%\ta\t\u0005\tO\u0001\u0011\t\u0011)A\u0005I!A\u0001\u0006\u0001B\u0001B\u0003%\u0011\u0006\u0003\u0005/\u0001\t\u0005\t\u0015!\u00030\u0011\u0015\u0011\u0004\u0001\"\u00014\u0011\u001dA\u0004A1A\u0005\neBa!\u0015\u0001!\u0002\u0013Q\u0004b\u0002*\u0001\u0001\u0004%Ia\u0015\u0005\b1\u0002\u0001\r\u0011\"\u0003Z\u0011\u0019y\u0006\u0001)Q\u0005)\")\u0001\r\u0001C)C\")Q\r\u0001C\u0005M\")\u0001\u0010\u0001C\u0005s\"9\u0011Q\u0007\u0001\u0005\n\u0005]\u0002BBA,\u0001\u0011\u0005\u0011\b\u0003\u0004\u0002Z\u0001!\ta\u0015\u0002\u0014\u00072,8\u000f^3s\u0019&t7nU=oG\u0006\u001bGn\u001d\u0006\u0003'Q\tA\u0001\\5oW*\u0011QCF\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003]\tQa[1gW\u0006\u001c\u0001a\u0005\u0002\u00015A\u00111d\b\b\u00039ui\u0011AE\u0005\u0003=I\tAc\u00117vgR,'\u000fT5oWN\u001b\u0007.\u001a3vY\u0016\u0014\u0018B\u0001\u0011\"\u00051\u0001VM]5pI&\u001cG+Y:l\u0015\tq\"#A\u0007dY&,g\u000e^'b]\u0006<WM]\u000b\u0002IA\u0011A$J\u0005\u0003MI\u0011\u0001d\u00117vgR,'\u000fT5oW\u000ec\u0017.\u001a8u\u001b\u0006t\u0017mZ3s\u00039\u0019G.[3oi6\u000bg.Y4fe\u0002\n!bY8oiJ|G\u000e\\3s!\tQC&D\u0001,\u0015\tAc#\u0003\u0002.W\ty1*\u00194lC\u000e{g\u000e\u001e:pY2,'/A\u0004nKR\u0014\u0018nY:\u0011\u0005q\u0001\u0014BA\u0019\u0013\u0005I\u0019E.^:uKJd\u0015N\\6NKR\u0014\u0018nY:\u0002\rqJg.\u001b;?)\u0011!TGN\u001c\u0011\u0005q\u0001\u0001\"\u0002\u0012\u0006\u0001\u0004!\u0003\"\u0002\u0015\u0006\u0001\u0004I\u0003\"\u0002\u0018\u0006\u0001\u0004y\u0013!D2veJ,g\u000e^!dYN+G/F\u0001;!\rY$\tR\u0007\u0002y)\u0011QHP\u0001\b[V$\u0018M\u00197f\u0015\ty\u0004)\u0001\u0006d_2dWm\u0019;j_:T\u0011!Q\u0001\u0006g\u000e\fG.Y\u0005\u0003\u0007r\u00121aU3u!\t)u*D\u0001G\u0015\t9\u0005*A\u0002bG2T!!\u0013&\u0002\r\r|W.\\8o\u0015\t92J\u0003\u0002M\u001b\u00061\u0011\r]1dQ\u0016T\u0011AT\u0001\u0004_J<\u0017B\u0001)G\u0005)\t5\r\u001c\"j]\u0012LgnZ\u0001\u000fGV\u0014(/\u001a8u\u0003\u000ed7+\u001a;!\u0003A!\u0018m]6t\u001fV$8\u000f^1oI&tw-F\u0001U!\t)f+D\u0001A\u0013\t9\u0006IA\u0002J]R\fA\u0003^1tWN|U\u000f^:uC:$\u0017N\\4`I\u0015\fHC\u0001.^!\t)6,\u0003\u0002]\u0001\n!QK\\5u\u0011\u001dq\u0016\"!AA\u0002Q\u000b1\u0001\u001f\u00132\u0003E!\u0018m]6t\u001fV$8\u000f^1oI&tw\rI\u0001\u0004eVtG#\u00012\u0011\u0005U\u001b\u0017B\u00013A\u0005\u001d\u0011un\u001c7fC:\f!\"\u001e9eCR,\u0017i\u00197t)\t\u0011w\rC\u0003i\u0019\u0001\u0007\u0011.\u0001\u0006gkR,(/\u001a'jgR\u00042a\u000f6m\u0013\tYGH\u0001\u0006MSN$()\u001e4gKJ\u00042!\u001c8q\u001b\u0005A\u0015BA8I\u0005-Y\u0015MZ6b\rV$XO]3\u0011\u0007E4H)D\u0001s\u0015\t\u0019H/\u0001\u0003vi&d'\"A;\u0002\t)\fg/Y\u0005\u0003oJ\u0014!bQ8mY\u0016\u001cG/[8o\u0003u\tG\rZ!dYN\fe\u000e\u001a'pO\u000e\u0013X-\u0019;j_:<\u0016M\u001d8j]\u001e\u001cHC\u00022{\u0003W\t\t\u0004C\u0003|\u001b\u0001\u0007A0A\nbG2\u001c%/Z1uKJ+7/\u001e7u\u0019&\u001cH\u000fE\u0003~\u0003\u0017\t\tBD\u0002\u007f\u0003\u000fq1a`A\u0003\u001b\t\t\tAC\u0002\u0002\u0004a\ta\u0001\u0010:p_Rt\u0014\"A!\n\u0007\u0005%\u0001)A\u0004qC\u000e\\\u0017mZ3\n\t\u00055\u0011q\u0002\u0002\u0005\u0019&\u001cHOC\u0002\u0002\n\u0001\u0003b!a\u0005\u0002\u001a\u0005uQBAA\u000b\u0015\r\t9B]\u0001\u000bG>t7-\u001e:sK:$\u0018\u0002BA\u000e\u0003+\u0011\u0011cQ8na2,G/\u00192mK\u001a+H/\u001e:f!\u0011\ty\"a\n\u000e\u0005\u0005\u0005\"\u0002BA\u0012\u0003K\t!\"Y;uQ>\u0014\u0018N_3s\u0015\t)\"*\u0003\u0003\u0002*\u0005\u0005\"aD!dY\u000e\u0013X-\u0019;f%\u0016\u001cX\u000f\u001c;\t\u000f\u00055R\u00021\u0001\u00020\u0005q1M]3bi\u0016$\u0017i\u00197MSN$\b\u0003B?\u0002\f\u0011Ca!a\r\u000e\u0001\u0004Q\u0014aC1eI\u0016$\u0017i\u00197TKR\f\u0001\u0005Z3mKR,\u0017i\u00197t\u0003:$Gj\\4EK2,G/[8o/\u0006\u0014h.\u001b8hgR9!-!\u000f\u0002H\u0005-\u0003bBA\u001e\u001d\u0001\u0007\u0011QH\u0001\u0014C\u000edG)\u001a7fi\u0016\u0014Vm];mi2K7\u000f\u001e\t\u0006{\u0006-\u0011q\b\t\u0007\u0003'\tI\"!\u0011\u0011\t\u0005}\u00111I\u0005\u0005\u0003\u000b\n\tCA\bBG2$U\r\\3uKJ+7/\u001e7u\u0011\u0019\tIE\u0004a\u0001u\u0005iA-\u001a7fi\u0016$\u0017i\u00197TKRDq!!\u0014\u000f\u0001\u0004\ty%\u0001\u000beK2,G/\u001a3BG24\u0015\u000e\u001c;fe2K7\u000f\u001e\t\u0006{\u0006-\u0011\u0011\u000b\t\u0004\u000b\u0006M\u0013bAA+\r\n\u0001\u0012i\u00197CS:$\u0017N\\4GS2$XM]\u0001\u0011O\u0016$8)\u001e:sK:$\u0018i\u00197TKR\fqcY;se\u0016tG\u000fV1tWN|U\u000f^:uC:$\u0017N\\4")
public class ClusterLinkSyncAcls
extends ClusterLinkScheduler.PeriodicTask {
    private final ClusterLinkClientManager clientManager;
    private final KafkaController controller;
    private final ClusterLinkMetrics metrics;
    private final Set<AclBinding> currentAclSet;
    private int tasksOutstanding;

    public ClusterLinkClientManager clientManager() {
        return this.clientManager;
    }

    private Set<AclBinding> currentAclSet() {
        return this.currentAclSet;
    }

    private int tasksOutstanding() {
        return this.tasksOutstanding;
    }

    private void tasksOutstanding_$eq(int x$1) {
        this.tasksOutstanding = x$1;
    }

    @Override
    public boolean run() {
        if (this.controller.isActive()) {
            if (this.tasksOutstanding() != 0) {
                this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Number of outstanding tasks was not 0 at the beginning of run. Resetting to 0 and continuing on.");
                this.tasksOutstanding_$eq(0);
            }
            AclFiltersJson aclFilterJson = (AclFiltersJson)this.clientManager().currentConfig().aclFilters().get();
            ListBuffer<AclBindingFilter> aclFilterList = AclJson$.MODULE$.toAclBindingFilters(aclFilterJson);
            ListBuffer describeAclsResultList = (ListBuffer)ListBuffer$.MODULE$.apply((Seq)Nil$.MODULE$);
            aclFilterList.foreach((Function1 & Serializable & scala.Serializable)aclFilter -> {
                Some describeAclsResult;
                Some some;
                try {
                    this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Attempting to retrieve ACLs from source cluster");
                    some = new Some((Object)this.clientManager().getAdmin().describeAcls(aclFilter));
                }
                catch (ExecutionException e) {
                    None$ none$;
                    Throwable throwable = e.getCause();
                    if (throwable instanceof AuthorizationException) {
                        this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Unable to retrieve ACLs on source cluster. Please enable DESCRIBE ACLs on the source cluster to proceed with ACL migration.");
                        none$ = None$.MODULE$;
                    } else if (throwable != null) {
                        this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(78).append("Unexpected error encountered while trying to retrieve ACLs on source cluster: ").append(e).toString());
                        none$ = None$.MODULE$;
                    } else {
                        throw new MatchError(null);
                    }
                    some = none$;
                }
                catch (Throwable e) {
                    this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(78).append("Unexpected error encountered while trying to retrieve ACLs on source cluster: ").append(e).toString());
                    some = describeAclsResult = None$.MODULE$;
                }
                if (describeAclsResult.isDefined()) {
                    return describeAclsResultList.$plus$eq((Object)describeAclsResult);
                }
                return BoxedUnit.UNIT;
            });
            if (describeAclsResultList.nonEmpty()) {
                ListBuffer futureList = (ListBuffer)describeAclsResultList.map((Function1 & Serializable & scala.Serializable)result -> ((DescribeAclsResult)result.get()).values(), ListBuffer$.MODULE$.canBuildFrom());
                KafkaFuture future = KafkaFuture.allOf((KafkaFuture[])((KafkaFuture[])futureList.toArray(ClassTag$.MODULE$.apply(KafkaFuture.class))));
                this.scheduleWhenComplete(future, (Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> this.updateAcls((ListBuffer<KafkaFuture<Collection<AclBinding>>>)futureList));
                this.tasksOutstanding_$eq(this.tasksOutstanding() + 1);
            }
        }
        return this.tasksOutstanding() == 0;
    }

    private boolean updateAcls(ListBuffer<KafkaFuture<Collection<AclBinding>>> futureList) {
        this.tasksOutstanding_$eq(this.tasksOutstanding() - 1);
        Set describeAclResultSet = (Set)Set$.MODULE$.apply((Seq)Nil$.MODULE$);
        futureList.foreach((Function1 & Serializable & scala.Serializable)future -> (Set)describeAclResultSet.$plus$plus$eq((TraversableOnce)CollectionConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)future.get()).asScala()));
        this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(42).append("Result of describeAcls on source cluster: ").append(describeAclResultSet).toString());
        Set deletedAcls = (Set)this.currentAclSet().diff((GenSet)describeAclResultSet);
        Set addedAcls = (Set)describeAclResultSet.diff(this.currentAclSet());
        this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(48).append("Removing following ACLs on destination cluster: ").append(deletedAcls).toString());
        this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(46).append("Adding following ACLs on destination cluster: ").append(addedAcls).toString());
        this.clientManager().getAuthorizer().foreach((Function1 & Serializable & scala.Serializable)auth -> {
            ClusterLinkSyncAcls.$anonfun$updateAcls$5(this, addedAcls, deletedAcls, auth);
            return BoxedUnit.UNIT;
        });
        return this.tasksOutstanding() == 0;
    }

    private boolean addAclsAndLogCreationWarnings(List<CompletableFuture<AclCreateResult>> aclCreateResultList, List<AclBinding> createdAclList, Set<AclBinding> addedAclSet) {
        ((List)aclCreateResultList.zip(createdAclList, List$.MODULE$.canBuildFrom())).foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
            ClusterLinkSyncAcls.$anonfun$addAclsAndLogCreationWarnings$1(this, addedAclSet, x0$1);
            return BoxedUnit.UNIT;
        });
        this.currentAclSet().$plus$plus$eq(addedAclSet);
        this.tasksOutstanding_$eq(this.tasksOutstanding() - 1);
        return this.tasksOutstanding() == 0;
    }

    private boolean deleteAclsAndLogDeletionWarnings(List<CompletableFuture<AclDeleteResult>> aclDeleteResultList, Set<AclBinding> deletedAclSet, List<AclBindingFilter> deletedAclFilterList) {
        ((List)aclDeleteResultList.zip(deletedAclFilterList, List$.MODULE$.canBuildFrom())).foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
            ClusterLinkSyncAcls.$anonfun$deleteAclsAndLogDeletionWarnings$1(this, deletedAclSet, x0$1);
            return BoxedUnit.UNIT;
        });
        this.currentAclSet().$minus$minus$eq(deletedAclSet);
        this.tasksOutstanding_$eq(this.tasksOutstanding() - 1);
        return this.tasksOutstanding() == 0;
    }

    public Set<AclBinding> getCurrentAclSet() {
        return this.currentAclSet();
    }

    public int currentTasksOutstanding() {
        return this.tasksOutstanding();
    }

    public static final /* synthetic */ void $anonfun$updateAcls$5(ClusterLinkSyncAcls $this, Set addedAcls$1, Set deletedAcls$1, Authorizer auth) {
        if (addedAcls$1.nonEmpty()) {
            try {
                List addedAclsList = addedAcls$1.toList();
                Buffer createdAclResults = (Buffer)((TraversableLike)CollectionConverters$.MODULE$.asScalaBufferConverter(auth.createAcls(null, (java.util.List)CollectionConverters$.MODULE$.seqAsJavaListConverter((Seq)addedAclsList).asJava(), Optional.empty())).asScala()).map((Function1 & Serializable & scala.Serializable)x$1 -> x$1.toCompletableFuture(), Buffer$.MODULE$.canBuildFrom());
                CompletableFuture<Void> createdAclsFuture = CompletableFuture.allOf((CompletableFuture[])createdAclResults.toArray(ClassTag$.MODULE$.apply(CompletableFuture.class)));
                $this.scheduleWhenComplete(createdAclsFuture, (Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> $this.addAclsAndLogCreationWarnings((List<CompletableFuture<AclCreateResult>>)createdAclResults.toList(), (List<AclBinding>)addedAclsList, (Set<AclBinding>)addedAcls$1));
                $this.tasksOutstanding_$eq($this.tasksOutstanding() + 1);
            }
            catch (Throwable e) {
                $this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(81).append("Unexpected error encountered while trying to create ACLs on destination cluster: ").append(e).toString());
            }
        }
        if (deletedAcls$1.nonEmpty()) {
            try {
                List deleteAclsFilterList = ((TraversableOnce)deletedAcls$1.map((Function1 & Serializable & scala.Serializable)acl -> acl.toFilter(), Set$.MODULE$.canBuildFrom())).toList();
                Buffer deletedAclResults = (Buffer)((TraversableLike)CollectionConverters$.MODULE$.asScalaBufferConverter(auth.deleteAcls(null, (java.util.List)CollectionConverters$.MODULE$.seqAsJavaListConverter((Seq)deleteAclsFilterList).asJava(), Optional.empty())).asScala()).map((Function1 & Serializable & scala.Serializable)x$2 -> x$2.toCompletableFuture(), Buffer$.MODULE$.canBuildFrom());
                CompletableFuture<Void> deletedAclsFuture = CompletableFuture.allOf((CompletableFuture[])deletedAclResults.toArray(ClassTag$.MODULE$.apply(CompletableFuture.class)));
                $this.scheduleWhenComplete(deletedAclsFuture, (Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> $this.deleteAclsAndLogDeletionWarnings((List<CompletableFuture<AclDeleteResult>>)deletedAclResults.toList(), (Set<AclBinding>)deletedAcls$1, (List<AclBindingFilter>)deleteAclsFilterList));
                $this.tasksOutstanding_$eq($this.tasksOutstanding() + 1);
                return;
            }
            catch (Throwable e) {
                $this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(81).append("Unexpected error encountered while trying to delete ACLs on destination cluster: ").append(e).toString());
                return;
            }
        }
    }

    public static final /* synthetic */ void $anonfun$addAclsAndLogCreationWarnings$1(ClusterLinkSyncAcls $this, Set addedAcls$2, Tuple2 x0$1) {
        if (x0$1 != null) {
            CompletableFuture future = (CompletableFuture)x0$1._1();
            AclBinding createdList = (AclBinding)x0$1._2();
            try {
                AclCreateResult createdAcl = (AclCreateResult)future.get();
                if (!createdAcl.exception().isPresent()) {
                    $this.metrics.aclsAddedSensor().record();
                    return;
                }
                $this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(64).append("Encountered the following exception while trying to create ACL: ").append(createdAcl.exception().get()).toString());
                addedAcls$2.$minus$eq((Object)createdList);
                $this.metrics.aclsAddFailedSensor().record();
            }
            catch (Throwable e) {
                $this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(57).append("Unexpected error encountered while trying to create ACL: ").append(e).toString());
                addedAcls$2.$minus$eq((Object)createdList);
                $this.metrics.aclsAddFailedSensor().record();
                return;
            }
        } else {
            throw new MatchError(null);
        }
    }

    public static final /* synthetic */ void $anonfun$deleteAclsAndLogDeletionWarnings$2(ClusterLinkSyncAcls $this, Set deletedAcls$2, AclDeleteResult.AclBindingDeleteResult aclBindingDeleteResult) {
        if (aclBindingDeleteResult.exception().isPresent()) {
            $this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(64).append("Encountered the following exception while trying to delete ACL: ").append(aclBindingDeleteResult.exception().get()).toString());
            deletedAcls$2.$minus$eq((Object)aclBindingDeleteResult.aclBinding());
            $this.metrics.aclsDeleteFailedSensor().record();
            return;
        }
        $this.metrics.aclsDeletedSensor().record();
    }

    public static final /* synthetic */ void $anonfun$deleteAclsAndLogDeletionWarnings$1(ClusterLinkSyncAcls $this, Set deletedAcls$2, Tuple2 x0$1) {
        if (x0$1 != null) {
            CompletableFuture future = (CompletableFuture)x0$1._1();
            AclBindingFilter filter = (AclBindingFilter)x0$1._2();
            try {
                Collection aclBindingDeleteResultList = ((AclDeleteResult)future.get()).aclBindingDeleteResults();
                ((IterableLike)CollectionConverters$.MODULE$.collectionAsScalaIterableConverter(aclBindingDeleteResultList).asScala()).foreach((Function1 & Serializable & scala.Serializable)aclBindingDeleteResult -> {
                    ClusterLinkSyncAcls.$anonfun$deleteAclsAndLogDeletionWarnings$2($this, deletedAcls$2, aclBindingDeleteResult);
                    return BoxedUnit.UNIT;
                });
                return;
            }
            catch (Throwable e) {
                $this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(57).append("Unexpected error encountered while trying to delete ACL: ").append(e).toString());
                deletedAcls$2.$minus$minus$eq((TraversableOnce)deletedAcls$2.filter((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToBoolean((boolean)filter.matches(x$1))));
                $this.metrics.aclsDeleteFailedSensor().record();
                return;
            }
        }
        throw new MatchError(null);
    }

    public ClusterLinkSyncAcls(ClusterLinkClientManager clientManager, KafkaController controller, ClusterLinkMetrics metrics) {
        this.clientManager = clientManager;
        this.controller = controller;
        this.metrics = metrics;
        super(clientManager.scheduler(), "ClusterLinkSyncAcls", Predef$.MODULE$.Integer2int(clientManager.currentConfig().aclSyncMs()));
        this.currentAclSet = (Set)Set$.MODULE$.apply((Seq)Nil$.MODULE$);
        this.tasksOutstanding = 0;
    }
}

