/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
import kafka.controller.KafkaController;
import kafka.server.link.ClusterLinkClientManager;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkFilterInfo;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.ClusterLinkScheduler;
import kafka.server.link.ClusterLinkSyncOffsets$;
import kafka.server.link.ClusterLinkTopicState;
import kafka.server.link.ClusterLinkUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenMap;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Set;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set$;
import scala.collection.mutable.Map$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001\u0005\rw!\u0002\u0011\"\u0011\u0003Ac!\u0002\u0016\"\u0011\u0003Y\u0003\"\u0002\u001a\u0002\t\u0003\u0019\u0004b\u0002\u001b\u0002\u0005\u0004%\t!\u000e\u0005\u0007s\u0005\u0001\u000b\u0011\u0002\u001c\u0007\t)\n\u0003A\u000f\u0005\t\u0005\u0016\u0011)\u0019!C\u0001\u0007\"Aq)\u0002B\u0001B\u0003%A\t\u0003\u0005I\u000b\t\u0005\t\u0015!\u0003J\u0011!yUA!A!\u0002\u0013\u0001\u0006\u0002C+\u0006\u0005\u000b\u0007I\u0011\u0001,\t\u0011\u001d,!\u0011!Q\u0001\n]C\u0001\u0002[\u0003\u0003\u0002\u0003\u0006I!\u001b\u0005\u0006e\u0015!\t\u0001\u001c\u0005\tg\u0016\u0011\r\u0011\"\u0001\"i\"9\u0011qE\u0003!\u0002\u0013)\b\"CA\u0015\u000b\u0001\u0007I\u0011BA\u0016\u0011%\t\u0019$\u0002a\u0001\n\u0013\t)\u0004\u0003\u0005\u0002B\u0015\u0001\u000b\u0015BA\u0017\u0011!\t\u0019%\u0002a\u0001\n\u0013)\u0004\"CA#\u000b\u0001\u0007I\u0011BA$\u0011\u001d\tY%\u0002Q!\nYB\u0011\"!\u0014\u0006\u0001\u0004%I!a\u0014\t\u0013\u0005]S\u00011A\u0005\n\u0005e\u0003\u0002CA/\u000b\u0001\u0006K!!\u0015\t\u000f\u0005}S\u0001\"\u0015\u0002b!9\u00111M\u0003\u0005\n\u0005\u0015\u0004bBA9\u000b\u0011%\u00111\u000f\u0005\b\u0003o*A\u0011BA=\u0011\u001d\t\t+\u0002C\u0005\u0003GCq!!.\u0006\t\u0013\t9\fC\u0004\u0002<\u0016!I!!0\u0002-\rcWo\u001d;fe2Kgn[*z]\u000e|eMZ:fiNT!AI\u0012\u0002\t1Lgn\u001b\u0006\u0003I\u0015\naa]3sm\u0016\u0014(\"\u0001\u0014\u0002\u000b-\fgm[1\u0004\u0001A\u0011\u0011&A\u0007\u0002C\t12\t\\;ti\u0016\u0014H*\u001b8l'ft7m\u00144gg\u0016$8o\u0005\u0002\u0002YA\u0011Q\u0006M\u0007\u0002])\tq&A\u0003tG\u0006d\u0017-\u0003\u00022]\t1\u0011I\\=SK\u001a\fa\u0001P5oSRtD#\u0001\u0015\u0002A1K7\u000f^\"p]N,X.\u001a:He>,\bo\u00144gg\u0016$()\u0019;dQNK'0Z\u000b\u0002mA\u0011QfN\u0005\u0003q9\u00121!\u00138u\u0003\u0005b\u0015n\u001d;D_:\u001cX/\\3s\u000fJ|W\u000f](gMN,GOQ1uG\"\u001c\u0016N_3!'\t)1\b\u0005\u0002=\u007f9\u0011\u0011&P\u0005\u0003}\u0005\nAc\u00117vgR,'\u000fT5oWN\u001b\u0007.\u001a3vY\u0016\u0014\u0018B\u0001!B\u00051\u0001VM]5pI&\u001cG+Y
:l\u0015\tq\u0014%A\u0007dY&,g\u000e^'b]\u0006<WM]\u000b\u0002\tB\u0011\u0011&R\u0005\u0003\r\u0006\u0012\u0001d\u00117vgR,'\u000fT5oW\u000ec\u0017.\u001a8u\u001b\u0006t\u0017mZ3s\u00039\u0019G.[3oi6\u000bg.Y4fe\u0002\n\u0001\u0002\\5oW\u0012\u000bG/\u0019\t\u0003\u00156k\u0011a\u0013\u0006\u0003\u0019\u0016\n!A_6\n\u00059[%aD\"mkN$XM\u001d'j].$\u0015\r^1\u0002\u0015\r|g\u000e\u001e:pY2,'\u000f\u0005\u0002R'6\t!K\u0003\u0002PK%\u0011AK\u0015\u0002\u0010\u0017\u000647.Y\"p]R\u0014x\u000e\u001c7fe\u0006\u0001B-Z:u\u0003\u0012l\u0017N\u001c$bGR|'/_\u000b\u0002/B\u0019Q\u0006\u0017.\n\u0005es#!\u0003$v]\u000e$\u0018n\u001c81!\tYV-D\u0001]\u0015\tif,A\u0003bI6LgN\u0003\u0002`A\u000691\r\\5f]R\u001c(B\u0001\u0014b\u0015\t\u00117-\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002I\u0006\u0019qN]4\n\u0005\u0019d&!B!e[&t\u0017!\u00053fgR\fE-\\5o\r\u0006\u001cGo\u001c:zA\u00059Q.\u001a;sS\u000e\u001c\bCA\u0015k\u0013\tY\u0017E\u0001\nDYV\u001cH/\u001a:MS:\\W*\u001a;sS\u000e\u001cHCB7o_B\f(\u000f\u0005\u0002*\u000b!)!)\u0004a\u0001\t\")\u0001*\u0004a\u0001\u0013\")q*\u0004a\u0001!\")Q+\u0004a\u0001/\")\u0001.\u0004a\u0001S\u0006q1-\u001e:sK:$xJ\u001a4tKR\u001cX#A;\u0011\u000bY\\X0!\u0005\u000e\u0003]T!\u0001_=\u0002\u000f5,H/\u00192mK*\u0011!PL\u0001\u000bG>dG.Z2uS>t\u0017B\u0001?x\u0005\ri\u0015\r\u001d\t\u0004}\u0006-abA@\u0002\bA\u0019\u0011\u0011\u0001\u0018\u000e\u0005\u0005\r!bAA\u0003O\u00051AH]8pizJ1!!\u0003/\u0003\u0019\u0001&/\u001a3fM&!\u0011QBA\b\u0005\u0019\u0019FO]5oO*\u0019\u0011\u0011\u0002\u0018\u0011\u000fy\f\u0019\"!\u0006\u0002\"%\u0019A0a\u0004\u0011\t\u0005]\u0011QD\u0007\u0003\u00033Q1!a\u0007a\u0003\u0019\u0019w.\\7p]&!\u0011qDA\r\u00059!v\u000e]5d!\u0006\u0014H/\u001b;j_:\u00042!LA\u0012\u0013\r\t)C\f\u0002\u0005\u0019>tw-A\bdkJ\u0014XM\u001c;PM\u001a\u001cX\r^:!\u0003\u0019\u0019wN\u001c4jOV\u0011\u0011Q\u0006\t\u0004S\u0005=\u0012bAA\u0019C\t\t2\t\\;ti\u0016\u0014H*\u001b8l\u0007>tg-[4\u0002\u0015\r|gNZ5h?\u0012*\u0017\u000f\u0006\u0003\u00028\u0005
u\u0002cA\u0017\u0002:%\u0019\u00111\b\u0018\u0003\tUs\u0017\u000e\u001e\u0005\n\u0003\u007f\t\u0012\u0011!a\u0001\u0003[\t1\u0001\u001f\u00132\u0003\u001d\u0019wN\u001c4jO\u0002\n\u0001\u0003^1tWN|U\u000f^:uC:$\u0017N\\4\u0002)Q\f7o[:PkR\u001cH/\u00198eS:<w\fJ3r)\u0011\t9$!\u0013\t\u0011\u0005}B#!AA\u0002Y\n\u0011\u0003^1tWN|U\u000f^:uC:$\u0017N\\4!\u0003!)8/\u001a\"bi\u000eDWCAA)!\ri\u00131K\u0005\u0004\u0003+r#a\u0002\"p_2,\u0017M\\\u0001\rkN,')\u0019;dQ~#S-\u001d\u000b\u0005\u0003o\tY\u0006C\u0005\u0002@]\t\t\u00111\u0001\u0002R\u0005IQo]3CCR\u001c\u0007\u000eI\u0001\u0004eVtGCAA)\u0003Q1\u0017\u000e\u001c;fe\u000e{gn];nKJ<%o\\;qgR!\u0011qMA7!\u0011q\u0018\u0011N?\n\t\u0005-\u0014q\u0002\u0002\u0004'\u0016$\bbBA85\u0001\u0007\u0011qM\u0001\u0007OJ|W\u000f]:\u0002=1L7\u000f^*pkJ\u001cWmQ8ogVlWM]$s_V\u0004xJ\u001a4tKR\u001cH\u0003BA)\u0003kBq!a\u001c\u001c\u0001\u0004\t9'A\u0007e_\u0006\u001b\u0018P\\2D_6l\u0017\u000e\u001e\u000b\u0005\u0003#\nY\bC\u0004\u0002~q\u0001\r!a \u0002\u0019\u001d\u0014x.\u001e9GkR,(/Z:\u0011\ry\f\u0019\"`AA!\u0019\t9\"a!\u0002\b&!\u0011QQA\r\u0005-Y\u0015MZ6b\rV$XO]3\u0011\u0011\u0005%\u00151SA\u000b\u0003+k!!a#\u000b\t\u00055\u0015qR\u0001\u0005kRLGN\u0003\u0002\u0002\u0012\u0006!!.\u0019<b\u0013\ra\u00181\u0012\t\u0005\u0003/\u000bi*\u0004\u0002\u0002\u001a*\u0019\u00111\u00140\u0002\u0011\r|gn];nKJLA!a(\u0002\u001a\n\trJ\u001a4tKR\fe\u000eZ'fi\u0006$\u0017\r^1\u0002Q!\fg\u000e\u001a7f\t\u0016\u001cH/\u001b8bi&|gn\u00117vgR,'oQ8n[&$H/\u001a3PM\u001a\u001cX\r^:\u0015\r\u0005E\u0013QUAY\u0011\u001d\t9+\ba\u0001\u0003S\u000bQbY8n[&$(+Z:vYR\u001c\bC\u0002@\u0002\u0014u\fY\u000bE\u0002\\\u0003[K1!a,]\u0005}\tE\u000e^3s\u0007>t7/^7fe\u001e\u0013x.\u001e9PM\u001a\u001cX\r^:SKN,H\u000e\u001e\u0005\b\u0003gk\u0002\u0019AA@\u0003-a\u0017n\u001d;GkR,(/Z:\u0002]\u0005\u001c\u0018P\\2D_6l\u0017\u000e^\"p]N,X.\u001a:PM\u001a\u001cX\r^:U_\u0012+7\u000f^5oCRLwN\\\"mkN$XM\u001d\u000b\u0005\u0003S\u000bI\fC\u0004\u0002~y\u0001\r!a 
\u0002\u0015MDw.\u001e7e'ft7\r\u0006\u0003\u0002R\u0005}\u0006BBAa?\u0001\u0007Q0A\u0003u_BL7\r")
public class ClusterLinkSyncOffsets
extends ClusterLinkScheduler.PeriodicTask {
    /** Source-side client manager; supplies the current link config, the scheduler and the source admin clients. */
    private final ClusterLinkClientManager clientManager;
    /** Link metadata; this class reads its tenant prefix (group filtering) and link name (logging). */
    private final ClusterLinkData linkData;
    /** Local controller; consulted for {@code isActive()} and for the linked-topic states used by {@code shouldSync}. */
    private final KafkaController controller;
    /** Factory applied each time an {@link Admin} is needed to alter consumer group offsets on the destination. */
    private final Function0<Admin> destAdminFactory;
    /** Metrics holder; the commit and commit-failed sensors are recorded from the commit completion callback. */
    private final ClusterLinkMetrics metrics;
    /** Cache keyed by consumer group id, holding partition-to-offset maps written after destination commits. */
    private final scala.collection.mutable.Map<String, scala.collection.immutable.Map<TopicPartition, Object>> currentOffsets;
    /** Link configuration snapshot; refreshed from the client manager at the start of every {@code run()}. */
    private ClusterLinkConfig config;
    /** Count of scheduled callbacks that have not run yet; {@code run()} and the callbacks report completion when it is 0. */
    private int tasksOutstanding;
    /** Starts as {@code true}; cleared when the source reports that batched offset fetch is unsupported. */
    private boolean useBatch;

    /**
     * Static forwarder to the companion object's batch-size constant.
     *
     * @return the number of consumer groups placed in one batched offset listing
     */
    public static int ListConsumerGroupOffsetBatchSize() {
        int batchSize = ClusterLinkSyncOffsets$.MODULE$.ListConsumerGroupOffsetBatchSize();
        return batchSize;
    }

    /**
     * @return the client manager this task was constructed with
     */
    public ClusterLinkClientManager clientManager() {
        return clientManager;
    }

    /**
     * @return the factory used to obtain an {@link Admin} for the destination cluster
     */
    public Function0<Admin> destAdminFactory() {
        return destAdminFactory;
    }

    /**
     * @return the live (mutable) cache of offsets keyed by consumer group id; not a copy
     */
    public scala.collection.mutable.Map<String, scala.collection.immutable.Map<TopicPartition, Object>> currentOffsets() {
        return currentOffsets;
    }

    /**
     * @return the link configuration snapshot most recently stored by {@code config_$eq}
     */
    private ClusterLinkConfig config() {
        return config;
    }

    /**
     * Scala-style setter for the link configuration snapshot.
     *
     * @param x$1 the configuration to store
     */
    private void config_$eq(ClusterLinkConfig x$1) {
        config = x$1;
    }

    /**
     * @return the number of scheduled callbacks that have not yet run
     */
    private int tasksOutstanding() {
        return tasksOutstanding;
    }

    /**
     * Scala-style setter for the outstanding-task counter.
     *
     * @param x$1 the new counter value
     */
    private void tasksOutstanding_$eq(int x$1) {
        tasksOutstanding = x$1;
    }

    /**
     * @return {@code true} while batched offset listing against the source cluster is still in use
     */
    private boolean useBatch() {
        return useBatch;
    }

    /**
     * Scala-style setter for the batched-listing flag.
     *
     * @param x$1 the new flag value
     */
    private void useBatch_$eq(boolean x$1) {
        useBatch = x$1;
    }

    /**
     * Entry point of one periodic sync round.
     *
     * <p>Refreshes the config snapshot, and - only when this broker's controller is active and
     * consumer offset sync is enabled - lists the consumer groups on the source cluster and
     * schedules the follow-up step that filters them and lists their offsets.
     *
     * @return {@code true} when no scheduled callbacks are outstanding after this call
     */
    @Override
    public boolean run() {
        this.config_$eq(this.clientManager().currentConfig());
        boolean syncActive = this.controller.isActive() && this.config().consumerOffsetSyncEnable();
        if (!syncActive) {
            return this.tasksOutstanding() == 0;
        }

        // A non-zero counter here means an earlier round never finished its bookkeeping.
        if (this.tasksOutstanding() != 0) {
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Number of outstanding tasks was not 0 at the beginning of run. Resetting to 0 and continuing on.");
            this.tasksOutstanding_$eq(0);
        }

        if (this.config().consumerGroupFilters().isEmpty()) {
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp() + " is true but no consumer group filters are specified. No consumer offsets will be migrated.");
            return this.tasksOutstanding() == 0;
        }

        try {
            this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Attempting to retrieve consumer groups from source cluster");
            ListConsumerGroupsResult sourceGroups = this.clientManager().getAdmin().listConsumerGroups();
            this.scheduleWhenComplete(sourceGroups.all(), (Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> {
                // Runs once the listing future is done: extract the group ids, filter, then list offsets.
                Collection listings = (Collection)sourceGroups.all().get();
                Object groupIds = ((TraversableLike)CollectionConverters$.MODULE$.collectionAsScalaIterableConverter(listings).asScala()).map((Function1 & Serializable & scala.Serializable)result -> result.groupId(), Iterable$.MODULE$.canBuildFrom());
                scala.collection.immutable.Set<String> filteredGroups = this.filterConsumerGroups((scala.collection.immutable.Set<String>)((TraversableOnce)groupIds).toSet());
                return this.listSourceConsumerGroupOffsets(filteredGroups);
            });
            // Counted only after the callback was scheduled successfully.
            this.tasksOutstanding_$eq(this.tasksOutstanding() + 1);
        }
        catch (Throwable ex) {
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Unable to list consumer groups on source cluster. Offsets will not be migrated.", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> ex);
        }
        return this.tasksOutstanding() == 0;
    }

    /*
     * WARNING - void declaration
     */
    private scala.collection.immutable.Set<String> filterConsumerGroups(scala.collection.immutable.Set<String> groups) {
        void var4_4;
        void var3_3;
        this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(55).append("Filtering consumer groups ").append(groups).append(" to match consumer group JSON").toString());
        Tuple2<scala.collection.immutable.Set<String>, Seq<ClusterLinkFilterInfo>> tuple2 = ClusterLinkUtils$.MODULE$.doFilter(groups, this.config().consumerGroupFilters(), this.linkData.tenantPrefix());
        if (tuple2 == null) {
            throw new MatchError(null);
        }
        scala.collection.immutable.Set filtered = (scala.collection.immutable.Set)tuple2._1();
        Seq unusedFilters = (Seq)tuple2._2();
        void filtered2 = var3_3;
        var4_4.foreach((Function1 & Serializable & scala.Serializable)unusedFilter -> {
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(199).append("The filter ").append(unusedFilter).append(" does not match any consumer group. This filter may not be ").append("required or the groups it referred to may not have the correct DESCRIBE ACL ").append("for the cluster link principal on the source cluster.").toString());
            return BoxedUnit.UNIT;
        });
        this.trace((Function0<String>)((Function0 & Serializable & scala.Serializable)() -> ClusterLinkSyncOffsets.$anonfun$filterConsumerGroups$4((scala.collection.immutable.Set)filtered2)));
        return filtered2;
    }

    /**
     * Second step of a sync round: lists the committed offsets of the given groups on the source
     * cluster and schedules the commit step for when those listings complete.
     *
     * <p>Groups that are cached in {@code currentOffsets} but absent from {@code groups} are evicted
     * first. When {@code useBatch} is set, groups are listed in batches of
     * {@code ListConsumerGroupOffsetBatchSize}; otherwise one listing is issued per group and a
     * single commit step is scheduled for all of them.
     *
     * <p>Failure handling: the cause of an {@link ExecutionException} selects exactly one warning
     * (group authorization, topic authorization, or generic). The decompiled form had lost the
     * branch structure - it fell through every applicable warning and then threw
     * {@code MatchError} unconditionally; the branches are now mutually exclusive and
     * {@code MatchError} is raised only for a {@code null} cause.
     *
     * @param groups the filtered consumer group ids to sync
     * @return {@code true} when no scheduled callbacks are outstanding after this call
     */
    private boolean listSourceConsumerGroupOffsets(scala.collection.immutable.Set<String> groups) {
        // This method is the body of the callback counted by run(); mark that callback as done.
        this.tasksOutstanding_$eq(this.tasksOutstanding() - 1);
        Set groupsToRemove = this.currentOffsets().keySet().diff(groups);
        if (groupsToRemove.nonEmpty()) {
            this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(53).append("Removing consumer groups ").append(groupsToRemove).append(" from current offsets cache.").toString());
        }
        this.currentOffsets().$minus$minus$eq((TraversableOnce)groupsToRemove);
        this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(70).append("Updating offsets for the following consumer groups on target cluster: ").append(groups.toString()).toString());
        try {
            if (this.useBatch()) {
                // Each batch schedules its own commit step and bumps the outstanding counter itself.
                groups.grouped(ClusterLinkSyncOffsets$.MODULE$.ListConsumerGroupOffsetBatchSize()).foreach(arg_0 -> ClusterLinkSyncOffsets.$anonfun$listSourceConsumerGroupOffsets$3$adapted(this, null, arg_0));
            } else {
                scala.collection.mutable.Map groupFutures = (scala.collection.mutable.Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
                groups.foreach((Function1 & Serializable & scala.Serializable)g -> {
                    this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(68).append("Listing consumer group offsets on source cluster for consumer group ").append((String)g).toString());
                    return (scala.collection.mutable.Map)groupFutures.$plus$plus$eq((TraversableOnce)Option$.MODULE$.option2Iterable((Option)new Some((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(g), (Object)this.clientManager().getAdmin().listConsumerGroupOffsets(g).partitionsToOffsetAndMetadata()))));
                });
                KafkaFuture allGroupFutures = KafkaFuture.allOf((KafkaFuture[])((KafkaFuture[])groupFutures.values().toSeq().toArray(ClassTag$.MODULE$.apply(KafkaFuture.class))));
                this.scheduleWhenComplete(allGroupFutures, (Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> this.doAsyncCommit((scala.collection.immutable.Map<String, KafkaFuture<java.util.Map<TopicPartition, OffsetAndMetadata>>>)groupFutures.toMap(Predef$.MODULE$.$conforms())));
                this.tasksOutstanding_$eq(this.tasksOutstanding() + 1);
            }
        }
        catch (ExecutionException executionException) {
            Throwable throwable = executionException.getCause();
            if (throwable instanceof GroupAuthorizationException) {
                this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(155).append("Unable to list offsets for one or more consumer groups on the source cluster, ").append("due to authorization issues. Please add DESCRIBE ACLs for the consumer group.").toString());
            } else if (throwable instanceof TopicAuthorizationException) {
                this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(162).append("Unable to list offsets for one or more consumer groups on the source cluster, ").append("due to authorization issues. Please add DESCRIBE ACLs for the topics being mirrored.").toString());
            } else if (throwable != null) {
                this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(79).append("Unable to list consumer groups on source cluster. Offsets will not be ").append("migrated.").toString(), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> throwable);
            } else {
                // Mirrors the Scala pattern match on the cause: a null cause matches no case.
                throw new MatchError(null);
            }
        }
        catch (Throwable ex) {
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(79).append("Unable to list consumer groups on source cluster. Offsets will not be ").append("migrated.").toString(), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> ex);
        }
        return this.tasksOutstanding() == 0;
    }

    /**
     * Commit step: submits the listed source offsets to the destination cluster and, if anything
     * was submitted, schedules the handling of the commit results.
     *
     * <p>When nothing was submitted there is no further callback to wait for, so the outstanding
     * counter is decremented here (never below zero).
     *
     * @param groupFutures per-group futures holding the offsets listed on the source cluster
     * @return {@code true} when no scheduled callbacks are outstanding after this call
     */
    private boolean doAsyncCommit(scala.collection.immutable.Map<String, KafkaFuture<java.util.Map<TopicPartition, OffsetAndMetadata>>> groupFutures) {
        scala.collection.immutable.Map<String, AlterConsumerGroupOffsetsResult> submitted = this.asyncCommitConsumerOffsetsToDestinationCluster(groupFutures);
        if (submitted.isEmpty()) {
            if (this.tasksOutstanding() > 0) {
                this.tasksOutstanding_$eq(this.tasksOutstanding() - 1);
            }
        } else {
            this.handleDestinationClusterCommittedOffsets(submitted, groupFutures);
        }
        return this.tasksOutstanding() == 0;
    }

    /**
     * Schedules a callback that runs once every destination commit in {@code commitResults} has
     * completed. The callback marks one outstanding task as done and then processes each
     * group's commit result individually.
     *
     * @param commitResults per-group results of the offset commits issued on the destination
     * @param listFutures per-group futures holding the offsets that were listed on the source
     * @return {@code true} when no scheduled callbacks are outstanding at the time of the call
     */
    private boolean handleDestinationClusterCommittedOffsets(scala.collection.immutable.Map<String, AlterConsumerGroupOffsetsResult> commitResults, scala.collection.immutable.Map<String, KafkaFuture<java.util.Map<TopicPartition, OffsetAndMetadata>>> listFutures) {
        // One completion future per group, combined into a single future for scheduling.
        Object perGroupCommitFutures = commitResults.values().map((Function1 & Serializable & scala.Serializable)x$2 -> x$2.all(), Iterable$.MODULE$.canBuildFrom());
        KafkaFuture[] pendingCommits = (KafkaFuture[])((TraversableOnce)perGroupCommitFutures).toSeq().toArray(ClassTag$.MODULE$.apply(KafkaFuture.class));
        KafkaFuture allCommitFutures = KafkaFuture.allOf(pendingCommits);
        this.scheduleWhenComplete(allCommitFutures, (Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> {
            this.tasksOutstanding_$eq(this.tasksOutstanding() - 1);
            commitResults.foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
                ClusterLinkSyncOffsets.$anonfun$handleDestinationClusterCommittedOffsets$3(this, listFutures, x0$1);
                return BoxedUnit.UNIT;
            });
            return this.tasksOutstanding() == 0;
        });
        return this.tasksOutstanding() == 0;
    }

    /**
     * For every group, takes the offsets listed on the source cluster, keeps only the entries
     * accepted by the per-entry filter (syncable topic and different from the cached value), and
     * - if any remain - issues {@code alterConsumerGroupOffsets} on the destination cluster.
     *
     * <p>If fetching a group's listing fails, that group contributes no commit: a
     * {@code NoBatchedOffsetFetchRequestException} cause additionally switches {@code useBatch}
     * off for later rounds; any other cause is logged as a warning.
     *
     * <p>Fixes relative to the decompiled form: the filtered offsets are now assigned to the
     * variable that is subsequently read (the success path previously left it unassigned), and
     * the log lambda refers to {@code this} rather than an undefined {@code $this}.
     *
     * @param groupFutures per-group futures holding the offsets listed on the source cluster
     * @return the commit results for the groups that had something to commit, keyed by group id
     * @throws MatchError if an entry is {@code null} or an {@link ExecutionException} has no cause
     */
    private scala.collection.immutable.Map<String, AlterConsumerGroupOffsetsResult> asyncCommitConsumerOffsetsToDestinationCluster(scala.collection.immutable.Map<String, KafkaFuture<java.util.Map<TopicPartition, OffsetAndMetadata>>> groupFutures) {
        return (scala.collection.immutable.Map)groupFutures.flatMap((Function1 & Serializable & scala.Serializable)x0$1 -> {
            if (x0$1 == null) {
                throw new MatchError(null);
            }
            String group = (String)x0$1._1();
            KafkaFuture groupFuture = (KafkaFuture)x0$1._2();
            java.util.Map offsets;
            try {
                offsets = (java.util.Map)CollectionConverters$.MODULE$.mutableMapAsJavaMapConverter((scala.collection.mutable.Map)((TraversableLike)CollectionConverters$.MODULE$.mapAsScalaMapConverter((java.util.Map)groupFuture.get()).asScala()).filter((Function1 & Serializable & scala.Serializable)t -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkSyncOffsets.$anonfun$asyncCommitConsumerOffsetsToDestinationCluster$2(this, group, t)))).asJava();
            }
            catch (ExecutionException executionException) {
                Throwable throwable = executionException.getCause();
                if (throwable instanceof OffsetFetchRequest.NoBatchedOffsetFetchRequestException) {
                    this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(153).append("Source cluster does not support batched offsetFetch requests for link ").append(this.linkData.linkName()).append(".").append(" Will use non-batched version of offsetFetch for consumer offset sync on next try.").toString());
                    this.useBatch_$eq(false);
                } else if (throwable != null) {
                    this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(79).append("Unable to list consumer groups on source cluster. Offsets will not be ").append("migrated.").toString(), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> throwable);
                } else {
                    throw new MatchError(null);
                }
                // Nothing usable was listed for this group; an empty map means "no commit".
                offsets = new HashMap();
            }
            if (offsets.isEmpty()) {
                return Option$.MODULE$.option2Iterable((Option)None$.MODULE$);
            }
            this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(56).append("Committing offsets on target cluster for consumer group ").append(group).toString());
            return Option$.MODULE$.option2Iterable((Option)new Some((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)group), (Object)((Admin)this.destAdminFactory().apply()).alterConsumerGroupOffsets(group, offsets))));
        }, scala.collection.immutable.Map$.MODULE$.canBuildFrom());
    }

    /**
     * Looks the topic up in the controller's linked-topic map and asks its link state whether it
     * should be synced.
     *
     * @param topic topic name to check
     * @return {@code true} only if the topic has a linked-topic entry whose state reports
     *         {@code shouldSync()}
     */
    private boolean shouldSync(String topic) {
        Option linkedTopicState = this.controller.controllerContext().linkedTopics().get((Object)topic);
        return linkedTopicState.exists((Function1 & Serializable & scala.Serializable)x$3 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkSyncOffsets.$anonfun$shouldSync$1(x$3)));
    }

    /**
     * Builds the trace message listing the groups that survived filtering.
     *
     * @param filtered$1 the filtered consumer groups
     * @return the message text
     */
    public static final /* synthetic */ String $anonfun$filterConsumerGroups$4(scala.collection.immutable.Set filtered$1) {
        return "Filtered consumer groups: " + filtered$1;
    }

    /**
     * Body of the per-batch lambda used by the batched path of
     * {@code listSourceConsumerGroupOffsets}: lists offsets for one batch of groups through the
     * cluster link admin client and schedules the commit step for that batch.
     *
     * @param $this the enclosing task instance
     * @param nullList$1 value stored for every group in the request map; the only caller in this
     *        file passes {@code null} (presumably meaning "all partitions" - confirm against the
     *        admin client)
     * @param batch the group ids in this batch
     */
    public static final /* synthetic */ void $anonfun$listSourceConsumerGroupOffsets$3(ClusterLinkSyncOffsets $this, List nullList$1, scala.collection.immutable.Set batch) {
        $this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(80).append("Listing consumer group offsets on source cluster for following consumer groups: ").append(batch.toString()).toString());
        // Request map: every group id in the batch maps to the same nullList$1 value.
        java.util.Map groupMap = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)((TraversableOnce)batch.map((Function1 & Serializable & scala.Serializable)g -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(g), (Object)nullList$1), Set$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())).asJava();
        ListConsumerGroupOffsetsResult result = $this.clientManager().getClusterLinkAdminClient().listConsumerGroupOffsets(groupMap, new ListConsumerGroupOffsetsOptions());
        // Immutable snapshot of the per-group futures, handed to the commit step.
        scala.collection.immutable.Map groupFutures = ((TraversableOnce)CollectionConverters$.MODULE$.mapAsScalaMapConverter(result.groupIdsToPartitionsAndOffsetAndMetadata()).asScala()).toMap(Predef$.MODULE$.$conforms());
        $this.scheduleWhenComplete(result.all(), (Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> $this.doAsyncCommit((scala.collection.immutable.Map<String, KafkaFuture<java.util.Map<TopicPartition, OffsetAndMetadata>>>)groupFutures));
        // One more scheduled callback to wait for.
        $this.tasksOutstanding_$eq($this.tasksOutstanding() + 1);
    }

    /**
     * Processes the commit result of a single consumer group after all commits have completed.
     *
     * <p>On success, every listed partition whose topic passes {@code shouldSync} is written to
     * the {@code currentOffsets} cache, the commit sensor is recorded and a trace line is logged.
     * On failure, a warning specific to the cause is logged and the commit-failed sensor is
     * recorded; the exception is not propagated.
     *
     * @param $this the enclosing task instance
     * @param listFutures$1 per-group futures holding the offsets listed on the source cluster
     * @param x0$1 a (group id, commit result) pair
     * @throws MatchError if {@code x0$1} is {@code null} or an {@link ExecutionException} has no cause
     */
    public static final /* synthetic */ void $anonfun$handleDestinationClusterCommittedOffsets$3(ClusterLinkSyncOffsets $this, scala.collection.immutable.Map listFutures$1, Tuple2 x0$1) {
        if (x0$1 != null) {
            String group = (String)x0$1._1();
            try {
                // Surfaces any commit failure for this group as an ExecutionException.
                ((AlterConsumerGroupOffsetsResult)x0$1._2()).all().get();
                ((java.util.Map)((KafkaFuture)listFutures$1.apply((Object)group)).get()).forEach((tp, offsetAndMetadata) -> {
                    if ($this.shouldSync(tp.topic())) {
                        // NOTE(review): the value stored under the group key is a single-entry map for
                        // this partition, so a later partition of the same group appears to replace the
                        // earlier one rather than being merged with it - confirm this is intended.
                        $this.currentOffsets().$plus$eq(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)group), (Object)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp), (Object)BoxesRunTime.boxToLong((long)offsetAndMetadata.offset()))}))));
                        $this.metrics.consumerOffsetCommitSensor().record();
                        $this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(55).append("Committed offsets on target cluster for consumer group ").append(group).toString());
                    }
                });
                return;
            }
            catch (ExecutionException executionException) {
                // Dispatch on the underlying cause; each branch logs, records the failure and returns.
                Throwable throwable = executionException.getCause();
                if (throwable instanceof GroupAuthorizationException) {
                    $this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(277).append("Unable to commit offsets for consumer group ").append(group).append(" on the destination cluster, due to authorization issues.").append(" Please add READ ACLs for the consumer group. This action is taken by the inter-broker principal defined in the broker ").append("configuration so ACLs should be added for this principal.").toString());
                    $this.metrics.consumerOffsetCommitFailedSensor().record();
                    return;
                }
                if (throwable instanceof TopicAuthorizationException) {
                    $this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(284).append("Unable to commit offsets for consumer group ").append(group).append(" on the destination cluster, due to authorization issues.").append(" Please add READ ACLs for the topics being migrated. This action is taken by the inter-broker principal defined in the broker ").append("configuration so ACLs should be added for this principal.").toString());
                    $this.metrics.consumerOffsetCommitFailedSensor().record();
                    return;
                }
                if (throwable != null) {
                    $this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(68).append("Unable to commit offsets for consumer group ").append(group).append(" on destination cluster.").toString(), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> throwable);
                    $this.metrics.consumerOffsetCommitFailedSensor().record();
                    return;
                }
                // A null cause matches none of the cases above.
                throw new MatchError(null);
            }
            catch (Throwable ex) {
                $this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(68).append("Unable to commit offsets for consumer group ").append(group).append(" on destination cluster.").toString(), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> ex);
                $this.metrics.consumerOffsetCommitFailedSensor().record();
                return;
            }
        }
        throw new MatchError(null);
    }

    /**
     * Per-entry filter deciding whether a listed (partition, offset) pair should be committed to
     * the destination: the topic must pass {@code shouldSync}, and the value cached for the group
     * must differ from a single-entry map built from this pair.
     *
     * @param $this the enclosing task instance
     * @param group$2 the consumer group the entry belongs to
     * @param t a (TopicPartition, OffsetAndMetadata) pair listed on the source cluster
     * @return {@code true} if the entry should be included in the commit
     */
    public static final /* synthetic */ boolean $anonfun$asyncCommitConsumerOffsetsToDestinationCluster$2(ClusterLinkSyncOffsets $this, String group$2, Tuple2 t) {
        String topic = ((TopicPartition)t._1()).topic();
        if (!$this.shouldSync(topic)) {
            return false;
        }
        // Cached value for the whole group (empty map when the group is not cached yet).
        Object cachedForGroup = $this.currentOffsets().getOrElse((Object)group$2, (Function0 & Serializable & scala.Serializable)() -> Predef$.MODULE$.Map().empty());
        GenMap listedEntry = Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t._1()), (Object)BoxesRunTime.boxToLong((long)((OffsetAndMetadata)t._2()).offset()))}));
        if (cachedForGroup == null) {
            return listedEntry != null;
        }
        return !cachedForGroup.equals(listedEntry);
    }

    /**
     * Predicate body used by {@code shouldSync}: asks the topic's link state whether it should sync.
     *
     * @param x$3 the linked-topic state found for a topic
     * @return the result of {@code x$3.state().shouldSync()}
     */
    public static final /* synthetic */ boolean $anonfun$shouldSync$1(ClusterLinkTopicState x$3) {
        return x$3.state().shouldSync();
    }

    /**
     * Creates the periodic offset-sync task. The task is registered with the client manager's
     * scheduler under the name "ClusterLinkSyncOffsets", using the consumer offset sync interval
     * from the current link config.
     *
     * @param clientManager source-side client manager (config, scheduler, admin clients)
     * @param linkData metadata of the cluster link
     * @param controller the local controller
     * @param destAdminFactory factory for destination-cluster {@link Admin} instances
     * @param metrics cluster link metrics holder
     */
    public ClusterLinkSyncOffsets(ClusterLinkClientManager clientManager, ClusterLinkData linkData, KafkaController controller, Function0<Admin> destAdminFactory, ClusterLinkMetrics metrics) {
        // NOTE(review): these field writes precede super(...), mirroring the decompiled bytecode
        // order; Java compilers without flexible constructor bodies reject this ordering.
        this.clientManager = clientManager;
        this.linkData = linkData;
        this.controller = controller;
        this.destAdminFactory = destAdminFactory;
        this.metrics = metrics;
        super(clientManager.scheduler(), "ClusterLinkSyncOffsets", Predef$.MODULE$.Integer2int(clientManager.currentConfig().consumerOffsetSyncMs()));
        // Mutable state starts empty / idle, with batched offset listing enabled.
        this.currentOffsets = Map$.MODULE$.empty();
        this.config = clientManager.currentConfig();
        this.tasksOutstanding = 0;
        this.useBatch = true;
    }

    /**
     * Bridge that adapts the void per-batch lambda body to a value-returning function by
     * returning {@code BoxedUnit.UNIT} after delegating.
     *
     * @param $this the enclosing task instance
     * @param nullList$1 forwarded unchanged to the lambda body
     * @param batch the group ids in this batch
     * @return {@code BoxedUnit.UNIT}
     */
    public static final /* synthetic */ Object $anonfun$listSourceConsumerGroupOffsets$3$adapted(ClusterLinkSyncOffsets $this, List nullList$1, scala.collection.immutable.Set batch) {
        ClusterLinkSyncOffsets.$anonfun$listSourceConsumerGroupOffsets$3($this, nullList$1, batch);
        return BoxedUnit.UNIT;
    }
}

