/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;

public class StreamsPartitionAssignor
implements ConsumerPartitionAssignor,
Configurable {
    // Logger created in configure() from a LogContext carrying logPrefix.
    private Logger log;
    // Log prefix obtained from the AssignorConfiguration in configure().
    private String logPrefix;
    // Random placeholder process id: assign() groups every member whose subscription version is
    // greater than 8 under this key instead of under its own process id.
    private static final UUID FUTURE_ID = UUID.randomUUID();
    // Orders partitions by topic name first, then by partition number.
    protected static final Comparator<TopicPartition> PARTITION_COMPARATOR = Comparator.comparing(TopicPartition::topic).thenComparingInt(TopicPartition::partition);
    // Endpoint string that is encoded into this member's subscription (see subscriptionUserData()).
    private String userEndPoint;
    // Configuration object handed to the TaskAssignor in assignTasksToClients().
    private AssignorConfiguration.AssignmentConfigs assignmentConfigs;
    // Source of the process id, the topology builder, the task offset sums and the main consumer.
    private TaskManager taskManager;
    // Notified via onChange() with the per-host partition maps computed in assign().
    private StreamsMetadataState streamsMetadataState;
    // Groups source-topic partitions into tasks (see assign()).
    private PartitionGrouper partitionGrouper;
    // Set in configure(); not otherwise used in the visible part of this file.
    private AtomicInteger assignmentErrorCode;
    // Set in configure(); not otherwise used in the visible part of this file.
    private AtomicLong nextScheduledRebalanceMs;
    // Set in configure(); not otherwise used in the visible part of this file.
    private Time time;
    // Subscription metadata version this member encodes; defaults to 8 and may be replaced in
    // configure() by AssignorConfiguration.configuredMetadataVersion().
    protected int usedSubscriptionMetadataVersion = 8;
    // Admin client used to fetch changelog end offsets in populateClientStatesMap().
    private Admin adminClient;
    // Invoked via makeReady() for repartition and changelog topics.
    private InternalTopicManager internalTopicManager;
    // Set in configure(); presumably used by ensureCopartitioning(), which is not visible here.
    private CopartitionedTopicsEnforcer copartitionedTopicsEnforcer;
    // Configured rebalance protocol; COOPERATIVE is advertised in supportedProtocols() only if set here.
    private ConsumerPartitionAssignor.RebalanceProtocol rebalanceProtocol;
    // Told via onAssignmentComplete(boolean) whether the computed assignment is stable.
    private AssignorConfiguration.AssignmentListener assignmentListener;
    // Supplies the TaskAssignor instance used by createTaskAssignor().
    private Supplier<TaskAssignor> taskAssignorSupplier;
    // Counter reset to 0 in configure() and incremented (with byte wrap-around) on every
    // subscriptionUserData() call; its value is encoded into the subscription.
    private byte uniqueField;

    /**
     * Initializes this assignor from the consumer configuration map.
     *
     * <p>All collaborators are pulled out of an {@link AssignorConfiguration} built from
     * {@code configs}; the per-subscription counter {@code uniqueField} is reset to zero.
     *
     * @param configs the configuration entries handed to this {@link Configurable}
     */
    public void configure(Map<String, ?> configs) {
        AssignorConfiguration config = new AssignorConfiguration(configs);

        logPrefix = config.logPrefix();
        LogContext logContext = new LogContext(logPrefix);
        log = logContext.logger(getClass());

        // The currently held version is passed in so the configuration can decide whether to keep it.
        usedSubscriptionMetadataVersion = config.configuredMetadataVersion(usedSubscriptionMetadataVersion);

        taskManager = config.taskManager();
        streamsMetadataState = config.streamsMetadataState();
        assignmentErrorCode = config.assignmentErrorCode();
        nextScheduledRebalanceMs = config.nextScheduledRebalanceMs();
        time = config.time();
        assignmentConfigs = config.assignmentConfigs();
        partitionGrouper = config.partitionGrouper();
        userEndPoint = config.userEndPoint();
        adminClient = config.adminClient();
        internalTopicManager = config.internalTopicManager();
        copartitionedTopicsEnforcer = config.copartitionedTopicsEnforcer();
        rebalanceProtocol = config.rebalanceProtocol();
        // Deferred: a task assignor is only requested from the configuration when one is needed.
        taskAssignorSupplier = () -> config.taskAssignor();
        assignmentListener = config.assignmentListener();
        uniqueField = 0;
    }

    /**
     * Returns the name under which this assignor is identified.
     *
     * @return the constant string {@code "stream"}
     */
    public String name() {
        return "stream";
    }

    /**
     * Lists the rebalance protocols this assignor supports.
     *
     * @return a new mutable list that always starts with {@code EAGER} and additionally contains
     *         {@code COOPERATIVE} when that is the configured rebalance protocol
     */
    public List<ConsumerPartitionAssignor.RebalanceProtocol> supportedProtocols() {
        List<ConsumerPartitionAssignor.RebalanceProtocol> protocols = new ArrayList<>();
        protocols.add(ConsumerPartitionAssignor.RebalanceProtocol.EAGER);
        boolean cooperativeConfigured = rebalanceProtocol == ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE;
        if (cooperativeConfigured) {
            protocols.add(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE);
        }
        return protocols;
    }

    /**
     * Builds the encoded subscription payload this member sends to the group leader.
     *
     * <p>Signals the start of a rebalance first, then bumps {@code uniqueField} (wrapping as a byte)
     * and encodes it together with the used version, the latest supported version (8), the process
     * id, the user endpoint and the current task offset sums.
     *
     * @param topics the topics this member subscribes to
     * @return the encoded {@link SubscriptionInfo}
     */
    public ByteBuffer subscriptionUserData(Set<String> topics) {
        handleRebalanceStart(topics);
        // Compound increment on a byte wraps around exactly like the explicit narrowing cast.
        uniqueField++;
        SubscriptionInfo subscriptionInfo = new SubscriptionInfo(
            usedSubscriptionMetadataVersion,
            8,
            taskManager.processId(),
            userEndPoint,
            taskManager.getTaskOffsetSums(),
            uniqueField
        );
        return subscriptionInfo.encode();
    }

    /**
     * Creates an assignment that hands out nothing and only carries an error code.
     *
     * @param clientsMetadata metadata of all clients; every consumer of every client gets an entry
     * @param errorCode       the error code encoded into each consumer's {@link AssignmentInfo}
     * @return a map from consumer id to an empty assignment whose user data carries {@code errorCode}
     */
    private Map<String, ConsumerPartitionAssignor.Assignment> errorAssignment(Map<UUID, ClientMetadata> clientsMetadata, int errorCode) {
        Map<String, ConsumerPartitionAssignor.Assignment> errorsByConsumer = new HashMap<>();
        for (ClientMetadata client : clientsMetadata.values()) {
            for (String consumer : client.consumers) {
                // A fresh payload is encoded per consumer so that no buffer is shared between members.
                ByteBuffer userData = new AssignmentInfo(
                    8,
                    Collections.emptyList(),
                    Collections.emptyMap(),
                    Collections.emptyMap(),
                    Collections.emptyMap(),
                    errorCode
                ).encode();
                errorsByConsumer.put(consumer, new ConsumerPartitionAssignor.Assignment(Collections.emptyList(), userData));
            }
        }
        return errorsByConsumer;
    }

    /*
     * WARNING - void declaration
     */
    public ConsumerPartitionAssignor.GroupAssignment assign(Cluster metadata, ConsumerPartitionAssignor.GroupSubscription groupSubscription) {
        Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions;
        Map subscriptions = groupSubscription.groupSubscription();
        HashMap<UUID, ClientMetadata> clientMetadataMap = new HashMap<UUID, ClientMetadata>();
        HashSet<TopicPartition> allOwnedPartitions = new HashSet<TopicPartition>();
        int minReceivedMetadataVersion = 8;
        int minSupportedMetadataVersion = 8;
        int futureMetadataVersion = -1;
        for (Map.Entry entry : subscriptions.entrySet()) {
            void var16_19;
            ClientMetadata clientMetadata;
            Object processId;
            String consumerId = (String)entry.getKey();
            ConsumerPartitionAssignor.Subscription subscription = (ConsumerPartitionAssignor.Subscription)entry.getValue();
            SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
            int usedVersion = info.version();
            minReceivedMetadataVersion = this.updateMinReceivedVersion(usedVersion, minReceivedMetadataVersion);
            minSupportedMetadataVersion = this.updateMinSupportedVersion(info.latestSupportedVersion(), minSupportedMetadataVersion);
            if (usedVersion > 8) {
                futureMetadataVersion = usedVersion;
                processId = FUTURE_ID;
                if (!clientMetadataMap.containsKey(FUTURE_ID)) {
                    clientMetadataMap.put(FUTURE_ID, new ClientMetadata(null));
                }
            } else {
                processId = info.processId();
            }
            if ((clientMetadata = (ClientMetadata)clientMetadataMap.get(processId)) == null) {
                ClientMetadata clientMetadata2 = new ClientMetadata(info.userEndPoint());
                clientMetadataMap.put(info.processId(), clientMetadata2);
            }
            var16_19.addConsumer(consumerId, subscription.ownedPartitions());
            allOwnedPartitions.addAll(subscription.ownedPartitions());
            var16_19.addPreviousTasksAndOffsetSums(consumerId, info.taskOffsetSums());
        }
        boolean versionProbing = this.checkMetadataVersions(minReceivedMetadataVersion, minSupportedMetadataVersion, futureMetadataVersion);
        this.log.debug("Constructed client metadata {} from the member subscriptions.", clientMetadataMap);
        Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = this.taskManager.builder().topicGroups();
        try {
            allRepartitionTopicPartitions = this.prepareRepartitionTopics(topicGroups, metadata);
        }
        catch (TaskAssignmentException e) {
            return new ConsumerPartitionAssignor.GroupAssignment(this.errorAssignment(clientMetadataMap, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()));
        }
        Cluster fullMetadata = metadata.withPartitions(allRepartitionTopicPartitions);
        this.log.debug("Created repartition topics {} from the parsed topology.", allRepartitionTopicPartitions.values());
        HashSet<String> allSourceTopics = new HashSet<String>();
        HashMap<Integer, Set<String>> sourceTopicsByGroup = new HashMap<Integer, Set<String>>();
        for (Map.Entry entry : topicGroups.entrySet()) {
            allSourceTopics.addAll(((InternalTopologyBuilder.TopicsInfo)entry.getValue()).sourceTopics);
            sourceTopicsByGroup.put((Integer)entry.getKey(), ((InternalTopologyBuilder.TopicsInfo)entry.getValue()).sourceTopics);
        }
        Map<TaskId, Set<TopicPartition>> partitionsForTask = this.partitionGrouper.partitionGroups(sourceTopicsByGroup, fullMetadata);
        HashSet<TaskId> hashSet = new HashSet<TaskId>();
        boolean probingRebalanceNeeded = this.assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, hashSet);
        HashMap<HostInfo, Set<TopicPartition>> partitionsByHost = new HashMap<HostInfo, Set<TopicPartition>>();
        HashMap<HostInfo, Set<TopicPartition>> standbyPartitionsByHost = new HashMap<HostInfo, Set<TopicPartition>>();
        if (minReceivedMetadataVersion >= 2) {
            this.populatePartitionsByHostMaps(partitionsByHost, standbyPartitionsByHost, partitionsForTask, clientMetadataMap);
        }
        this.streamsMetadataState.onChange(partitionsByHost, standbyPartitionsByHost, fullMetadata);
        Map<String, ConsumerPartitionAssignor.Assignment> assignment = this.computeNewAssignment(hashSet, clientMetadataMap, partitionsForTask, partitionsByHost, standbyPartitionsByHost, allOwnedPartitions, minReceivedMetadataVersion, minSupportedMetadataVersion, versionProbing, probingRebalanceNeeded);
        return new ConsumerPartitionAssignor.GroupAssignment(assignment);
    }

    /**
     * Decides whether this rebalance is a version-probing rebalance and logs any downgrade.
     *
     * @param minReceivedMetadataVersion  the smallest metadata version used by any member
     * @param minSupportedMetadataVersion the smallest "latest supported" version of any member
     * @param futureMetadataVersion       the version of a future subscription, or -1 if none was seen
     * @return {@code true} if a future subscription was received, {@code false} otherwise
     * @throws IllegalStateException if a future subscription was received while some member used a
     *                               version lower than 3
     */
    private boolean checkMetadataVersions(int minReceivedMetadataVersion, int minSupportedMetadataVersion, int futureMetadataVersion) {
        boolean versionProbing = futureMetadataVersion != -1;
        if (versionProbing) {
            if (minReceivedMetadataVersion < 3) {
                throw new IllegalStateException("Received a future (version probing) subscription (version: " + futureMetadataVersion + ") and an incompatible pre Kafka 2.0 subscription (version: " + minReceivedMetadataVersion + ") at the same time.");
            }
            log.info("Received a future (version probing) subscription (version: {}). Sending assignment back (with supported version {}).", futureMetadataVersion, minSupportedMetadataVersion);
        }

        if (minReceivedMetadataVersion < 8) {
            log.info("Downgrade metadata to version {}. Latest supported version is {}.", minReceivedMetadataVersion, 8);
        }
        if (minSupportedMetadataVersion < 8) {
            log.info("Downgrade latest supported metadata to version {}. Latest supported version is {}.", minSupportedMetadataVersion, 8);
        }
        return versionProbing;
    }

    /**
     * Collects the configuration of every repartition topic in the topology, verifying on the way
     * that all non-repartition source topics are known to the cluster.
     *
     * @param topicGroups the topic groups of the topology, keyed by group id
     * @param metadata    the current cluster metadata
     * @return repartition topic configs keyed by topic name
     * @throws TaskAssignmentException if a source topic is neither a repartition topic of its group
     *                                 nor present in the cluster metadata
     */
    private Map<String, InternalTopicConfig> computeRepartitionTopicMetadata(Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups, Cluster metadata) throws TaskAssignmentException {
        Map<String, InternalTopicConfig> configsByTopic = new HashMap<>();
        for (InternalTopologyBuilder.TopicsInfo group : topicGroups.values()) {
            for (String sourceTopic : group.sourceTopics) {
                if (!group.repartitionSourceTopics.containsKey(sourceTopic) && !metadata.topics().contains(sourceTopic)) {
                    log.error("Source topic {} is missing/unknown during rebalance, please make sure all source topics have been pre-created before starting the Streams application. Returning error {}", sourceTopic, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.name());
                    throw new TaskAssignmentException("Missing source topic during assignment.");
                }
            }
            for (InternalTopicConfig repartitionConfig : group.repartitionSourceTopics.values()) {
                configsByTopic.put(repartitionConfig.name(), repartitionConfig);
            }
        }
        return configsByTopic;
    }

    /**
     * Resolves partition counts for all repartition topics, enforces co-partitioning, makes the
     * topics ready through the internal topic manager and returns placeholder partition metadata
     * for them.
     *
     * @param topicGroups the topic groups of the topology, keyed by group id
     * @param metadata    the current cluster metadata
     * @return one {@link PartitionInfo} (no leader, empty replica arrays) per repartition topic
     *         partition; topics without a partition count contribute nothing
     */
    private Map<TopicPartition, PartitionInfo> prepareRepartitionTopics(Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups, Cluster metadata) {
        Map<String, InternalTopicConfig> configsByTopic = computeRepartitionTopicMetadata(topicGroups, metadata);
        setRepartitionTopicMetadataNumberOfPartitions(configsByTopic, topicGroups, metadata);
        ensureCopartitioning(taskManager.builder().copartitionGroups(), configsByTopic, metadata);
        internalTopicManager.makeReady(configsByTopic);

        Map<TopicPartition, PartitionInfo> partitionInfos = new HashMap<>();
        configsByTopic.forEach((topicName, topicConfig) -> {
            // A missing partition count maps to -1, which makes the loop below a no-op.
            int partitionCount = topicConfig.numberOfPartitions().orElse(-1);
            int p = 0;
            while (p < partitionCount) {
                PartitionInfo placeholder = new PartitionInfo(topicName, p, null, new Node[0], new Node[0]);
                partitionInfos.put(new TopicPartition(topicName, p), placeholder);
                p++;
            }
        });
        return partitionInfos;
    }

    /**
     * Fills in the partition count of every repartition topic that does not have one yet.
     *
     * <p>A repartition topic's count is the maximum partition count among the source topics of every
     * topic group that writes to it (i.e. lists it as a sink topic). Because an upstream source may
     * itself be a repartition topic whose count is still unknown, the whole pass is repeated until
     * every topic is resolved.
     *
     * @param repartitionTopicMetadata repartition topic configs keyed by topic name; updated in place
     * @param topicGroups              the topic groups of the topology, keyed by group id
     * @param metadata                 cluster metadata used for non-repartition source topics
     * @throws IllegalStateException   if the cluster metadata has no partition count for a
     *                                 non-repartition upstream source topic
     * @throws TaskAssignmentException if a full pass resolves nothing while topics are still unresolved
     */
    private void setRepartitionTopicMetadataNumberOfPartitions(Map<String, InternalTopicConfig> repartitionTopicMetadata, Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups, Cluster metadata) {
        boolean numPartitionsNeeded;
        do {
            numPartitionsNeeded = false;
            // Tracks whether this pass resolved at least one topic, to avoid looping forever.
            boolean progressMadeThisIteration = false;
            for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
                for (String repartitionSourceTopic : topicsInfo.repartitionSourceTopics.keySet()) {
                    Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(repartitionSourceTopic).numberOfPartitions();
                    Integer numPartitions = null;
                    // Already resolved (earlier pass or preset): nothing to do for this topic.
                    if (maybeNumPartitions.isPresent()) continue;
                    for (InternalTopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
                        Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
                        // Only groups that write to this repartition topic determine its size.
                        if (!otherSinkTopics.contains(repartitionSourceTopic)) continue;
                        for (String upstreamSourceTopic : otherTopicsInfo.sourceTopics) {
                            Integer numPartitionsCandidate = null;
                            if (repartitionTopicMetadata.containsKey(upstreamSourceTopic)) {
                                // Upstream is itself a repartition topic; usable only once resolved.
                                if (repartitionTopicMetadata.get(upstreamSourceTopic).numberOfPartitions().isPresent()) {
                                    numPartitionsCandidate = repartitionTopicMetadata.get(upstreamSourceTopic).numberOfPartitions().get();
                                }
                            } else {
                                Integer count = metadata.partitionCountForTopic(upstreamSourceTopic);
                                if (count == null) {
                                    throw new IllegalStateException("No partition count found for source topic " + upstreamSourceTopic + ", but it should have been.");
                                }
                                numPartitionsCandidate = count;
                            }
                            // Keep the running maximum over all known candidates.
                            if (numPartitionsCandidate == null || numPartitions != null && numPartitionsCandidate <= numPartitions) continue;
                            numPartitions = numPartitionsCandidate;
                        }
                    }
                    if (numPartitions == null) {
                        numPartitionsNeeded = true;
                        this.log.trace("Unable to determine number of partitions for {}, another iteration is needed", (Object)repartitionSourceTopic);
                        continue;
                    }
                    repartitionTopicMetadata.get(repartitionSourceTopic).setNumberOfPartitions(numPartitions);
                    progressMadeThisIteration = true;
                }
            }
            // In a do/while, 'continue' jumps to the loop condition: repeat only while topics remain unresolved.
            if (progressMadeThisIteration || !numPartitionsNeeded) continue;
            throw new TaskAssignmentException("Failed to compute number of partitions for all repartition topics");
        } while (numPartitionsNeeded);
    }

    /**
     * Derives the partition-to-task and topic-group-to-tasks lookups from the task partitioning and
     * then checks that every source-topic partition is covered.
     *
     * @param taskForPartition   output: owning task of each partition
     * @param tasksForTopicGroup output: tasks of each topic group id
     * @param allSourceTopics    all source topics of the topology
     * @param partitionsForTask  the partitions grouped by task
     * @param fullMetadata       cluster metadata including repartition topic partitions
     */
    private void populateTasksForMaps(Map<TopicPartition, TaskId> taskForPartition, Map<Integer, Set<TaskId>> tasksForTopicGroup, Set<String> allSourceTopics, Map<TaskId, Set<TopicPartition>> partitionsForTask, Cluster fullMetadata) {
        Set<TopicPartition> seenPartitions = new HashSet<>();
        partitionsForTask.forEach((taskId, taskPartitions) -> {
            for (TopicPartition topicPartition : taskPartitions) {
                taskForPartition.put(topicPartition, taskId);
                // Partitions of the current task are added only after this loop, so a hit here
                // means an earlier task already claimed the partition.
                if (seenPartitions.contains(topicPartition)) {
                    log.warn("Partition {} is assigned to more than one tasks: {}", topicPartition, partitionsForTask);
                }
            }
            seenPartitions.addAll(taskPartitions);

            Set<TaskId> groupTasks = tasksForTopicGroup.computeIfAbsent(taskId.topicGroupId, groupId -> new HashSet<>());
            groupTasks.add(taskId);
        });
        checkAllPartitions(allSourceTopics, partitionsForTask, seenPartitions, fullMetadata);
    }

    /**
     * Logs a warning for every source topic without partitions and for every source-topic
     * partition that is not assigned to any task. Nothing is modified.
     *
     * @param allSourceTopics       all source topics of the topology
     * @param partitionsForTask     the partitions grouped by task (only used in the log message)
     * @param allAssignedPartitions every partition that belongs to some task
     * @param fullMetadata          cluster metadata including repartition topic partitions
     */
    private void checkAllPartitions(Set<String> allSourceTopics, Map<TaskId, Set<TopicPartition>> partitionsForTask, Set<TopicPartition> allAssignedPartitions, Cluster fullMetadata) {
        for (String topic : allSourceTopics) {
            // Typed list: a raw List cannot be iterated as PartitionInfo.
            List<PartitionInfo> partitionInfoList = fullMetadata.partitionsForTopic(topic);
            if (partitionInfoList.isEmpty()) {
                this.log.warn("No partitions found for topic {}", topic);
                continue;
            }
            for (PartitionInfo partitionInfo : partitionInfoList) {
                TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                if (!allAssignedPartitions.contains(partition)) {
                    this.log.warn("Partition {} is not assigned to any tasks: {} Possible causes of a partition not getting assigned is that another topic defined in the topology has not been created when starting your streams application, resulting in no tasks created for this topology at all.", partition, partitionsForTask);
                }
            }
        }
    }

    /**
     * Determines the changelog partitions of every stateful task, sizes the non-source changelog
     * topics and makes them ready through the internal topic manager.
     *
     * @param topicGroups               the topic groups of the topology, keyed by group id
     * @param tasksForTopicGroup        tasks of each topic group id
     * @param changelogsByStatefulTask  output: changelog partitions of each stateful task
     * @param optimizedSourceChangelogs output: source topics that double as changelogs
     * @return the value returned by {@code InternalTopicManager.makeReady} for the changelog topics
     */
    private Set<String> prepareChangelogTopics(Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups, Map<Integer, Set<TaskId>> tasksForTopicGroup, Map<TaskId, Set<TopicPartition>> changelogsByStatefulTask, Set<String> optimizedSourceChangelogs) {
        Map<String, InternalTopicConfig> changelogConfigs = new HashMap<>();
        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> groupEntry : topicGroups.entrySet()) {
            int groupId = groupEntry.getKey();
            InternalTopologyBuilder.TopicsInfo group = groupEntry.getValue();
            Set<TaskId> groupTasks = tasksForTopicGroup.get(groupId);

            if (groupTasks == null) {
                log.debug("No tasks found for topic group {}", groupId);
            } else if (!group.stateChangelogTopics.isEmpty()) {
                for (TaskId statefulTask : groupTasks) {
                    Set<TopicPartition> taskChangelogs = group.stateChangelogTopics.keySet().stream()
                        .map(changelogTopic -> new TopicPartition(changelogTopic, statefulTask.partition))
                        .collect(Collectors.toSet());
                    changelogsByStatefulTask.put(statefulTask, taskChangelogs);
                }

                for (InternalTopicConfig changelogConfig : group.nonSourceChangelogTopics()) {
                    // The topic needs one partition per task partition: highest partition id + 1.
                    int requiredPartitions = -1;
                    for (TaskId groupTask : groupTasks) {
                        requiredPartitions = Math.max(requiredPartitions, groupTask.partition + 1);
                    }
                    changelogConfig.setNumberOfPartitions(requiredPartitions);
                    changelogConfigs.put(changelogConfig.name(), changelogConfig);
                }

                optimizedSourceChangelogs.addAll(group.sourceTopicChangelogs());
            }
        }

        Set<String> created = internalTopicManager.makeReady(changelogConfigs);
        log.debug("Created state changelog topics {} from the parsed topology.", changelogConfigs.values());
        return created;
    }

    /**
     * Assigns all tasks to the clients (process instances) of the group.
     *
     * @param fullMetadata      cluster metadata including repartition topic partitions
     * @param allSourceTopics   all source topics of the topology
     * @param topicGroups       the topic groups of the topology, keyed by group id
     * @param clientMetadataMap metadata of all clients, keyed by process id
     * @param partitionsForTask the partitions grouped by task
     * @param statefulTasks     output: must be empty on entry, receives all stateful task ids
     * @return the boolean returned by the task assignor's {@code assign} call
     * @throws IllegalArgumentException if {@code statefulTasks} is not empty on entry
     */
    private boolean assignTasksToClients(Cluster fullMetadata, Set<String> allSourceTopics, Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups, Map<UUID, ClientMetadata> clientMetadataMap, Map<TaskId, Set<TopicPartition>> partitionsForTask, Set<TaskId> statefulTasks) {
        boolean alreadyPopulated = !statefulTasks.isEmpty();
        if (alreadyPopulated) {
            throw new IllegalArgumentException("The stateful tasks should not be populated before assigning tasks to clients");
        }

        Map<TopicPartition, TaskId> taskForPartition = new HashMap<>();
        Map<Integer, Set<TaskId>> tasksForTopicGroup = new HashMap<>();
        populateTasksForMaps(taskForPartition, tasksForTopicGroup, allSourceTopics, partitionsForTask, fullMetadata);

        Map<TaskId, Set<TopicPartition>> changelogsByStatefulTask = new HashMap<>();
        Set<String> optimizedSourceChangelogs = new HashSet<>();
        Set<String> newlyCreatedChangelogs = prepareChangelogTopics(topicGroups, tasksForTopicGroup, changelogsByStatefulTask, optimizedSourceChangelogs);

        Map<UUID, ClientState> clientStates = new HashMap<>();
        boolean lagsKnown = populateClientStatesMap(clientStates, clientMetadataMap, taskForPartition, changelogsByStatefulTask, newlyCreatedChangelogs, optimizedSourceChangelogs);

        Set<TaskId> allTasks = partitionsForTask.keySet();
        statefulTasks.addAll(changelogsByStatefulTask.keySet());
        log.debug("Assigning tasks {} to clients {} with number of replicas {}", allTasks, clientStates, numStandbyReplicas());

        boolean probingRebalanceNeeded = createTaskAssignor(lagsKnown).assign(clientStates, allTasks, statefulTasks, assignmentConfigs);

        String renderedStates = clientStates.entrySet().stream()
            .map(Object::toString)
            .collect(Collectors.joining(Utils.NL));
        log.info("Assigned tasks to clients as {}{}.", Utils.NL, renderedStates);
        return probingRebalanceNeeded;
    }

    /**
     * Picks the task assignor to use for this rebalance.
     *
     * @param lagComputationSuccessful whether the changelog end offsets could be fetched
     * @return the supplied assignor if it is a {@link StickyTaskAssignor} or the lag computation
     *         succeeded; otherwise a new {@link FallbackPriorTaskAssignor}
     */
    private TaskAssignor createTaskAssignor(boolean lagComputationSuccessful) {
        TaskAssignor configured = taskAssignorSupplier.get();
        boolean mustFallBack = !(configured instanceof StickyTaskAssignor) && !lagComputationSuccessful;
        if (mustFallBack) {
            log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and trigger another rebalance to retry.");
            return new FallbackPriorTaskAssignor();
        }
        return configured;
    }

    /**
     * Fetches the end offsets of all changelog partitions, turns them into per-task end-offset
     * sums and initializes every client's state (previous tasks and task lags) with them.
     *
     * <p>Changelog partitions are split three ways: partitions of newly created changelog topics
     * (not fetched), partitions of source topics reused as changelogs (committed offsets are fetched
     * via the main consumer), and all remaining ones (end offsets are fetched via the admin client).
     *
     * @param clientStates              output: the state of each client, keyed by process id
     * @param clientMetadataMap         metadata of all clients, keyed by process id
     * @param taskForPartition          owning task of each partition
     * @param changelogsByStatefulTask  changelog partitions of each stateful task
     * @param newlyCreatedChangelogs    names of changelog topics created during this rebalance
     * @param optimizedSourceChangelogs names of source topics that double as changelogs
     * @return {@code true} if the offsets were fetched; {@code false} if a {@link TimeoutException}
     *         or {@link StreamsException} occurred, in which case every stateful task gets the
     *         sum {@code -3L} (presumably the "unknown offset sum" sentinel - confirm against
     *         ClientState)
     */
    private boolean populateClientStatesMap(Map<UUID, ClientState> clientStates, Map<UUID, ClientMetadata> clientMetadataMap, Map<TopicPartition, TaskId> taskForPartition, Map<TaskId, Set<TopicPartition>> changelogsByStatefulTask, Set<String> newlyCreatedChangelogs, Set<String> optimizedSourceChangelogs) {
        boolean fetchEndOffsetsSuccessful;
        Map<TaskId, Long> allTaskEndOffsetSums;
        try {
            // Typed collection: a raw Collection cannot be iterated as TopicPartition.
            Collection<TopicPartition> allChangelogPartitions = changelogsByStatefulTask.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
            Set<TopicPartition> preexistingChangelogPartitions = new HashSet<>();
            Set<TopicPartition> preexistingSourceChangelogPartitions = new HashSet<>();
            Set<TopicPartition> newlyCreatedChangelogPartitions = new HashSet<>();
            for (TopicPartition changelog : allChangelogPartitions) {
                if (newlyCreatedChangelogs.contains(changelog.topic())) {
                    newlyCreatedChangelogPartitions.add(changelog);
                } else if (optimizedSourceChangelogs.contains(changelog.topic())) {
                    preexistingSourceChangelogPartitions.add(changelog);
                } else {
                    preexistingChangelogPartitions.add(changelog);
                }
            }
            // Start the admin request first, fetch committed offsets meanwhile, then wait for the future.
            KafkaFuture<Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>> endOffsetsFuture = ClientUtils.fetchEndOffsetsFuture(preexistingChangelogPartitions, this.adminClient);
            Map<TopicPartition, Long> sourceChangelogEndOffsets = ClientUtils.fetchCommittedOffsets(preexistingSourceChangelogPartitions, this.taskManager.mainConsumer());
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets = ClientUtils.getEndOffsets(endOffsetsFuture);
            allTaskEndOffsetSums = this.computeEndOffsetSumsByTask(changelogsByStatefulTask, endOffsets, sourceChangelogEndOffsets, newlyCreatedChangelogPartitions);
            fetchEndOffsetsSuccessful = true;
        } catch (TimeoutException | StreamsException e) {
            // Best effort: keep going with sentinel sums, but do not lose the cause.
            this.log.debug("Fetching end offsets for changelogs failed", e);
            allTaskEndOffsetSums = changelogsByStatefulTask.keySet().stream().collect(Collectors.toMap(t -> t, t -> -3L));
            fetchEndOffsetsSuccessful = false;
        }
        for (Map.Entry<UUID, ClientMetadata> entry : clientMetadataMap.entrySet()) {
            UUID uuid = entry.getKey();
            ClientState state = entry.getValue().state;
            state.initializePrevTasks(taskForPartition);
            state.computeTaskLags(uuid, allTaskEndOffsetSums);
            clientStates.put(uuid, state);
        }
        return fetchEndOffsetsSuccessful;
    }

    /**
     * Sums up the changelog end offsets of every stateful task.
     *
     * <p>For each changelog the offset is 0 if the partition was newly created, otherwise the
     * source changelog offset if present, otherwise the fetched end offset. If the running sum
     * overflows (turns negative), the task's sum is capped at {@link Long#MAX_VALUE} and its
     * remaining changelogs are skipped.
     *
     * @param changelogsByStatefulTask        changelog partitions of each stateful task
     * @param endOffsets                      fetched end offsets of pre-existing changelogs
     * @param sourceChangelogEndOffsets       offsets of source topics that double as changelogs
     * @param newlyCreatedChangelogPartitions partitions of changelog topics created in this rebalance
     * @return the end-offset sum of each stateful task
     * @throws IllegalStateException if a changelog is found in none of the three sources
     */
    private Map<TaskId, Long> computeEndOffsetSumsByTask(Map<TaskId, Set<TopicPartition>> changelogsByStatefulTask, Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets, Map<TopicPartition, Long> sourceChangelogEndOffsets, Collection<TopicPartition> newlyCreatedChangelogPartitions) {
        Map<TaskId, Long> sumsByTask = new HashMap<>();
        changelogsByStatefulTask.forEach((taskId, taskChangelogs) -> {
            long runningSum = 0L;
            for (TopicPartition tp : taskChangelogs) {
                long endOffset;
                if (newlyCreatedChangelogPartitions.contains(tp)) {
                    endOffset = 0L;
                } else if (sourceChangelogEndOffsets.containsKey(tp)) {
                    endOffset = sourceChangelogEndOffsets.get(tp);
                } else if (endOffsets.containsKey(tp)) {
                    endOffset = endOffsets.get(tp).offset();
                } else {
                    log.debug("Fetched offsets did not contain the changelog {} of task {}", tp, taskId);
                    throw new IllegalStateException("Could not get end offset for " + tp);
                }

                runningSum += endOffset;
                if (runningSum < 0L) {
                    // Overflow: saturate and stop looking at this task's remaining changelogs.
                    runningSum = Long.MAX_VALUE;
                    break;
                }
            }
            sumsByTask.put(taskId, runningSum);
        });
        return sumsByTask;
    }

    /**
     * Fills the per-host maps with the partitions of each client's active and standby tasks.
     * Clients without host information are skipped.
     *
     * @param partitionsByHost        output: partitions of active tasks per host
     * @param standbyPartitionsByHost output: partitions of standby tasks per host
     * @param partitionsForTask       the partitions grouped by task
     * @param clientMetadataMap       metadata of all clients, keyed by process id
     */
    private void populatePartitionsByHostMaps(Map<HostInfo, Set<TopicPartition>> partitionsByHost, Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost, Map<TaskId, Set<TopicPartition>> partitionsForTask, Map<UUID, ClientMetadata> clientMetadataMap) {
        for (ClientMetadata client : clientMetadataMap.values()) {
            HostInfo host = client.hostInfo;
            if (host != null) {
                ClientState clientState = client.state;

                Set<TopicPartition> activePartitions = new HashSet<>();
                for (TaskId activeTask : clientState.activeTasks()) {
                    activePartitions.addAll(partitionsForTask.get(activeTask));
                }

                Set<TopicPartition> standbyPartitions = new HashSet<>();
                for (TaskId standbyTask : clientState.standbyTasks()) {
                    standbyPartitions.addAll(partitionsForTask.get(standbyTask));
                }

                partitionsByHost.put(host, activePartitions);
                standbyPartitionsByHost.put(host, standbyPartitions);
            }
        }
    }

    /**
     * Builds the per-consumer assignment from the client-level task assignment and reports to the
     * assignment listener whether the result is stable.
     *
     * <p>A followup rebalance is considered required if a probing rebalance was requested, version
     * probing is in progress, or any client had tasks revoked.
     *
     * @param statefulTasks                 all stateful task ids
     * @param clientsMetadata               metadata of all clients, keyed by process id
     * @param partitionsForTask             the partitions grouped by task
     * @param partitionsByHostState         partitions of active tasks per host
     * @param standbyPartitionsByHost       partitions of standby tasks per host
     * @param allOwnedPartitions            partitions currently owned by any member
     * @param minUserMetadataVersion        the smallest metadata version used by any member
     * @param minSupportedMetadataVersion   the smallest "latest supported" version of any member
     * @param versionProbing                whether a future subscription was received
     * @param shouldTriggerProbingRebalance whether the task assignor asked for a probing rebalance
     * @return the assignment of every consumer, keyed by consumer id
     */
    private Map<String, ConsumerPartitionAssignor.Assignment> computeNewAssignment(Set<TaskId> statefulTasks, Map<UUID, ClientMetadata> clientsMetadata, Map<TaskId, Set<TopicPartition>> partitionsForTask, Map<HostInfo, Set<TopicPartition>> partitionsByHostState, Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost, Set<TopicPartition> allOwnedPartitions, int minUserMetadataVersion, int minSupportedMetadataVersion, boolean versionProbing, boolean shouldTriggerProbingRebalance) {
        Map<String, ConsumerPartitionAssignor.Assignment> assignment = new HashMap<>();
        boolean followupNeeded = shouldTriggerProbingRebalance || versionProbing;

        for (Map.Entry<UUID, ClientMetadata> clientEntry : clientsMetadata.entrySet()) {
            UUID clientId = clientEntry.getKey();
            ClientMetadata clientMetadata = clientEntry.getValue();
            ClientState state = clientMetadata.state;

            Map<String, List<TaskId>> activeByConsumer = assignTasksToThreads(state.statefulActiveTasks(), state.statelessActiveTasks(), clientMetadata.consumers, state);
            Map<String, List<TaskId>> standbyByConsumer = assignTasksToThreads(state.standbyTasks(), Collections.emptySet(), clientMetadata.consumers, state);

            // Only this process (the one computing the assignment) is asked to encode the probing rebalance.
            boolean encodeNextProbingRebalanceTime = shouldTriggerProbingRebalance && clientId.equals(taskManager.processId());

            boolean tasksRevoked = addClientAssignments(
                statefulTasks,
                assignment,
                clientMetadata,
                partitionsForTask,
                partitionsByHostState,
                standbyPartitionsByHost,
                allOwnedPartitions,
                activeByConsumer,
                standbyByConsumer,
                minUserMetadataVersion,
                minSupportedMetadataVersion,
                encodeNextProbingRebalanceTime
            );

            if (tasksRevoked || encodeNextProbingRebalanceTime) {
                followupNeeded = true;
                log.debug("Requested client {} to schedule a followup rebalance", clientId);
            }
        }

        assignmentListener.onAssignmentComplete(!followupNeeded);
        if (followupNeeded) {
            log.info("Finished unstable assignment of tasks, a followup rebalance will be scheduled.");
        } else {
            log.info("Finished stable assignment of tasks, no followup rebalances required.");
        }
        return assignment;
    }

    /**
     * Encodes and records the assignment of every consumer belonging to one client.
     * <p>
     * For each consumer the active partitions/tasks are computed (tasks whose partitions are still owned
     * elsewhere are held back), the standby map is built, and an {@code AssignmentInfo} is created. If tasks
     * were held back, the info requests an immediate followup rebalance (next rebalance time {@code 0}).
     * Otherwise, if a probing rebalance is still pending for this client, the info carries the current time
     * plus the probing rebalance interval. At most one consumer receives the probing time, and none does
     * once any consumer has had tasks held back.
     *
     * @param statefulTasks               ids of all stateful tasks
     * @param assignment                  output map, receives one entry per consumer of this client
     * @param clientMetadata              metadata of the client being processed
     * @param partitionsForTask           lookup from task id to its partitions
     * @param partitionsByHostState       active partitions per host, encoded into each info
     * @param standbyPartitionsByHost     standby partitions per host, encoded into each info
     * @param allOwnedPartitions          partitions currently owned by some consumer in the group
     * @param activeTaskAssignments       active tasks per consumer
     * @param standbyTaskAssignments      standby tasks per consumer
     * @param minUserMetadataVersion      metadata version passed to each {@code AssignmentInfo}
     * @param minSupportedMetadataVersion commonly supported version passed to each {@code AssignmentInfo}
     * @param probingRebalanceNeeded      whether this client should encode a probing rebalance time
     * @return {@code true} if at least one consumer had active tasks held back pending revocation
     */
    private boolean addClientAssignments(Set<TaskId> statefulTasks, Map<String, ConsumerPartitionAssignor.Assignment> assignment, ClientMetadata clientMetadata, Map<TaskId, Set<TopicPartition>> partitionsForTask, Map<HostInfo, Set<TopicPartition>> partitionsByHostState, Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost, Set<TopicPartition> allOwnedPartitions, Map<String, List<TaskId>> activeTaskAssignments, Map<String, List<TaskId>> standbyTaskAssignments, int minUserMetadataVersion, int minSupportedMetadataVersion, boolean probingRebalanceNeeded) {
        boolean revocationFollowupRequired = false;
        boolean probingTimeStillToEncode = probingRebalanceNeeded;

        for (String consumer : clientMetadata.consumers) {
            List<TaskId> consumerActiveTasks = activeTaskAssignments.get(consumer);
            List<TopicPartition> activePartitionsList = new ArrayList<>();
            List<TaskId> assignedActiveList = new ArrayList<>();

            Set<TaskId> tasksPendingRevocation = populateActiveTaskAndPartitionsLists(
                activePartitionsList,
                assignedActiveList,
                consumer,
                clientMetadata.state,
                consumerActiveTasks,
                partitionsForTask,
                allOwnedPartitions);

            Map<TaskId, Set<TopicPartition>> standbyTaskMap = buildStandbyTaskMap(
                consumer,
                standbyTaskAssignments.get(consumer),
                tasksPendingRevocation,
                statefulTasks,
                partitionsForTask,
                clientMetadata.state);

            AssignmentInfo info = new AssignmentInfo(
                minUserMetadataVersion,
                minSupportedMetadataVersion,
                assignedActiveList,
                standbyTaskMap,
                partitionsByHostState,
                standbyPartitionsByHost,
                AssignorError.NONE.code());

            boolean hasPendingRevocations = !tasksPendingRevocation.isEmpty();
            if (hasPendingRevocations) {
                log.info("Requesting followup rebalance be scheduled immediately due to tasks changing ownership.");
                info.setNextRebalanceTime(0L);
                revocationFollowupRequired = true;
                // An immediate rebalance supersedes any probing rebalance for this client.
                probingTimeStillToEncode = false;
            } else if (probingTimeStillToEncode) {
                long nextRebalanceTimeMs = time.milliseconds() + probingRebalanceIntervalMs();
                log.info("Requesting followup rebalance be scheduled for {} ms to probe for caught-up replica tasks.", nextRebalanceTimeMs);
                info.setNextRebalanceTime(nextRebalanceTimeMs);
                // Only one consumer of the client needs to carry the probing time.
                probingTimeStillToEncode = false;
            }

            assignment.put(consumer, new ConsumerPartitionAssignor.Assignment(activePartitionsList, info.encode()));
        }
        return revocationFollowupRequired;
    }

    /**
     * Fills the parallel lists of active partitions and task ids for one consumer, holding back any task
     * that cannot be handed over yet.
     * <p>
     * A task is held back when at least one of its partitions is new to this consumer (the previous owner
     * recorded in {@code clientState} is absent or a different consumer) and that partition is contained in
     * {@code allOwnedPartitions}. Such a task contributes no partitions, is unassigned as active in
     * {@code clientState}, and is included in the returned set. The remaining (task, partition) pairs are
     * sorted by the natural order of {@code AssignedPartition} and appended pairwise to the two output lists,
     * so that index {@code i} of both lists refers to the same pair.
     *
     * @param activePartitionsList   output list, receives the assigned partitions in sorted order
     * @param assignedActiveList     output list, receives the owning task id of each assigned partition
     * @param consumer               member id of the consumer being processed
     * @param clientState            state of the consumer's client; mutated for held-back tasks
     * @param activeTasksForConsumer active tasks initially intended for this consumer
     * @param partitionsForTask      lookup from task id to its partitions
     * @param allOwnedPartitions     partitions currently owned by some consumer in the group
     * @return the sorted set of task ids that were held back
     */
    private Set<TaskId> populateActiveTaskAndPartitionsLists(List<TopicPartition> activePartitionsList, List<TaskId> assignedActiveList, String consumer, ClientState clientState, List<TaskId> activeTasksForConsumer, Map<TaskId, Set<TopicPartition>> partitionsForTask, Set<TopicPartition> allOwnedPartitions) {
        // Typed list: the raw ArrayList used before cannot be iterated as AssignedPartition.
        List<AssignedPartition> assignedPartitions = new ArrayList<>();
        Set<TaskId> removedActiveTasks = new TreeSet<>();

        for (TaskId taskId : activeTasksForConsumer) {
            List<AssignedPartition> assignedPartitionsForTask = new ArrayList<>();
            for (TopicPartition partition : partitionsForTask.get(taskId)) {
                String oldOwner = clientState.previousOwnerForPartition(partition);
                boolean newPartitionForConsumer = oldOwner == null || !oldOwner.equals(consumer);
                if (newPartitionForConsumer && allOwnedPartitions.contains(partition)) {
                    this.log.info("Removing task {} from {} active assignment until it is safely revoked in followup rebalance", taskId, consumer);
                    removedActiveTasks.add(taskId);
                    // A task is assigned all-or-nothing: drop the partitions collected for it so far.
                    assignedPartitionsForTask.clear();
                    clientState.unassignActive(taskId);
                    break;
                }
                assignedPartitionsForTask.add(new AssignedPartition(taskId, partition));
            }
            assignedPartitions.addAll(assignedPartitionsForTask);
        }

        Collections.sort(assignedPartitions);
        for (AssignedPartition assigned : assignedPartitions) {
            assignedActiveList.add(assigned.taskId);
            activePartitionsList.add(assigned.partition);
        }
        return removedActiveTasks;
    }

    /**
     * Builds the standby task map for one consumer.
     * <p>
     * Every task in {@code standbyTasks} is mapped to its partitions. In addition, every task in
     * {@code revokedTasks} that is contained in {@code allStatefulTasks} is added as a standby and registered
     * as such in {@code clientState}.
     *
     * @param consumer          member id of the consumer, used for logging only
     * @param standbyTasks      standby tasks assigned to the consumer
     * @param revokedTasks      active tasks that were held back from the consumer pending revocation
     * @param allStatefulTasks  ids of all stateful tasks
     * @param partitionsForTask lookup from task id to its partitions
     * @param clientState       state of the consumer's client; mutated for converted tasks
     * @return map from standby task id to that task's partitions
     */
    private Map<TaskId, Set<TopicPartition>> buildStandbyTaskMap(String consumer, Iterable<TaskId> standbyTasks, Iterable<TaskId> revokedTasks, Set<TaskId> allStatefulTasks, Map<TaskId, Set<TopicPartition>> partitionsForTask, ClientState clientState) {
        Map<TaskId, Set<TopicPartition>> result = new HashMap<>();

        for (TaskId standby : standbyTasks) {
            result.put(standby, partitionsForTask.get(standby));
        }

        for (TaskId revoked : revokedTasks) {
            if (allStatefulTasks.contains(revoked)) {
                log.info("Adding removed stateful active task {} as a standby for {} before it is safely revoked in followup rebalance", revoked, consumer);
                result.put(revoked, partitionsForTask.get(revoked));
                clientState.assignStandby(revoked);
            }
        }
        return result;
    }

    /**
     * Distributes the given tasks over the consumers of a single client.
     * <p>
     * The algorithm proceeds in phases:
     * <ol>
     *   <li>Each consumer keeps stateful tasks it previously had (visited in the order produced by
     *       {@code getPreviousTasksByLag}) until it holds {@code floor(#stateful / #consumers)} tasks; further
     *       previously-held tasks are remembered together with their previous owner.</li>
     *   <li>Consumers still below that minimum are filled round-robin from the remaining stateful tasks,
     *       taken in priority-queue order.</li>
     *   <li>If stateful tasks remain, remembered tasks go back to their previous owner (at most one extra
     *       task per consumer), the rest are handed to consumers that have not received an extra task, and
     *       stateless tasks are then given to the consumers still without an extra task.</li>
     *   <li>All remaining stateless tasks (sorted) are dealt round-robin over all consumers.</li>
     * </ol>
     *
     * @param statefulTasksToAssign  stateful tasks to distribute
     * @param statelessTasksToAssign stateless tasks to distribute
     * @param consumers              consumer member ids of the client; expected to be non-empty whenever there
     *                               are tasks to assign
     * @param state                  client state used to look up each consumer's previous tasks
     * @return map from consumer member id to the tasks assigned to it
     * @throws IllegalStateException if the stateful tasks run out while some consumer is below the minimum
     */
    static Map<String, List<TaskId>> assignTasksToThreads(Collection<TaskId> statefulTasksToAssign, Collection<TaskId> statelessTasksToAssign, SortedSet<String> consumers, ClientState state) {
        Map<String, List<TaskId>> assignment = new HashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }

        List<TaskId> unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign);
        Collections.sort(unassignedStatelessTasks);
        Iterator<TaskId> unassignedStatelessTasksIter = unassignedStatelessTasks.iterator();

        int minStatefulTasksPerThread = (int) Math.floor((double) statefulTasksToAssign.size() / (double) consumers.size());
        PriorityQueue<TaskId> unassignedStatefulTasks = new PriorityQueue<>(statefulTasksToAssign);
        LinkedList<String> consumersToFill = new LinkedList<>();
        // Tasks skipped in the first pass, kept in sorted key order so later reassignment is deterministic.
        Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();

        if (!unassignedStatefulTasks.isEmpty()) {
            // Phase 1: give stateful tasks back to their previous owner, up to the per-thread minimum.
            for (String consumer : consumers) {
                List<TaskId> threadAssignment = assignment.get(consumer);
                for (TaskId previousTask : getPreviousTasksByLag(state, consumer)) {
                    if (!unassignedStatefulTasks.contains(previousTask)) {
                        continue;
                    }
                    if (threadAssignment.size() < minStatefulTasksPerThread) {
                        threadAssignment.add(previousTask);
                        unassignedStatefulTasks.remove(previousTask);
                    } else {
                        unassignedTaskToPreviousOwner.put(previousTask, consumer);
                    }
                }
                if (threadAssignment.size() < minStatefulTasksPerThread) {
                    consumersToFill.offer(consumer);
                }
            }

            // Phase 2: interleave the remaining tasks over consumers that are below the minimum.
            while (!consumersToFill.isEmpty()) {
                TaskId nextTask = unassignedStatefulTasks.poll();
                if (nextTask == null) {
                    throw new IllegalStateException("Ran out of unassigned stateful tasks but some members were not at capacity");
                }
                String consumer = consumersToFill.poll();
                List<TaskId> threadAssignment = assignment.get(consumer);
                threadAssignment.add(nextTask);
                if (threadAssignment.size() < minStatefulTasksPerThread) {
                    consumersToFill.offer(consumer);
                }
            }

            // Phase 3: everyone is at the minimum; hand out the leftover stateful tasks, one extra each.
            if (!unassignedStatefulTasks.isEmpty()) {
                consumersToFill.addAll(consumers);

                for (Map.Entry<TaskId, String> skipped : unassignedTaskToPreviousOwner.entrySet()) {
                    TaskId skippedTask = skipped.getKey();
                    String previousOwner = skipped.getValue();
                    if (consumersToFill.contains(previousOwner) && unassignedStatefulTasks.contains(skippedTask)) {
                        assignment.get(previousOwner).add(skippedTask);
                        unassignedStatefulTasks.remove(skippedTask);
                        // This consumer now holds its extra task, so it must not receive another.
                        consumersToFill.remove(previousOwner);
                    }
                }

                for (TaskId leftoverTask : unassignedStatefulTasks) {
                    String consumer = consumersToFill.poll();
                    List<TaskId> threadAssignment = assignment.get(consumer);
                    threadAssignment.add(leftoverTask);
                }

                // Top up the consumers still without an extra task using stateless tasks.
                while (unassignedStatelessTasksIter.hasNext()) {
                    String consumer = consumersToFill.poll();
                    if (consumer == null) {
                        break;
                    }
                    TaskId statelessTask = unassignedStatelessTasksIter.next();
                    unassignedStatelessTasksIter.remove();
                    assignment.get(consumer).add(statelessTask);
                }
            }
        }

        // Phase 4: deal whatever stateless tasks remain while cycling through all consumers.
        consumersToFill.addAll(consumers);
        while (unassignedStatelessTasksIter.hasNext()) {
            TaskId statelessTask = unassignedStatelessTasksIter.next();
            String consumer = consumersToFill.poll();
            assignment.get(consumer).add(statelessTask);
            consumersToFill.offer(consumer);
        }
        return assignment;
    }

    /**
     * Returns the tasks previously held by the given consumer, ordered by ascending
     * {@code state.lagFor(task)} with ties broken by {@code TaskId.compareTo}.
     *
     * @param state    client state providing the previous tasks and the lag of each task
     * @param consumer member id of the consumer
     * @return a new sorted set containing the consumer's previous tasks
     */
    private static SortedSet<TaskId> getPreviousTasksByLag(ClientState state, String consumer) {
        Comparator<TaskId> byLagThenTaskId = Comparator.comparingLong(state::lagFor).thenComparing(TaskId::compareTo);
        SortedSet<TaskId> ordered = new TreeSet<>(byLagThenTaskId);
        ordered.addAll(state.previousTasksForConsumer(consumer));
        return ordered;
    }

    /**
     * Rejects an assignment whose version information is inconsistent with this instance.
     * <p>
     * The literal {@code 8} is the highest version accepted here; the log message refers to it as
     * "our actual latest supported version".
     *
     * @param receivedAssignmentMetadataVersion version of the received assignment
     * @param latestCommonlySupportedVersion    commonly supported version reported in the assignment
     * @throws TaskAssignmentException if the received version is greater than the subscription version in
     *                                 use, or the commonly supported version is greater than {@code 8}
     */
    private void validateMetadataVersions(int receivedAssignmentMetadataVersion, int latestCommonlySupportedVersion) {
        boolean newerThanSubscription = receivedAssignmentMetadataVersion > usedSubscriptionMetadataVersion;
        if (newerThanSubscription) {
            log.error("Leader sent back an assignment with version {} which was greater than our used version {}", receivedAssignmentMetadataVersion, usedSubscriptionMetadataVersion);
            String message = "Sent a version " + usedSubscriptionMetadataVersion + " subscription but got an assignment with higher version " + receivedAssignmentMetadataVersion + ".";
            throw new TaskAssignmentException(message);
        }

        boolean beyondLatestSupported = latestCommonlySupportedVersion > 8;
        if (beyondLatestSupported) {
            log.error("Leader sent back assignment with commonly supported version {} that is greater than our actual latest supported version {}", latestCommonlySupportedVersion, 8);
            throw new TaskAssignmentException("Can't upgrade to metadata version greater than we support");
        }
    }

    /**
     * Adjusts the subscription metadata version in use based on a received assignment.
     * <p>
     * Nothing changes for received versions below {@code 3}, which the log message names as the earliest
     * version that allows version probing. Otherwise the version in use is set to
     * {@code latestCommonlySupportedVersion} when that value is greater than the version in use (upgrade),
     * or when the received version is lower than the version in use (downgrade).
     *
     * @param receivedAssignmentMetadataVersion version of the received assignment
     * @param latestCommonlySupportedVersion    commonly supported version reported in the assignment
     * @return {@code true} if the subscription version in use was changed, {@code false} otherwise
     */
    protected boolean maybeUpdateSubscriptionVersion(int receivedAssignmentMetadataVersion, int latestCommonlySupportedVersion) {
        if (receivedAssignmentMetadataVersion < 3) {
            log.debug("Received an assignment version {} that is less than the earliest version that allows version probing {}. If this is not during a rolling upgrade from version 2.0 or below, this is an error.", receivedAssignmentMetadataVersion, 3);
            return false;
        }

        if (latestCommonlySupportedVersion > usedSubscriptionMetadataVersion) {
            log.info("Sent a version {} subscription and group's latest commonly supported version is {} (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to {} for next rebalance.", usedSubscriptionMetadataVersion, latestCommonlySupportedVersion, latestCommonlySupportedVersion);
            usedSubscriptionMetadataVersion = latestCommonlySupportedVersion;
            return true;
        }

        if (receivedAssignmentMetadataVersion < usedSubscriptionMetadataVersion) {
            log.info("Sent a version {} subscription and got version {} assignment back (successful version probing). Downgrade subscription metadata to commonly supported version {} and trigger new rebalance.", usedSubscriptionMetadataVersion, receivedAssignmentMetadataVersion, latestCommonlySupportedVersion);
            usedSubscriptionMetadataVersion = latestCommonlySupportedVersion;
            return true;
        }

        return false;
    }

    /**
     * Applies a received assignment to this instance.
     * <p>
     * The assigned partitions are sorted with {@code PARTITION_COMPARATOR} and the user data is decoded into
     * an {@code AssignmentInfo}. If the info carries an error code other than {@code AssignorError.NONE}, the
     * code is stored in {@code assignmentErrorCode} and nothing else happens. Otherwise the versions are
     * validated and the fields available for the received version are extracted:
     * <ul>
     *   <li>version 1: active tasks only</li>
     *   <li>versions 2-5: additionally the partitions by host</li>
     *   <li>version 6: additionally the standby partitions by host</li>
     *   <li>versions 7-8: additionally the encoded next rebalance time</li>
     * </ul>
     * Finally a followup rebalance is scheduled if needed, the streams metadata state is updated and the
     * task manager is handed the active and standby tasks.
     *
     * @param assignment the assignment received by this consumer
     * @param metadata   consumer group metadata; not used by this method
     * @throws TaskAssignmentException if version validation fails or the number of partitions does not
     *                                 match the number of encoded active task ids
     * @throws IllegalStateException   if the received version is not one of 1 through 8
     */
    public void onAssignment(ConsumerPartitionAssignor.Assignment assignment, ConsumerGroupMetadata metadata) {
        List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
        partitions.sort(PARTITION_COMPARATOR);

        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
        if (info.errCode() != AssignorError.NONE.code()) {
            // Record the error for whoever polls assignmentErrorCode; the assignment itself is not applied.
            this.assignmentErrorCode.set(info.errCode());
            return;
        }

        int receivedAssignmentMetadataVersion = info.version();
        int latestCommonlySupportedVersion = info.commonlySupportedVersion();
        this.validateMetadataVersions(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion);

        Map<TaskId, Set<TopicPartition>> activeTasks;
        Map<HostInfo, Set<TopicPartition>> partitionsByHost;
        Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost;
        // Must be keyed/valued exactly as Cluster.withPartitions expects; Map<Object, Object> is not assignable.
        Map<TopicPartition, PartitionInfo> topicToPartitionInfo;
        long encodedNextScheduledRebalanceMs;

        switch (receivedAssignmentMetadataVersion) {
            case 1:
                validateActiveTaskEncoding(partitions, info, this.logPrefix);
                activeTasks = getActiveTasks(partitions, info);
                partitionsByHost = Collections.emptyMap();
                standbyPartitionsByHost = Collections.emptyMap();
                topicToPartitionInfo = Collections.emptyMap();
                encodedNextScheduledRebalanceMs = Long.MAX_VALUE;
                break;
            case 2:
            case 3:
            case 4:
            case 5:
                validateActiveTaskEncoding(partitions, info, this.logPrefix);
                activeTasks = getActiveTasks(partitions, info);
                partitionsByHost = info.partitionsByHost();
                standbyPartitionsByHost = Collections.emptyMap();
                topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
                encodedNextScheduledRebalanceMs = Long.MAX_VALUE;
                break;
            case 6:
                validateActiveTaskEncoding(partitions, info, this.logPrefix);
                activeTasks = getActiveTasks(partitions, info);
                partitionsByHost = info.partitionsByHost();
                standbyPartitionsByHost = info.standbyPartitionByHost();
                topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
                encodedNextScheduledRebalanceMs = Long.MAX_VALUE;
                break;
            case 7:
            case 8:
                validateActiveTaskEncoding(partitions, info, this.logPrefix);
                activeTasks = getActiveTasks(partitions, info);
                partitionsByHost = info.partitionsByHost();
                standbyPartitionsByHost = info.standbyPartitionByHost();
                topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
                encodedNextScheduledRebalanceMs = info.nextRebalanceMs();
                break;
            default:
                throw new IllegalStateException("This code should never be reached. Please file a bug report at https://issues.apache.org/jira/projects/KAFKA/");
        }

        this.maybeScheduleFollowupRebalance(encodedNextScheduledRebalanceMs, receivedAssignmentMetadataVersion, latestCommonlySupportedVersion, partitionsByHost.keySet());

        // The cluster built here only carries the partitions derived from partitionsByHost.
        Cluster fakeCluster = Cluster.empty().withPartitions(topicToPartitionInfo);
        this.streamsMetadataState.onChange(partitionsByHost, standbyPartitionsByHost, fakeCluster);
        this.taskManager.handleAssignment(activeTasks, info.standbyTasks());
    }

    /**
     * Decides when the next rebalance should happen and stores the result in
     * {@code nextScheduledRebalanceMs}.
     * <p>
     * The checks are evaluated in order and the first match wins:
     * <ol>
     *   <li>the subscription version was changed by {@code maybeUpdateSubscriptionVersion}: immediate ({@code 0})</li>
     *   <li>{@code verifyHostInfo} fails for the group's hosts: immediate ({@code 0})</li>
     *   <li>the encoded time is {@code 0}: immediate ({@code 0})</li>
     *   <li>the encoded time is below {@code Long.MAX_VALUE}: that encoded time</li>
     *   <li>otherwise: {@code Long.MAX_VALUE}, i.e. the schedule is reset</li>
     * </ol>
     *
     * @param encodedNextScheduledRebalanceMs   next rebalance time carried by the assignment
     * @param receivedAssignmentMetadataVersion version of the received assignment
     * @param latestCommonlySupportedVersion    commonly supported version reported in the assignment
     * @param groupHostInfo                     hosts known to the group according to the assignment
     */
    private void maybeScheduleFollowupRebalance(long encodedNextScheduledRebalanceMs, int receivedAssignmentMetadataVersion, int latestCommonlySupportedVersion, Set<HostInfo> groupHostInfo) {
        if (maybeUpdateSubscriptionVersion(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion)) {
            log.info("Requested to schedule immediate rebalance due to version probing.");
            nextScheduledRebalanceMs.set(0L);
            return;
        }

        if (!verifyHostInfo(groupHostInfo)) {
            log.info("Requested to schedule immediate rebalance to update group with new host endpoint = {}.", userEndPoint);
            nextScheduledRebalanceMs.set(0L);
            return;
        }

        if (encodedNextScheduledRebalanceMs == 0L) {
            log.info("Requested to schedule immediate rebalance for new tasks to be safely revoked from current owner.");
            nextScheduledRebalanceMs.set(0L);
            return;
        }

        if (encodedNextScheduledRebalanceMs < Long.MAX_VALUE) {
            log.info("Requested to schedule probing rebalance for {} ms.", encodedNextScheduledRebalanceMs);
            nextScheduledRebalanceMs.set(encodedNextScheduledRebalanceMs);
            return;
        }

        log.info("No followup rebalance was requested, resetting the rebalance schedule.");
        nextScheduledRebalanceMs.set(Long.MAX_VALUE);
    }

    /**
     * Checks whether the group already knows this instance's endpoint.
     *
     * @param groupHostInfo hosts known to the group
     * @return {@code true} if no user endpoint is configured, if {@code groupHostInfo} is empty, or if it
     *         contains the host built from the user endpoint; {@code false} otherwise
     */
    private boolean verifyHostInfo(Set<HostInfo> groupHostInfo) {
        if (userEndPoint == null || groupHostInfo.isEmpty()) {
            // Nothing to compare against, so there is no mismatch to report.
            return true;
        }
        return groupHostInfo.contains(HostInfo.buildFromEndpoint(userEndPoint));
    }

    /**
     * Groups the assigned partitions by their owning active task.
     * <p>
     * The partition at index {@code i} belongs to the task id at index {@code i} of
     * {@code info.activeTasks()}, so both lists are expected to have the same length.
     *
     * @param partitions assigned partitions
     * @param info       decoded assignment info providing the task id for each partition
     * @return map from active task id to the set of its partitions
     */
    protected static Map<TaskId, Set<TopicPartition>> getActiveTasks(List<TopicPartition> partitions, AssignmentInfo info) {
        Map<TaskId, Set<TopicPartition>> partitionsByTask = new HashMap<>();
        int index = 0;
        while (index < partitions.size()) {
            TopicPartition partition = partitions.get(index);
            TaskId taskId = info.activeTasks().get(index);
            partitionsByTask.computeIfAbsent(taskId, ignored -> new HashSet<>()).add(partition);
            index++;
        }
        return partitionsByTask;
    }

    /**
     * Creates a {@code PartitionInfo} for every partition found in any host's set.
     * <p>
     * Each info carries only the topic name and partition number; the leader is {@code null} and both node
     * arrays are empty. A partition listed under several hosts ends up with a single entry.
     *
     * @param partitionsByHost partitions hosted by each host
     * @return map from partition to its {@code PartitionInfo}
     */
    static Map<TopicPartition, PartitionInfo> getTopicPartitionInfo(Map<HostInfo, Set<TopicPartition>> partitionsByHost) {
        return partitionsByHost.values().stream()
            .flatMap(Set::stream)
            .collect(Collectors.toMap(
                topicPartition -> topicPartition,
                topicPartition -> new PartitionInfo(topicPartition.topic(), topicPartition.partition(), null, new Node[0], new Node[0]),
                // Same as repeated put(): the entry created last replaces an earlier one.
                (earlier, later) -> later,
                HashMap::new));
    }

    /**
     * Verifies that the assignment encodes exactly one active task id per assigned partition.
     *
     * @param partitions assigned partitions
     * @param info       decoded assignment info
     * @param logPrefix  prefix placed at the start of the exception message
     * @throws TaskAssignmentException if the number of partitions differs from the number of active task ids
     */
    private static void validateActiveTaskEncoding(List<TopicPartition> partitions, AssignmentInfo info, String logPrefix) {
        int partitionCount = partitions.size();
        int activeTaskCount = info.activeTasks().size();
        if (partitionCount == activeTaskCount) {
            return;
        }
        throw new TaskAssignmentException(String.format("%sNumber of assigned partitions %d is not equal to the number of active taskIds %d, assignmentInfo=%s", logPrefix, partitionCount, activeTaskCount, info.toString()));
    }

    /**
     * Runs the {@code copartitionedTopicsEnforcer} on each copartition group in turn.
     *
     * @param copartitionGroups                 groups of topic names to enforce, one call per group
     * @param allRepartitionTopicsNumPartitions repartition topic configs, passed through to the enforcer
     * @param metadata                          cluster metadata, passed through to the enforcer
     */
    private void ensureCopartitioning(Collection<Set<String>> copartitionGroups, Map<String, InternalTopicConfig> allRepartitionTopicsNumPartitions, Cluster metadata) {
        Iterator<Set<String>> groups = copartitionGroups.iterator();
        while (groups.hasNext()) {
            copartitionedTopicsEnforcer.enforce(groups.next(), allRepartitionTopicsNumPartitions, metadata);
        }
    }

    /**
     * Returns the smaller of the two versions.
     *
     * @param usedVersion                version to compare against the current minimum
     * @param minReceivedMetadataVersion current minimum
     * @return {@code usedVersion} if it is lower than the current minimum, otherwise the current minimum
     */
    private int updateMinReceivedVersion(int usedVersion, int minReceivedMetadataVersion) {
        return usedVersion < minReceivedMetadataVersion ? usedVersion : minReceivedMetadataVersion;
    }

    /**
     * Returns the smaller of the two versions and logs at debug level whether the minimum was lowered.
     *
     * @param supportedVersion            supported version that was just seen
     * @param minSupportedMetadataVersion current minimum supported version
     * @return {@code supportedVersion} if it is lower than the current minimum, otherwise the current minimum
     */
    private int updateMinSupportedVersion(int supportedVersion, int minSupportedMetadataVersion) {
        boolean minimumUnchanged = supportedVersion >= minSupportedMetadataVersion;
        if (minimumUnchanged) {
            log.debug("Current minimum supported version remains at {}, last seen supported version was {}", minSupportedMetadataVersion, supportedVersion);
            return minSupportedMetadataVersion;
        }
        log.debug("Downgrade the current minimum supported version {} to the smaller seen supported version {}", minSupportedMetadataVersion, supportedVersion);
        return supportedVersion;
    }

    /**
     * Replaces the internal topic manager used by this assignor.
     *
     * @param internalTopicManager the manager to use from now on
     */
    void setInternalTopicManager(final InternalTopicManager internalTopicManager) {
        this.internalTopicManager = internalTopicManager;
    }

    /**
     * @return the rebalance protocol held by this assignor
     */
    ConsumerPartitionAssignor.RebalanceProtocol rebalanceProtocol() {
        return rebalanceProtocol;
    }

    /**
     * @return the user endpoint held by this assignor; other code in this class treats {@code null} as
     *         "no endpoint configured"
     */
    protected String userEndPoint() {
        return userEndPoint;
    }

    /**
     * @return the task manager held by this assignor
     */
    protected TaskManager taskManager() {
        return taskManager;
    }

    /**
     * @return the current value of the {@code uniqueField} byte held by this assignor
     */
    protected byte uniqueField() {
        return uniqueField;
    }

    /**
     * Forwards the start of a rebalance to the task manager.
     *
     * @param topics topic names handed through unchanged to {@code taskManager.handleRebalanceStart}
     */
    protected void handleRebalanceStart(final Set<String> topics) {
        taskManager.handleRebalanceStart(topics);
    }

    /**
     * @return the {@code acceptableRecoveryLag} value of the assignment configs
     */
    long acceptableRecoveryLag() {
        return assignmentConfigs.acceptableRecoveryLag;
    }

    /**
     * @return the {@code maxWarmupReplicas} value of the assignment configs
     */
    int maxWarmupReplicas() {
        return assignmentConfigs.maxWarmupReplicas;
    }

    /**
     * @return the {@code numStandbyReplicas} value of the assignment configs
     */
    int numStandbyReplicas() {
        return assignmentConfigs.numStandbyReplicas;
    }

    /**
     * @return the {@code probingRebalanceIntervalMs} value of the assignment configs; it is added to the
     *         current time when a probing rebalance is encoded
     */
    long probingRebalanceIntervalMs() {
        return assignmentConfigs.probingRebalanceIntervalMs;
    }

    /**
     * Bookkeeping for one client: its host info, the member ids of its consumers (sorted), and its
     * {@code ClientState}.
     */
    private static class ClientMetadata {
        // Built from the endpoint string; code in the enclosing class null-checks it before use.
        private final HostInfo hostInfo;
        private final SortedSet<String> consumers;
        private final ClientState state;

        /**
         * Creates metadata with no consumers and a fresh client state.
         *
         * @param endPoint endpoint string from which the host info is built
         */
        ClientMetadata(String endPoint) {
            hostInfo = HostInfo.buildFromEndpoint(endPoint);
            consumers = new TreeSet<>();
            state = new ClientState();
        }

        /**
         * Registers a consumer of this client: records its member id, increments the client's capacity
         * and records the partitions it currently owns.
         *
         * @param consumerMemberId member id of the consumer
         * @param ownedPartitions  partitions currently owned by that consumer
         */
        void addConsumer(String consumerMemberId, List<TopicPartition> ownedPartitions) {
            consumers.add(consumerMemberId);
            state.incrementCapacity();
            state.addOwnedPartitions(ownedPartitions, consumerMemberId);
        }

        /**
         * Passes a consumer's previous tasks and their offset sums on to the client state.
         *
         * @param consumerId     member id of the consumer
         * @param taskOffsetSums offset sum per task id
         */
        void addPreviousTasksAndOffsetSums(String consumerId, Map<TaskId, Long> taskOffsetSums) {
            state.addPreviousTasksAndOffsetSums(consumerId, taskOffsetSums);
        }

        @Override
        public String toString() {
            return "ClientMetadata{hostInfo=" + hostInfo
                + ", consumers=" + consumers
                + ", state=" + state
                + '}';
        }
    }

    /**
     * A partition paired with the task that owns it.
     * <p>
     * Ordering, equality and hashing depend on the partition only; the task id is ignored.
     */
    private static class AssignedPartition implements Comparable<AssignedPartition> {
        private final TaskId taskId;
        private final TopicPartition partition;

        AssignedPartition(TaskId taskId, TopicPartition partition) {
            this.taskId = taskId;
            this.partition = partition;
        }

        /**
         * Orders by partition using {@code PARTITION_COMPARATOR}.
         */
        @Override
        public int compareTo(AssignedPartition that) {
            return PARTITION_COMPARATOR.compare(partition, that.partition);
        }

        /**
         * Two instances are equal when their partitions compare as equal.
         */
        @Override
        public boolean equals(Object o) {
            return o instanceof AssignedPartition && compareTo((AssignedPartition) o) == 0;
        }

        /**
         * Hashes the partition only, matching the fields considered by {@link #equals(Object)}.
         */
        @Override
        public int hashCode() {
            return partition.hashCode();
        }
    }
}

