/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.distributed;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.DynamicWorkerConfigManager;
import org.apache.kafka.connect.runtime.distributed.ConnectAssignor;
import org.apache.kafka.connect.runtime.distributed.ConnectClusterMetrics;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility;
import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;
import org.apache.kafka.connect.runtime.distributed.ExtendedWorkerState;
import org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol;
import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;

public class IncrementalCooperativeAssignor
implements ConnectAssignor {
    // Logger obtained from the LogContext passed to the constructor.
    private final Logger log;
    // Clock used to read the current time in milliseconds.
    private final Time time;
    // Upper bound for the rebalance delay; 0 disables delayed reassignment of lost work
    // (log messages refer to it as scheduled.rebalance.max.delay.ms).
    private final int maxDelay;
    // Supplies the per-worker task limit; read once per assignment round.
    private final Supplier<Integer> maxTasksPerWorkerSupplier;
    // The leader's view of the assignment computed in the previous round; reset to EMPTY on a generation mismatch.
    private WorkerCoordinator.ConnectorsAndTasks previousAssignment;
    // Connectors and tasks revoked in the previous round; filled by computePreviousAssignment and cleared at the start of the next round.
    private final WorkerCoordinator.ConnectorsAndTasks previousRevocation;
    // Maps each revoked task to the worker it was revoked from; cleared after a resource-aware task assignment.
    private final Map<ConnectorTaskId, String> tasksRevokedFromWorker;
    // Worker -> tasks as computed by the most recent assignment round.
    private Map<String, Collection<ConnectorTaskId>> nextTaskAssignments;
    // True when the previous round issued load-balancing revocations.
    private boolean revokedInPrevious;
    // Workers observed with an empty load while a delayed rebalance is pending; preferred targets for lost work.
    protected final Set<String> candidateWorkersForReassignment;
    // Wall-clock time (ms) at which the pending delayed rebalance is due; 0 when none is scheduled.
    protected long scheduledRebalance;
    // Delay (ms) communicated to workers in the assignment; 0 when no delayed rebalance is active.
    protected int delay;
    // Generation id recorded at the end of the previous assignment round; -1 before the first round.
    protected int previousGenerationId;
    // Member ids seen in the previous assignment round.
    protected Set<String> previousMembers;
    // Backoff used to compute the delay when consecutive rounds would both revoke for load balancing.
    private final ExponentialBackoff consecutiveRevokingRebalancesBackoff;
    // Enables tracking of lost/recovered workers so that a scheduled rebalance delay can be ended early.
    protected boolean preemptiveScheduledRebalanceEndEnabled;
    // Number of consecutive rounds with load-balancing revocations; input to the backoff.
    private int numSuccessiveRevokingRebalances;
    // Running count of workers that left the group and have not yet been matched by a rejoin.
    int numLostWorkers;
    // When true, the next handling of lost assignments reassigns them without waiting for the scheduled delay.
    boolean preemptScheduledRebalanceDelay;
    // When true, cluster metrics are consulted during revocation and task assignment.
    boolean resourceAware;
    // NOTE(review): not referenced in the visible part of this class; presumably a percentage threshold used by resource-aware balancing - confirm.
    short loadUsageMaxDeviationFromAverage = (short)20;
    // NOTE(review): not referenced in the visible part of this class; presumably the threshold applied when the cluster is already balanced - confirm.
    short loadUsageMaxDeviationFromAverageInBalancedCluster = (short)33;
    // Snapshot of coordinator.isAnyClusterWorkerLoadHigh() taken at the start of each assignment round.
    boolean isClusterWorkerLoadHigh;

    /**
     * Creates an assignor whose per-worker task limit is looked up from the
     * {@link DynamicWorkerConfigManager} singleton every time the supplier is invoked.
     *
     * @param logContext context used to create this assignor's logger
     * @param time clock used for delay computations
     * @param maxDelay maximum rebalance delay; 0 disables delays
     * @param preemptiveScheduledRebalanceEndEnabled whether a scheduled delay may be ended early
     * @param resourceAware whether cluster metrics are used for balancing
     */
    public IncrementalCooperativeAssignor(LogContext logContext, Time time, int maxDelay, boolean preemptiveScheduledRebalanceEndEnabled, boolean resourceAware) {
        this(
                logContext,
                time,
                maxDelay,
                preemptiveScheduledRebalanceEndEnabled,
                () -> DynamicWorkerConfigManager.getInstance().getMaxTasksPerWorker(),
                resourceAware);
    }

    public IncrementalCooperativeAssignor(LogContext logContext, Time time, int maxDelay, boolean preemptiveScheduledRebalanceEndEnabled, Supplier<Integer> maxTasksPerWorkerSupplier) {
        this.log = logContext.logger(IncrementalCooperativeAssignor.class);
        this.time = time;
        this.maxDelay = maxDelay;
        this.previousAssignment = WorkerCoordinator.ConnectorsAndTasks.EMPTY;
        this.previousRevocation = new WorkerCoordinator.ConnectorsAndTasks.Builder().build();
        this.tasksRevokedFromWorker = new HashMap<ConnectorTaskId, String>();
        this.scheduledRebalance = 0L;
        this.revokedInPrevious = false;
        this.candidateWorkersForReassignment = new LinkedHashSet<String>();
        this.delay = 0;
        this.previousGenerationId = -1;
        this.previousMembers = Collections.emptySet();
        this.numSuccessiveRevokingRebalances = 0;
        this.consecutiveRevokingRebalancesBackoff = new ExponentialBackoff(maxDelay == 0 ? 0L : 1L, 40, (long)maxDelay, 0.0);
        this.preemptScheduledRebalanceDelay = false;
        this.preemptiveScheduledRebalanceEndEnabled = preemptiveScheduledRebalanceEndEnabled;
        this.maxTasksPerWorkerSupplier = maxTasksPerWorkerSupplier;
        this.resourceAware = false;
        this.nextTaskAssignments = new HashMap<String, Collection<ConnectorTaskId>>();
        this.isClusterWorkerLoadHigh = false;
    }

    /**
     * Creates an assignor with an explicit task-limit supplier and an explicit resource-awareness flag.
     * Delegates to the supplier-based constructor (which defaults {@code resourceAware} to false)
     * and then applies the requested flag.
     *
     * @param logContext context used to create this assignor's logger
     * @param time clock used for delay computations
     * @param maxDelay maximum rebalance delay; 0 disables delays
     * @param preemptiveScheduledRebalanceEndEnabled whether a scheduled delay may be ended early
     * @param maxTasksPerWorkerSupplier supplies the per-worker task limit on demand
     * @param resourceAware whether cluster metrics are used for balancing
     */
    public IncrementalCooperativeAssignor(LogContext logContext, Time time, int maxDelay, boolean preemptiveScheduledRebalanceEndEnabled, Supplier<Integer> maxTasksPerWorkerSupplier, boolean resourceAware) {
        this(
                logContext,
                time,
                maxDelay,
                preemptiveScheduledRebalanceEndEnabled,
                maxTasksPerWorkerSupplier);
        // Override the default set by the delegated constructor.
        this.resourceAware = resourceAware;
    }

    /**
     * Entry point invoked on the group leader: deserializes every member's metadata, makes sure
     * the leader's config snapshot is at least as recent as the newest offset reported by any
     * member, and then computes and serializes the assignment for all members.
     *
     * <p>If the leader cannot catch up to the newest offset, every member receives an assignment
     * with error code 1 and an empty cluster assignment.
     *
     * @param leaderId member id of the leader
     * @param protocol name of the selected Connect protocol
     * @param allMemberMetadata join-group metadata of all members
     * @param coordinator coordinator providing config snapshots and group state
     * @return serialized assignment per member id
     */
    @Override
    public Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata, WorkerCoordinator coordinator) {
        log.debug("Performing task assignment");

        Map<String, ExtendedWorkerState> workerStates = new HashMap<>();
        for (JoinGroupResponseData.JoinGroupResponseMember joined : allMemberMetadata) {
            String memberId = joined.memberId();
            ExtendedWorkerState state = IncrementalCooperativeConnectProtocol.deserializeMetadata(ByteBuffer.wrap(joined.metadata()));
            workerStates.put(memberId, state);
        }
        log.debug("Member configs: {}", workerStates);

        // Newest config offset reported by any member of the group.
        long highestOffset = workerStates.values().stream()
                .map(ConnectProtocol.WorkerState::offset)
                .max(Long::compare)
                .get();
        log.debug("Max config offset root: {}, local snapshot config offsets root: {}", highestOffset, coordinator.configSnapshot().offset());

        short protocolVersion = ConnectProtocolCompatibility.fromProtocol(protocol).protocolVersion();
        Long leaderOffset = ensureLeaderConfig(highestOffset, coordinator);
        if (leaderOffset != null) {
            return performTaskAssignment(leaderId, leaderOffset, workerStates, coordinator, protocolVersion);
        }

        // The leader is behind: hand out empty assignments flagged with an error so that members re-sync.
        String leaderUrl = workerStates.get(leaderId).url();
        Map<String, ExtendedAssignment> mismatchAssignments = fillAssignments(workerStates.keySet(), (short) 1, leaderId, leaderUrl, highestOffset, ClusterAssignment.EMPTY, 0, protocolVersion);
        return serializeAssignments(mismatchAssignments, protocolVersion);
    }

    /**
     * Makes sure the leader's config snapshot has reached {@code maxOffset}, refreshing it from
     * the coordinator once if it is behind.
     *
     * @param maxOffset newest config offset reported by any group member
     * @param coordinator source of the cached and the fresh config snapshot
     * @return {@code maxOffset} if the cached snapshot was already recent enough, the refreshed
     *         snapshot's offset if a refresh caught up, or {@code null} if the leader is still behind
     */
    private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
        boolean cachedSnapshotIsCurrent = coordinator.configSnapshot().offset() >= maxOffset;
        if (cachedSnapshotIsCurrent) {
            return maxOffset;
        }

        ClusterConfigState refreshed = coordinator.configFreshSnapshot();
        if (refreshed.offset() >= maxOffset) {
            // The fresh read caught up; install it as the coordinator's snapshot.
            coordinator.configSnapshot(refreshed);
            return refreshed.offset();
        }

        log.info("Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync.");
        return null;
    }

    /**
     * Computes the cluster assignment from the members' reported state, records the resulting
     * leader state on the coordinator and serializes one assignment per member.
     *
     * @param leaderId member id of the leader
     * @param maxOffset config offset the assignment is based on
     * @param memberConfigs deserialized worker state per member id
     * @param coordinator coordinator providing snapshots, generation ids and metrics
     * @param protocolVersion protocol version used for serialization
     * @return serialized assignment per member id
     */
    protected Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset, Map<String, ExtendedWorkerState> memberConfigs, WorkerCoordinator coordinator, short protocolVersion) {
        log.debug("Performing task assignment during generation: {} with memberId: {}", coordinator.generationId(), coordinator.memberId());

        // Reduce each member's state to the connectors and tasks it currently runs.
        Map<String, WorkerCoordinator.ConnectorsAndTasks> currentlyRunning = ConnectUtils.transformValues(
                memberConfigs,
                state -> new WorkerCoordinator.ConnectorsAndTasks.Builder()
                        .with(state.assignment().connectors(), state.assignment().tasks())
                        .build());

        isClusterWorkerLoadHigh = coordinator.isAnyClusterWorkerLoadHigh();

        ClusterConfigState snapshot = coordinator.configSnapshot();
        // Metrics are only fetched when resource-aware balancing is switched on.
        ConnectClusterMetrics metrics = resourceAware ? coordinator.metricsSnapshot() : null;
        ClusterAssignment computed = performTaskAssignment(snapshot, metrics, coordinator.lastCompletedGenerationId(), coordinator.generationId(), currentlyRunning);

        coordinator.leaderState(new WorkerCoordinator.LeaderState(memberConfigs, computed.allAssignedConnectors(), computed.allAssignedTasks()));

        String leaderUrl = memberConfigs.get(leaderId).url();
        Map<String, ExtendedAssignment> perMember = fillAssignments(memberConfigs.keySet(), (short) 0, leaderId, leaderUrl, maxOffset, computed, delay, protocolVersion);
        log.debug("Actual assignments: {}", perMember);
        return serializeAssignments(perMember, protocolVersion);
    }

    /**
     * Core of the incremental cooperative algorithm. Compares the configured connectors and tasks
     * with what the members currently run and with the leader's view of the previous round, then
     * decides what to revoke and what to assign.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Discard the previous-assignment view if the generation ids do not line up.</li>
     *   <li>Derive the deleted, duplicated, lost and newly created connectors and tasks.</li>
     *   <li>Revoke deleted and duplicated work; handle lost work (possibly delaying it).</li>
     *   <li>When no delay is active, compute load-balancing revocations, applying an exponential
     *       backoff if the previous round also revoked.</li>
     *   <li>Assign created and reassignable lost work, then record state for the next round.</li>
     * </ol>
     *
     * @param configSnapshot configuration the assignment is based on
     * @param clusterMetrics cluster metrics for resource-aware balancing; may be {@code null}
     * @param lastCompletedGenerationId last generation the coordinator completed
     * @param currentGenerationId generation being assigned now
     * @param memberAssignments connectors and tasks each member reports as running
     * @return the incremental assignments, the revocations and the resulting full assignment per worker
     */
    ClusterAssignment performTaskAssignment(ClusterConfigState configSnapshot, ConnectClusterMetrics clusterMetrics, int lastCompletedGenerationId, int currentGenerationId, Map<String, WorkerCoordinator.ConnectorsAndTasks> memberAssignments) {
        this.log.debug("Previous assignments: {}", (Object)this.previousAssignment);
        if (this.previousGenerationId != lastCompletedGenerationId) {
            this.log.debug("Clearing the view of previous assignments due to generation mismatch between previous generation ID {} and last completed generation ID {}. This can happen if the leader fails to sync the assignment within a rebalancing round. The following view of previous assignments might be outdated and will be ignored by the leader in the current computation of new assignments. Possibly outdated previous assignments: {}", new Object[]{this.previousGenerationId, lastCompletedGenerationId, this.previousAssignment});
            this.previousAssignment = WorkerCoordinator.ConnectorsAndTasks.EMPTY;
        }
        if (this.resourceAware) {
            this.log.debug("connect cluster metrics: {}", (Object)clusterMetrics);
        }

        // What the configuration says should exist.
        TreeSet<String> configuredConnectors = new TreeSet<String>(configSnapshot.connectors());
        Set<ConnectorTaskId> configuredTasks = ConnectUtils.combineCollections(configuredConnectors, configSnapshot::tasks, Collectors.toSet());
        WorkerCoordinator.ConnectorsAndTasks configured = new WorkerCoordinator.ConnectorsAndTasks.Builder().with(configuredConnectors, configuredTasks).build();
        this.log.debug("Configured assignments: {}", (Object)configured);

        // What the members report as running right now.
        WorkerCoordinator.ConnectorsAndTasks activeAssignments = this.assignment(memberAssignments);
        this.log.debug("Active assignments: {}", (Object)activeAssignments);

        // If something revoked last round is still reported as active, trust the members' view
        // over the leader's bookkeeping. Either way, the revocation record is consumed here.
        if (!this.previousRevocation.isEmpty()) {
            if (this.previousRevocation.connectors().stream().anyMatch(c -> activeAssignments.connectors().contains(c)) || this.previousRevocation.tasks().stream().anyMatch(t -> activeAssignments.tasks().contains(t))) {
                this.previousAssignment = activeAssignments;
            }
            this.previousRevocation.connectors().clear();
            this.previousRevocation.tasks().clear();
        }
        this.workersLostOrRecovered(memberAssignments.keySet());

        // Classify the differences between the configured, previous and active views.
        WorkerCoordinator.ConnectorsAndTasks deleted = IncrementalCooperativeAssignor.diff(this.previousAssignment, configured);
        this.log.debug("Deleted assignments: {}", (Object)deleted);
        WorkerCoordinator.ConnectorsAndTasks duplicated = this.duplicatedAssignments(memberAssignments);
        this.log.trace("Duplicated assignments: {}", (Object)duplicated);
        WorkerCoordinator.ConnectorsAndTasks lostAssignments = IncrementalCooperativeAssignor.diff(this.previousAssignment, activeAssignments, deleted);
        this.log.debug("Lost assignments: {}", (Object)lostAssignments);
        WorkerCoordinator.ConnectorsAndTasks created = IncrementalCooperativeAssignor.diff(configured, this.previousAssignment, activeAssignments);
        this.log.debug("Created: {}", (Object)created);

        List<WorkerCoordinator.WorkerLoad> currentWorkerAssignment = IncrementalCooperativeAssignor.workerAssignment(memberAssignments, deleted);

        // Deleted and duplicated work is always revoked from the workers that hold it.
        HashMap<String, WorkerCoordinator.ConnectorsAndTasks.Builder> toRevoke = new HashMap<String, WorkerCoordinator.ConnectorsAndTasks.Builder>();
        Map<String, WorkerCoordinator.ConnectorsAndTasks> deletedToRevoke = IncrementalCooperativeAssignor.intersection(deleted, memberAssignments);
        this.log.debug("Deleted connectors and tasks to revoke from each worker: {}", deletedToRevoke);
        IncrementalCooperativeAssignor.addAll(toRevoke, deletedToRevoke);
        Map<String, WorkerCoordinator.ConnectorsAndTasks> duplicatedToRevoke = IncrementalCooperativeAssignor.intersection(duplicated, memberAssignments);
        this.log.debug("Duplicated connectors and tasks to revoke from each worker: {}", duplicatedToRevoke);
        IncrementalCooperativeAssignor.addAll(toRevoke, duplicatedToRevoke);

        // Start the next assignment from the reported loads minus everything revoked so far.
        List<WorkerCoordinator.WorkerLoad> nextWorkerAssignment = IncrementalCooperativeAssignor.workerLoads(memberAssignments);
        IncrementalCooperativeAssignor.removeAll(nextWorkerAssignment, deletedToRevoke);
        IncrementalCooperativeAssignor.removeAll(nextWorkerAssignment, duplicatedToRevoke);

        // May set this.delay / this.scheduledRebalance, or assign lost work directly to candidate workers.
        WorkerCoordinator.ConnectorsAndTasks.Builder lostAssignmentsToReassignBuilder = new WorkerCoordinator.ConnectorsAndTasks.Builder();
        this.handleLostAssignments(lostAssignments, lostAssignmentsToReassignBuilder, nextWorkerAssignment);
        WorkerCoordinator.ConnectorsAndTasks lostAssignmentsToReassign = lostAssignmentsToReassignBuilder.build();

        Integer maxTasksPerWorker = this.maxTasksPerWorkerSupplier.get();
        if (this.delay == 0) {
            // Metrics can be absent even in resource-aware mode; only prune them when present.
            if (this.resourceAware && clusterMetrics != null && !deleted.tasks().isEmpty()) {
                clusterMetrics.taskLoads().removeIf(taskLoad -> deleted.tasks().contains(taskLoad.taskId()));
                this.log.debug("Removed deleted tasks from the cluster metrics: {}", deleted.tasks());
            }
            Map<String, WorkerCoordinator.ConnectorsAndTasks> loadBalancingRevocations = this.performLoadBalancingRevocations(configured, nextWorkerAssignment, maxTasksPerWorker, clusterMetrics);
            if (this.revokedInPrevious && !loadBalancingRevocations.isEmpty()) {
                // Two revoking rounds in a row: back off instead of revoking again right away.
                ++this.numSuccessiveRevokingRebalances;
                this.log.debug("Consecutive revoking rebalances observed. Computing delay and next scheduled rebalance.");
                this.delay = (int)this.consecutiveRevokingRebalancesBackoff.backoff((long)this.numSuccessiveRevokingRebalances);
                if (this.delay != 0) {
                    this.scheduledRebalance = this.time.milliseconds() + (long)this.delay;
                    this.log.debug("Skipping revocations in the current round with a delay of {}ms. Next scheduled rebalance:{}", (Object)this.delay, (Object)this.scheduledRebalance);
                } else {
                    this.log.debug("Revoking assignments immediately since scheduled.rebalance.max.delay.ms is set to 0");
                    IncrementalCooperativeAssignor.addAll(toRevoke, loadBalancingRevocations);
                    IncrementalCooperativeAssignor.removeAll(nextWorkerAssignment, loadBalancingRevocations);
                }
            } else if (!loadBalancingRevocations.isEmpty()) {
                this.log.debug("Performing allocation-balancing revocation immediately as no revocations took place during the previous rebalance");
                IncrementalCooperativeAssignor.addAll(toRevoke, loadBalancingRevocations);
                IncrementalCooperativeAssignor.removeAll(nextWorkerAssignment, loadBalancingRevocations);
                this.revokedInPrevious = true;
            } else if (this.revokedInPrevious) {
                this.log.debug("Previous round had revocations but this round didn't. Probably, the cluster has reached a balanced load. Resetting the exponential backoff clock");
                this.revokedInPrevious = false;
                this.numSuccessiveRevokingRebalances = 0;
            } else {
                this.log.debug("No revocations in previous and current round.");
            }
        } else {
            this.log.debug("Delayed rebalance is active. Delaying {}ms before revoking connectors and tasks: {}", (Object)this.delay, toRevoke);
            this.revokedInPrevious = false;
        }

        // Distribute new work and whatever lost work was released for reassignment.
        WorkerCoordinator.ConnectorsAndTasks toAssign = new WorkerCoordinator.ConnectorsAndTasks.Builder().addAll(created).addAll(lostAssignmentsToReassign).build();
        this.assignConnectors(nextWorkerAssignment, toAssign.connectors());
        if (this.resourceAware) {
            this.assignTasks(nextWorkerAssignment, toAssign.tasks(), maxTasksPerWorker, clusterMetrics);
            this.tasksRevokedFromWorker.clear();
        } else {
            this.assignTasks(nextWorkerAssignment, toAssign.tasks(), maxTasksPerWorker);
        }

        // Per-worker views of the next and the current assignment; their difference is what each worker must start.
        Map<String, Collection<String>> nextConnectorAssignments = nextWorkerAssignment.stream().collect(Collectors.toMap(WorkerCoordinator.WorkerLoad::worker, WorkerCoordinator.WorkerLoad::connectors));
        this.nextTaskAssignments = nextWorkerAssignment.stream().collect(Collectors.toMap(WorkerCoordinator.WorkerLoad::worker, WorkerCoordinator.WorkerLoad::tasks));
        Map<String, Collection<String>> currentConnectorAssignments = currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerCoordinator.WorkerLoad::worker, WorkerCoordinator.WorkerLoad::connectors));
        Map<String, Collection<ConnectorTaskId>> currentTaskAssignments = currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerCoordinator.WorkerLoad::worker, WorkerCoordinator.WorkerLoad::tasks));
        Map<String, Collection<String>> incrementalConnectorAssignments = IncrementalCooperativeAssignor.diff(nextConnectorAssignments, currentConnectorAssignments);
        Map<String, Collection<ConnectorTaskId>> incrementalTaskAssignments = IncrementalCooperativeAssignor.diff(this.nextTaskAssignments, currentTaskAssignments);
        Map<String, WorkerCoordinator.ConnectorsAndTasks> revoked = IncrementalCooperativeAssignor.buildAll(toRevoke);

        // Record state for the next round.
        this.previousAssignment = this.computePreviousAssignment(revoked, nextConnectorAssignments, this.nextTaskAssignments, lostAssignments);
        this.previousGenerationId = currentGenerationId;
        this.previousMembers = memberAssignments.keySet();
        this.log.debug("Incremental connector assignments: {}", incrementalConnectorAssignments);
        this.log.debug("Incremental task assignments: {}", incrementalTaskAssignments);
        this.log.debug("Previous revocation: {}", (Object)this.previousRevocation);
        this.log.debug("Next task assignments: {}", this.nextTaskAssignments);
        Map<String, Collection<String>> revokedConnectors = ConnectUtils.transformValues(revoked, WorkerCoordinator.ConnectorsAndTasks::connectors);
        Map<String, Collection<ConnectorTaskId>> revokedTasks = ConnectUtils.transformValues(revoked, WorkerCoordinator.ConnectorsAndTasks::tasks);
        return new ClusterAssignment(incrementalConnectorAssignments, incrementalTaskAssignments, revokedConnectors, revokedTasks, IncrementalCooperativeAssignor.diff(nextConnectorAssignments, revokedConnectors), IncrementalCooperativeAssignor.diff(this.nextTaskAssignments, revokedTasks));
    }

    /**
     * Updates the lost-worker counter by comparing the size of the current group with the size of
     * the group in the previous round. When the counter drops back to zero because workers
     * joined, a preemptive end of the scheduled rebalance delay is requested.
     *
     * <p>Does nothing when preemptive ending is disabled or when there is no previous membership.
     *
     * @param members member ids of the current round
     */
    protected void workersLostOrRecovered(Set<String> members) {
        if (!preemptiveScheduledRebalanceEndEnabled) {
            log.trace("Pre-emptive Scheduled Rebalance Ending Not Enabled. Returning");
            return;
        }
        if (previousMembers.isEmpty()) {
            return;
        }

        int before = previousMembers.size();
        int current = members.size();

        if (before > current) {
            // The group shrank: count the departed workers as lost.
            numLostWorkers += before - current;
            return;
        }

        if (before < current && numLostWorkers > 0) {
            // The group grew while workers were missing: count the joiners against the lost ones.
            numLostWorkers = Math.max(numLostWorkers - (current - before), 0);
            if (numLostWorkers == 0) {
                log.info("All the lost workers have joined back. Ending any active scheduled rebalance delays preemptively.");
                preemptScheduledRebalanceDelay = true;
            }
        }
    }

    /**
     * Requests a preemptive end of the scheduled rebalance delay.
     *
     * @return {@code true} if the request was newly registered; {@code false} if no workers are
     *         currently counted as lost or a preemption is already pending
     */
    public boolean preemptScheduledRebalanceDelay() {
        boolean newlyRequested = this.numLostWorkers != 0 && !this.preemptScheduledRebalanceDelay;
        if (newlyRequested) {
            this.preemptScheduledRebalanceDelay = true;
        }
        return newlyRequested;
    }

    /**
     * Builds the view of this round's assignment that the next round will compare against.
     * It consists of everything assigned to any worker, minus what is being revoked, plus the
     * lost assignments.
     *
     * <p>Side effects: the revoked connectors and tasks are added to {@code previousRevocation},
     * and every revoked task is recorded in {@code tasksRevokedFromWorker} together with the
     * worker it is revoked from.
     *
     * @param toRevoke revocations per worker
     * @param connectorAssignments connectors per worker after this round
     * @param taskAssignments tasks per worker after this round
     * @param lostAssignments connectors and tasks detected as lost in this round
     * @return the assignment view to remember for the next round
     */
    private WorkerCoordinator.ConnectorsAndTasks computePreviousAssignment(Map<String, WorkerCoordinator.ConnectorsAndTasks> toRevoke, Map<String, Collection<String>> connectorAssignments, Map<String, Collection<ConnectorTaskId>> taskAssignments, WorkerCoordinator.ConnectorsAndTasks lostAssignments) {
        WorkerCoordinator.ConnectorsAndTasks remembered = new WorkerCoordinator.ConnectorsAndTasks.Builder()
                .with(
                        ConnectUtils.combineCollections(connectorAssignments.values()),
                        ConnectUtils.combineCollections(taskAssignments.values()))
                .build();

        for (Map.Entry<String, WorkerCoordinator.ConnectorsAndTasks> revocation : toRevoke.entrySet()) {
            String worker = revocation.getKey();
            WorkerCoordinator.ConnectorsAndTasks revokedFromWorker = revocation.getValue();

            // Revoked work is not part of the remembered assignment ...
            remembered.connectors().removeAll(revokedFromWorker.connectors());
            remembered.tasks().removeAll(revokedFromWorker.tasks());

            // ... but is tracked separately so that the next round can inspect it.
            previousRevocation.connectors().addAll(revokedFromWorker.connectors());
            previousRevocation.tasks().addAll(revokedFromWorker.tasks());

            for (ConnectorTaskId revokedTask : revokedFromWorker.tasks()) {
                tasksRevokedFromWorker.put(revokedTask, worker);
            }
        }

        // Lost work stays in the view so that it keeps being detected until it is reassigned.
        remembered.connectors().addAll(lostAssignments.connectors());
        remembered.tasks().addAll(lostAssignments.tasks());
        return remembered;
    }

    /**
     * Returns the connectors and tasks recorded as revoked by the most recent assignment round.
     * The returned object is the assignor's own instance, not a copy.
     *
     * @return the revocation record
     */
    public WorkerCoordinator.ConnectorsAndTasks getPreviousRevocation() {
        return previousRevocation;
    }

    /**
     * Returns the tasks per worker as computed by the most recent assignment round.
     * The returned map is the assignor's own instance, not a copy.
     *
     * @return worker id to assigned tasks
     */
    public Map<String, Collection<ConnectorTaskId>> getNextAssignment() {
        return nextTaskAssignments;
    }

    /**
     * Finds connectors and tasks that more than one member reports as running.
     *
     * @param memberAssignments connectors and tasks each member reports as running
     * @return the connectors and tasks that appear in at least two members' assignments
     */
    private WorkerCoordinator.ConnectorsAndTasks duplicatedAssignments(Map<String, WorkerCoordinator.ConnectorsAndTasks> memberAssignments) {
        // Count in how many members' assignments each connector appears.
        Map<String, Long> connectorInstanceCounts = ConnectUtils.combineCollections(
                memberAssignments.values(),
                WorkerCoordinator.ConnectorsAndTasks::connectors,
                Collectors.groupingBy(Function.identity(), Collectors.counting()));
        Set<String> duplicatedConnectors = connectorInstanceCounts.entrySet().stream()
                .filter(entry -> entry.getValue() > 1L)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());

        // Same for tasks.
        Map<ConnectorTaskId, Long> taskInstanceCounts = ConnectUtils.combineCollections(
                memberAssignments.values(),
                WorkerCoordinator.ConnectorsAndTasks::tasks,
                Collectors.groupingBy(Function.identity(), Collectors.counting()));
        Set<ConnectorTaskId> duplicatedTasks = taskInstanceCounts.entrySet().stream()
                .filter(entry -> entry.getValue() > 1L)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());

        return new WorkerCoordinator.ConnectorsAndTasks.Builder().with(duplicatedConnectors, duplicatedTasks).build();
    }

    /**
     * Decides what to do with connectors and tasks that were part of the previous assignment but
     * are no longer reported by any member.
     *
     * <p>Outcomes:
     * <ul>
     *   <li>Nothing is lost and the previous round did not revoke: any pending delay is reset.</li>
     *   <li>No rebalance is scheduled and all previous members are still present, or delays are
     *       disabled: the lost work is added to {@code lostAssignmentsToReassign}.</li>
     *   <li>The scheduled delay expired or a preemptive end was requested: the lost work is
     *       assigned round-robin to the candidate workers that are still in the group, or added
     *       to {@code lostAssignmentsToReassign} when there are none. Delay state is reset.</li>
     *   <li>Otherwise: reassignment is postponed, empty workers are remembered as candidates and
     *       {@code delay} / {@code scheduledRebalance} are updated.</li>
     * </ul>
     *
     * @param lostAssignments connectors and tasks detected as lost
     * @param lostAssignmentsToReassign builder collecting the lost work that should be assigned as if new
     * @param completeWorkerAssignment loads of the current members; may be mutated by direct assignment
     */
    protected void handleLostAssignments(WorkerCoordinator.ConnectorsAndTasks lostAssignments, WorkerCoordinator.ConnectorsAndTasks.Builder lostAssignmentsToReassign, List<WorkerCoordinator.WorkerLoad> completeWorkerAssignment) {
        if (lostAssignments.isEmpty() && !this.revokedInPrevious) {
            this.resetDelay();
            this.preemptScheduledRebalanceDelay = false;
            return;
        }

        long now = this.time.milliseconds();
        this.log.debug("Found the following connectors and tasks missing from previous assignments: {}", lostAssignments);

        Set<String> activeMembers = completeWorkerAssignment.stream()
                .map(WorkerCoordinator.WorkerLoad::worker)
                .collect(Collectors.toSet());
        this.log.debug("activeMembers::{}", activeMembers);

        if (this.scheduledRebalance <= 0L && activeMembers.containsAll(this.previousMembers)) {
            this.log.debug("No worker seems to have departed the group during the rebalance. The missing assignments that the leader is detecting are probably due to some workers failing to receive the new assignments in the previous rebalance. Will reassign missing tasks as new tasks");
            lostAssignmentsToReassign.addAll(lostAssignments);
            return;
        }

        if (this.maxDelay == 0) {
            this.log.debug("Scheduled rebalance delays are disabled ({} = 0); reassigning all lost connectors and tasks immediately", "scheduled.rebalance.max.delay.ms");
            lostAssignmentsToReassign.addAll(lostAssignments);
            return;
        }

        boolean delayExpired = this.scheduledRebalance > 0L && now >= this.scheduledRebalance;
        if (delayExpired || this.preemptScheduledRebalanceDelay) {
            if (this.preemptScheduledRebalanceDelay) {
                this.log.debug("All departed workers have joined back before scheduled rebalance delay expired or a preemptive end to the scheduled rebalance was requested. Reassigning lost tasks");
                this.candidateWorkersForReassignment.addAll(this.candidateWorkersForReassignment(completeWorkerAssignment));
            } else {
                this.log.debug("Delayed rebalance expired. Reassigning lost tasks");
            }

            List<WorkerCoordinator.WorkerLoad> candidateWorkerLoad = Collections.emptyList();
            if (!this.candidateWorkersForReassignment.isEmpty()) {
                candidateWorkerLoad = this.pickCandidateWorkerForReassignment(completeWorkerAssignment);
            }

            if (!candidateWorkerLoad.isEmpty()) {
                this.log.debug("Assigning lost tasks to {} candidate workers: {}", candidateWorkerLoad.size(), candidateWorkerLoad.stream().map(WorkerCoordinator.WorkerLoad::worker).collect(Collectors.joining(",")));

                // Round-robin over the candidates, restarting from the first one when exhausted.
                Iterator<WorkerCoordinator.WorkerLoad> candidateWorkerIterator = candidateWorkerLoad.iterator();
                for (String connector : lostAssignments.connectors()) {
                    if (!candidateWorkerIterator.hasNext()) {
                        candidateWorkerIterator = candidateWorkerLoad.iterator();
                    }
                    WorkerCoordinator.WorkerLoad worker = candidateWorkerIterator.next();
                    this.log.debug("Assigning connector id {} to member {}", connector, worker.worker());
                    worker.assign(connector);
                }

                // Tasks start again from the first candidate, independently of the connectors.
                candidateWorkerIterator = candidateWorkerLoad.iterator();
                for (ConnectorTaskId task : lostAssignments.tasks()) {
                    if (!candidateWorkerIterator.hasNext()) {
                        candidateWorkerIterator = candidateWorkerLoad.iterator();
                    }
                    WorkerCoordinator.WorkerLoad worker = candidateWorkerIterator.next();
                    this.log.debug("Assigning task id {} to member {}", task, worker.worker());
                    worker.assign(task);
                }
            } else {
                this.log.debug("No single candidate worker was found to assign lost tasks. Treating lost tasks as new tasks");
                lostAssignmentsToReassign.addAll(lostAssignments);
            }

            this.resetDelay();
            this.revokedInPrevious = false;
            this.preemptScheduledRebalanceDelay = false;
            this.numLostWorkers = 0;
        } else {
            // Still waiting: remember workers that are currently empty as preferred targets.
            this.candidateWorkersForReassignment.addAll(this.candidateWorkersForReassignment(completeWorkerAssignment));
            if (now < this.scheduledRebalance) {
                this.delay = this.calculateDelay(now);
                this.log.debug("Delayed rebalance in progress. Task reassignment is postponed. New computed rebalance delay: {}", this.delay);
            } else {
                this.delay = this.maxDelay;
                this.log.debug("Resetting rebalance delay to the max: {}. scheduledRebalance: {} now: {} diff scheduledRebalance - now: {}", new Object[]{this.delay, this.scheduledRebalance, now, this.scheduledRebalance - now});
            }
            this.scheduledRebalance = now + (long) this.delay;
        }
    }

    /**
     * Clears all delayed-rebalance state: the candidate workers, the scheduled rebalance time and
     * the delay itself. Logs only when a non-zero delay is actually being reset.
     */
    private void resetDelay() {
        candidateWorkersForReassignment.clear();
        scheduledRebalance = 0L;
        if (delay == 0) {
            // Already at zero; nothing to log or change.
            return;
        }
        log.debug("Resetting delay from previous value: {} to 0", delay);
        delay = 0;
    }

    /**
     * Collects the ids of all workers whose load is currently empty.
     *
     * @param completeWorkerAssignment loads of the current members
     * @return ids of the workers for which {@code isEmpty()} holds
     */
    private Set<String> candidateWorkersForReassignment(List<WorkerCoordinator.WorkerLoad> completeWorkerAssignment) {
        Set<String> emptyWorkers = new HashSet<>();
        for (WorkerCoordinator.WorkerLoad load : completeWorkerAssignment) {
            if (load.isEmpty()) {
                emptyWorkers.add(load.worker());
            }
        }
        return emptyWorkers;
    }

    /**
     * Resolves the remembered candidate worker ids to the loads of workers that are still part of
     * the group, preserving the iteration order of the candidate set. Candidates that have left
     * the group are skipped.
     *
     * @param completeWorkerAssignment loads of the current members
     * @return loads of the candidate workers that are currently in the group
     */
    private List<WorkerCoordinator.WorkerLoad> pickCandidateWorkerForReassignment(List<WorkerCoordinator.WorkerLoad> completeWorkerAssignment) {
        Map<String, WorkerCoordinator.WorkerLoad> activeWorkers = completeWorkerAssignment.stream()
                .collect(Collectors.toMap(WorkerCoordinator.WorkerLoad::worker, Function.identity()));
        return this.candidateWorkersForReassignment.stream()
                .map(activeWorkers::get)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    /**
     * Creates one {@link ExtendedAssignment} per member from the computed cluster assignment.
     *
     * @param members ids of all group members
     * @param error error code placed in every assignment
     * @param leaderId member id of the leader
     * @param leaderUrl URL of the leader
     * @param maxOffset config offset the assignment is based on
     * @param clusterAssignment source of the newly assigned and newly revoked connectors and tasks
     * @param delay rebalance delay placed in every assignment
     * @param protocolVersion protocol version placed in every assignment
     * @return assignment per member id
     */
    private Map<String, ExtendedAssignment> fillAssignments(Collection<String> members, short error, String leaderId, String leaderUrl, long maxOffset, ClusterAssignment clusterAssignment, int delay, short protocolVersion) {
        Map<String, ExtendedAssignment> perMember = new HashMap<>();
        for (String memberId : members) {
            Collection<String> startConnectors = clusterAssignment.newlyAssignedConnectors(memberId);
            Collection<ConnectorTaskId> startTasks = clusterAssignment.newlyAssignedTasks(memberId);
            Collection<String> stopConnectors = clusterAssignment.newlyRevokedConnectors(memberId);
            Collection<ConnectorTaskId> stopTasks = clusterAssignment.newlyRevokedTasks(memberId);

            ExtendedAssignment memberAssignment = new ExtendedAssignment(
                    protocolVersion, error, leaderId, leaderUrl, maxOffset,
                    startConnectors, startTasks, stopConnectors, stopTasks, delay);
            log.debug("Filling assignment: {} -> {}", memberId, memberAssignment);
            perMember.put(memberId, memberAssignment);
        }
        log.debug("Finished assignment");
        return perMember;
    }

    /**
     * Serializes every member's assignment. The {@code sessioned} flag passed to the serializer
     * is set when the protocol version is 2 or higher.
     *
     * @param assignments assignment per member id
     * @param protocolVersion negotiated protocol version
     * @return serialized assignment per member id
     */
    protected Map<String, ByteBuffer> serializeAssignments(Map<String, ExtendedAssignment> assignments, short protocolVersion) {
        final boolean useSessionedFormat = protocolVersion >= 2;
        return assignments.entrySet()
                .stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        entry -> IncrementalCooperativeConnectProtocol.serializeAssignment(entry.getValue(), useSessionedFormat)));
    }

    /**
     * Computes {@code base} minus the union of all {@code toSubtract} arguments, separately for
     * connectors and for tasks. The inputs are not modified; the result is built from sorted copies.
     *
     * @param base the connectors and tasks to start from
     * @param toSubtract the connectors and tasks to remove
     * @return what remains of {@code base}
     */
    private static WorkerCoordinator.ConnectorsAndTasks diff(WorkerCoordinator.ConnectorsAndTasks base, WorkerCoordinator.ConnectorsAndTasks ... toSubtract) {
        TreeSet<String> remainingConnectors = new TreeSet<>(base.connectors());
        TreeSet<ConnectorTaskId> remainingTasks = new TreeSet<>(base.tasks());
        for (WorkerCoordinator.ConnectorsAndTasks excluded : toSubtract) {
            remainingConnectors.removeAll(excluded.connectors());
            remainingTasks.removeAll(excluded.tasks());
        }
        return new WorkerCoordinator.ConnectorsAndTasks.Builder()
                .with(remainingConnectors, remainingTasks)
                .build();
    }

    /**
     * Computes, for every key of {@code base}, the elements of its collection that are not in the
     * collection stored under the same key in {@code toSubtract}. Keys that only exist in
     * {@code toSubtract} are ignored. The inputs are not modified.
     *
     * @param base collections to start from, per key
     * @param toSubtract collections to remove, per key
     * @param <T> element type
     * @return a new map with one (possibly empty) list per key of {@code base}
     */
    private static <T> Map<String, Collection<T>> diff(Map<String, Collection<T>> base, Map<String, Collection<T>> toSubtract) {
        Map<String, Collection<T>> remainingByKey = new HashMap<>();
        base.forEach((key, original) -> {
            Collection<T> excluded = toSubtract.getOrDefault(key, Collections.emptySet());
            List<T> remaining = new ArrayList<>(original);
            remaining.removeAll(excluded);
            remainingByKey.put(key, remaining);
        });
        return remainingByKey;
    }

    /**
     * Merges the per-member assignments into a single {@code ConnectorsAndTasks}.
     *
     * @param memberAssignments the connectors and tasks reported per member
     * @return the combined connectors and tasks of all members
     */
    private WorkerCoordinator.ConnectorsAndTasks assignment(Map<String, WorkerCoordinator.ConnectorsAndTasks> memberAssignments) {
        this.log.debug("Received assignments: {}", memberAssignments);
        Collection<WorkerCoordinator.ConnectorsAndTasks> perMember = memberAssignments.values();
        WorkerCoordinator.ConnectorsAndTasks.Builder combined = new WorkerCoordinator.ConnectorsAndTasks.Builder();
        return combined
                .with(
                        ConnectUtils.combineCollections(perMember, WorkerCoordinator.ConnectorsAndTasks::connectors),
                        ConnectUtils.combineCollections(perMember, WorkerCoordinator.ConnectorsAndTasks::tasks))
                .build();
    }

    /**
     * Computes task revocations from reported task load rather than from task counts.
     *
     * <p>The method returns {@code null} (not an empty map) when load information is unusable:
     * {@code clusterMetrics} is null, has no worker loads or task loads, or the computed
     * per-worker average task load is exactly zero. An empty map is returned when no worker
     * qualifies as overloaded, or when every worker does.
     *
     * @param workers the current load of each worker
     * @param clusterMetrics the reported worker and task loads; may be null
     * @return the tasks to revoke keyed by worker id, an empty map if nothing should be revoked,
     *         or {@code null} if load-based revocation could not be evaluated
     */
    private Map<String, Set<ConnectorTaskId>> resourceAwareLoadBalancingRevocations(Collection<WorkerCoordinator.WorkerLoad> workers, ConnectClusterMetrics clusterMetrics) {
        Map<String, Set<ConnectorTaskId>> taskRevocations;
        if (clusterMetrics == null || clusterMetrics.workerLoads().isEmpty() || clusterMetrics.taskLoads().isEmpty()) {
            this.log.debug("No cluster metrics found. Defaulting to num tasks based load balancing revocations");
            return null;
        }
        this.log.debug("clusterMetrics::{}", (Object)clusterMetrics);
        // Sum of all reported task loads divided by the number of workers, i.e. the task load each
        // worker would carry if load were spread evenly.
        double clusterTaskLoadAverage = clusterMetrics.taskLoads().stream().mapToDouble(ConnectClusterMetrics.TaskLoad::taskLoad).sum() / (double)workers.size();
        if (clusterTaskLoadAverage == 0.0) {
            this.log.debug("Task load average is either 0 or no metrics are available. Defaulting to num tasks based load balancing revocations");
            return null;
        }
        // Total task load currently placed on each worker.
        Map<String, Double> workerTaskLoads = this.workerTaskLoads(workers, clusterMetrics);
        Map<ConnectorTaskId, Double> taskLoads = clusterMetrics.taskLoadsToMap();
        // worker id -> list of TaskLoad for the tasks on that worker; tasks without a reported load get 0.0.
        HashMap workerTasksWithLoad = new HashMap();
        for (WorkerCoordinator.WorkerLoad workerLoad : workers) {
            workerTasksWithLoad.put(workerLoad.worker(), workerLoad.tasks().stream().map(task -> new ConnectClusterMetrics.TaskLoad((ConnectorTaskId)task, taskLoads.getOrDefault(task, 0.0))).collect(Collectors.toList()));
        }
        this.log.debug("clusterTaskLoadAverage::{}", (Object)clusterTaskLoadAverage);
        this.log.debug("workerTaskLoads::{}", workerTaskLoads);
        this.log.debug("taskLoads::{}", taskLoads);
        // Allowed percentage deviation above the average; which of the two thresholds applies is
        // selected by the isClusterWorkerLoadHigh flag.
        Short loadDeviation = this.isClusterWorkerLoadHigh ? Short.valueOf(this.loadUsageMaxDeviationFromAverage) : Short.valueOf(this.loadUsageMaxDeviationFromAverageInBalancedCluster);
        // Overloaded = runs more than one task AND its load exceeds the average by more than loadDeviation percent.
        Map<String, Double> overloadedWorkers = workerTaskLoads.entrySet().stream().filter(e -> workerTasksWithLoad.containsKey(e.getKey()) && ((List)workerTasksWithLoad.get(e.getKey())).size() > 1).filter(e -> (Double)e.getValue() > clusterTaskLoadAverage && ((Double)e.getValue() - clusterTaskLoadAverage) * 100.0 / clusterTaskLoadAverage > (double)loadDeviation.shortValue()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        // Workers whose total task load is exactly 0.0.
        ArrayList emptyWorkers = workerTaskLoads.entrySet().stream().filter(e -> (Double)e.getValue() == 0.0).map(Map.Entry::getKey).collect(Collectors.toCollection(ArrayList::new));
        if (!emptyWorkers.isEmpty() && overloadedWorkers.isEmpty()) {
            // Nobody crossed the threshold but some workers carry no load: pick as many of the
            // most loaded multi-task workers as there are zero-load workers.
            int numEmptyWorkers = emptyWorkers.size();
            overloadedWorkers = workerTaskLoads.entrySet().stream().filter(e -> workerTasksWithLoad.containsKey(e.getKey()) && ((List)workerTasksWithLoad.get(e.getKey())).size() > 1).sorted((e1, e2) -> Double.compare((Double)e2.getValue(), (Double)e1.getValue())).limit(numEmptyWorkers).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            this.log.debug("Found workers with no task load. Revoking tasks from most loaded worker: {}", overloadedWorkers);
        }
        if (overloadedWorkers.isEmpty()) {
            this.log.debug("Found no workers running more than 1 task and exceeding task load threshold. No revocations needed");
            taskRevocations = Collections.emptyMap();
        } else if (overloadedWorkers.size() == workers.size()) {
            this.log.debug("All the workers are overloaded. Not performing any revocations ");
            taskRevocations = Collections.emptyMap();
        } else {
            // Process overloaded workers from the highest load to the lowest.
            LinkedList toRevokeFrom = overloadedWorkers.entrySet().stream().sorted(Map.Entry.comparingByValue(Comparator.reverseOrder())).map(Map.Entry::getKey).collect(Collectors.toCollection(LinkedList::new));
            HashMap<String, Set<ConnectorTaskId>> revocations = new HashMap<String, Set<ConnectorTaskId>>();
            // Shared across all workers in the loop below: decremented once per revoked task.
            int emptyWorkerCount = emptyWorkers.size();
            for (String worker : toRevokeFrom) {
                // This worker's tasks ordered from the highest load to the lowest.
                LinkedList tasksRunningOnWorker = ((List)workerTasksWithLoad.get(worker)).stream().sorted(Comparator.comparing(ConnectClusterMetrics.TaskLoad::taskLoad, Comparator.reverseOrder())).collect(Collectors.toCollection(LinkedList::new));
                int numHotTasksOnWorker = this.numHotTasks(worker, (List)workerTasksWithLoad.get(worker));
                Double avgTaskLoad = this.averageWorkerTaskLoad((List)workerTasksWithLoad.get(worker));
                double workerTaskLoad = workerTaskLoads.getOrDefault(worker, 0.0);
                this.log.debug("Worker::{} task load before revocation::{}, num of tasks:: {}", new Object[]{worker, workerTaskLoad, tasksRunningOnWorker.size()});
                if (numHotTasksOnWorker == 1) {
                    // A single hot task is left in place: drop it from the candidate queue without revoking it.
                    this.log.debug("Not revoking task {} from worker {} as it is the only hot task.", tasksRunningOnWorker.peek(), (Object)worker);
                    tasksRunningOnWorker.poll();
                }
                HashSet<ConnectorTaskId> tasksToRevoke = new HashSet<ConnectorTaskId>();
                // Revoke from the most loaded remaining task downwards until the worker's load is no
                // longer above the average. When no hot task was set aside, the last candidate is
                // never polled, so the worker keeps at least one task.
                while ((numHotTasksOnWorker == 1 && tasksRunningOnWorker.size() >= 1 || tasksRunningOnWorker.size() > 1) && workerTaskLoad > clusterTaskLoadAverage) {
                    ConnectClusterMetrics.TaskLoad taskLoad = (ConnectClusterMetrics.TaskLoad)tasksRunningOnWorker.poll();
                    // Once the zero-load worker budget is used up, stop at the first task whose load is
                    // zero or below 2% of this worker's average task load. The polled task is not revoked,
                    // but it has already left the queue, so the size logged after the loop excludes it.
                    if (emptyWorkerCount == 0 && (taskLoad.taskLoad() == 0.0 || taskLoad.taskLoad() < 0.02 * avgTaskLoad)) break;
                    if (emptyWorkerCount > 0) {
                        --emptyWorkerCount;
                    }
                    workerTaskLoad -= taskLoad.taskLoad();
                    tasksToRevoke.add(taskLoad.taskId());
                }
                if (!tasksToRevoke.isEmpty()) {
                    revocations.put(worker, tasksToRevoke);
                }
                this.log.debug("Worker::{} task load after revocation::{}, num of tasks::{}", new Object[]{worker, workerTaskLoad, tasksRunningOnWorker.size()});
            }
            this.log.debug("Worker revocations::{}", revocations);
            taskRevocations = revocations;
        }
        return taskRevocations;
    }

    /**
     * Returns the arithmetic mean of the given task loads.
     *
     * @param workerTaskLoad the task loads to average; an empty list yields {@code NaN} (0.0 / 0)
     * @return the sum of the task loads divided by their count
     */
    private double averageWorkerTaskLoad(List<ConnectClusterMetrics.TaskLoad> workerTaskLoad) {
        final int taskCount = workerTaskLoad.size();
        final double totalLoad = workerTaskLoad.stream()
                .mapToDouble(ConnectClusterMetrics.TaskLoad::taskLoad)
                .sum();
        return totalLoad / (double) taskCount;
    }

    /**
     * Counts the "hot" tasks of a worker: tasks whose load is strictly greater than 80% of the
     * average load of the given tasks.
     *
     * @param workerId the worker id, used only for logging
     * @param workerTaskLoad the loads of the tasks running on the worker
     * @return the number of hot tasks, or 0 when the list is empty
     */
    private int numHotTasks(String workerId, List<ConnectClusterMetrics.TaskLoad> workerTaskLoad) {
        if (workerTaskLoad.isEmpty()) {
            return 0;
        }
        final double averageLoad = this.averageWorkerTaskLoad(workerTaskLoad);
        this.log.debug("Average task load on worker::{} is ::{}", workerId, averageLoad);
        long hot = workerTaskLoad.stream()
                .mapToDouble(ConnectClusterMetrics.TaskLoad::taskLoad)
                .filter(load -> load > 0.8 * averageLoad)
                .count();
        final int hotCount = (int) hot;
        this.log.debug("Num hot tasks on worker::{} is::{}", workerId, hotCount);
        return hotCount;
    }

    /**
     * Determines which connectors and tasks should be revoked from which workers so that the
     * cluster can be rebalanced.
     *
     * <p>Connector revocations are always count-based. Task revocations are load-based when the
     * {@code resourceAware} flag is set and the load-based computation yields a non-null result;
     * otherwise they are count-based as well.
     *
     * @param configured the connectors and tasks currently configured
     * @param workers the current load of each worker
     * @param maxTasksPerWorker the per-worker task limit used for count-based task revocations
     * @param clusterMetrics the reported loads used for load-based task revocations; may be null
     * @return the connectors and tasks to revoke per worker; empty if nothing should be revoked
     */
    private Map<String, WorkerCoordinator.ConnectorsAndTasks> performLoadBalancingRevocations(WorkerCoordinator.ConnectorsAndTasks configured, Collection<WorkerCoordinator.WorkerLoad> workers, Integer maxTasksPerWorker, ConnectClusterMetrics clusterMetrics) {
        if (this.log.isTraceEnabled()) {
            for (WorkerCoordinator.WorkerLoad wl : workers) {
                this.log.trace("Per worker current load size; worker: {} connectors: {} tasks: {}", wl.worker(), wl.connectorsSize(), wl.tasksSize());
            }
        }

        boolean someWorkerHasLoad = workers.stream().anyMatch(wl -> !wl.isEmpty());
        if (!someWorkerHasLoad) {
            this.log.trace("No load-balancing revocations required; all workers are either new or will have all currently-assigned connectors and tasks revoked during this round");
            return Collections.emptyMap();
        }
        if (configured.isEmpty()) {
            this.log.trace("No load-balancing revocations required; no connectors are currently configured on this cluster");
            return Collections.emptyMap();
        }

        Map<String, WorkerCoordinator.ConnectorsAndTasks.Builder> revocationsByWorker = new HashMap<>();

        AssignmentParams connectorParams = this.buildConnectorAssignmentParams(workers, configured.connectors());
        Map<String, Set<String>> connectorRevocations = this.loadBalancingRevocations(connectorParams, workers, WorkerCoordinator.WorkerLoad::connectors);

        // A null result from the load-based path means "could not decide"; fall back to counts.
        Map<String, Set<ConnectorTaskId>> taskRevocations = this.resourceAware
                ? this.resourceAwareLoadBalancingRevocations(workers, clusterMetrics)
                : null;
        if (taskRevocations == null) {
            AssignmentParams taskParams = this.buildTaskAssignmentParams(workers, configured.tasks(), maxTasksPerWorker);
            taskRevocations = this.loadBalancingRevocations(taskParams, workers, WorkerCoordinator.WorkerLoad::tasks);
        }

        for (Map.Entry<String, Set<String>> entry : connectorRevocations.entrySet()) {
            revocationsByWorker
                    .computeIfAbsent(entry.getKey(), w -> new WorkerCoordinator.ConnectorsAndTasks.Builder())
                    .addConnectors(entry.getValue());
        }
        for (Map.Entry<String, Set<ConnectorTaskId>> entry : taskRevocations.entrySet()) {
            revocationsByWorker
                    .computeIfAbsent(entry.getKey(), w -> new WorkerCoordinator.ConnectorsAndTasks.Builder())
                    .addTasks(entry.getValue());
        }
        return IncrementalCooperativeAssignor.buildAll(revocationsByWorker);
    }

    /**
     * Derives the balanced allocation targets for connectors: how many each worker gets at
     * minimum and how many workers get one extra.
     *
     * @param workers the workers to spread the connectors over; must not be empty, since the
     *        worker count is used as a divisor
     * @param connectors the connectors to allocate
     * @return the allocation parameters labelled {@code "connector"}
     */
    private AssignmentParams buildConnectorAssignmentParams(Collection<WorkerCoordinator.WorkerLoad> workers, Collection<String> connectors) {
        final int connectorCount = connectors.size();
        final int workerCount = workers.size();
        final int perWorker = connectorCount / workerCount;
        final int workersWithExtra = connectorCount % workerCount;
        return new AssignmentParams("connector", workerCount, perWorker, workersWithExtra);
    }

    /**
     * Derives the balanced allocation targets for tasks, capped by the per-worker task limit.
     *
     * <p>If the limit is lower than the largest allocation an even spread would give a worker,
     * the minimum per worker becomes the limit and no worker is allowed an extra task.
     *
     * @param workers the workers to spread the tasks over; must not be empty, since the worker
     *        count is used as a divisor
     * @param tasks the tasks to allocate
     * @param maxTasksPerWorkerLimit the maximum number of tasks a worker may hold; must not be null
     * @return the allocation parameters labelled {@code "task"}
     */
    private AssignmentParams buildTaskAssignmentParams(Collection<WorkerCoordinator.WorkerLoad> workers, Collection<ConnectorTaskId> tasks, Integer maxTasksPerWorkerLimit) {
        final int taskCount = tasks.size();
        final int workerCount = workers.size();
        int perWorker = taskCount / workerCount;
        int workersWithExtra = taskCount % workerCount;

        int largestEvenShare = perWorker;
        if (workersWithExtra != 0) {
            largestEvenShare = perWorker + 1;
        }
        if (maxTasksPerWorkerLimit < largestEvenShare) {
            this.log.debug("The maximum tasks per worker limit is set to {} which is less than the calculated max possible tasks in a worker {}", maxTasksPerWorkerLimit, largestEvenShare);
            perWorker = maxTasksPerWorkerLimit;
            workersWithExtra = 0;
        }
        return new AssignmentParams("task", workerCount, perWorker, workersWithExtra);
    }

    /**
     * Computes count-based revocations for one kind of resource (connectors or tasks).
     *
     * <p>No revocations are produced when exactly {@code params.workersToAllocateExtra} workers
     * hold {@code min + 1} resources and every other worker holds exactly {@code min}. Otherwise
     * each worker holding more than {@code min} is trimmed, in iteration order of its allocation,
     * down to {@code min + 1} (while extra slots remain) or to {@code min}.
     *
     * @param params the target allocation parameters
     * @param workers the current load of each worker
     * @param workerAllocation extracts the resources of the relevant kind from a worker
     * @param <E> the resource type
     * @return the resources to revoke keyed by worker id; empty if the allocation is already balanced
     */
    private <E> Map<String, Set<E>> loadBalancingRevocations(AssignmentParams params, Collection<WorkerCoordinator.WorkerLoad> workers, Function<WorkerCoordinator.WorkerLoad, Collection<E>> workerAllocation) {
        final int workerCount = workers.size();
        final int floor = params.minAllocatedPerWorker;
        final int extraSlots = params.workersToAllocateExtra;
        final int ceiling = floor + 1;

        long workersAtFloor = 0L;
        long workersAtCeiling = 0L;
        for (WorkerCoordinator.WorkerLoad candidate : workers) {
            int held = workerAllocation.apply(candidate).size();
            if (held == floor) {
                workersAtFloor++;
            }
            if (held == ceiling) {
                workersAtCeiling++;
            }
        }
        boolean alreadyBalanced = workersAtCeiling == (long) extraSlots
                && workersAtFloor + workersAtCeiling == (long) workerCount;
        if (alreadyBalanced) {
            this.log.trace("No load-balancing {} revocations required; the current allocations, when combined with any newly-created {}s, should be balanced", params.allocatedResourceName, params.allocatedResourceName);
            return Collections.emptyMap();
        }

        Map<String, Set<E>> revocations = new HashMap<>();
        int extraSlotsUsed = 0;
        for (WorkerCoordinator.WorkerLoad worker : workers) {
            Collection<E> allocation = workerAllocation.apply(worker);
            final int heldCount = allocation.size();
            if (heldCount <= floor) {
                continue;
            }

            final int target;
            if (extraSlotsUsed < extraSlots) {
                // This worker takes one of the "one extra" slots.
                extraSlotsUsed++;
                if (heldCount == ceiling) {
                    continue;
                }
                target = ceiling;
            } else {
                target = floor;
            }

            Set<E> revokedFromWorker = new LinkedHashSet<>();
            revocations.put(worker.worker(), revokedFromWorker);
            Iterator<E> remaining = allocation.iterator();
            for (int numRevoked = 0; heldCount - numRevoked > target; numRevoked++) {
                if (!remaining.hasNext()) {
                    this.log.warn("Unexpectedly ran out of {}s to revoke from worker {} while performing load-balancing revocations; worker appears to still be allocated {} instances, which is more than the intended allocation of {}", params.allocatedResourceName, worker.worker(), allocation.size(), target);
                    break;
                }
                revokedFromWorker.add(remaining.next());
            }
        }
        return revocations;
    }

    /**
     * Returns the time left until the scheduled rebalance, capped at {@code maxDelay}.
     *
     * @param now the current time, in the same unit as {@code scheduledRebalance}
     * @return the remaining delay, or 0 if the scheduled rebalance is not in the future
     */
    private int calculateDelay(long now) {
        final long remaining = this.scheduledRebalance - now;
        if (remaining <= 0L) {
            return 0;
        }
        return (int) Math.min(remaining, (long) this.maxDelay);
    }

    /**
     * Distributes the given connectors over the workers, repeatedly filling the leading run of
     * workers that hold no more connectors than the first worker in sorted order.
     *
     * <p>The list is sorted in place once, using {@code WorkerLoad.connectorComparator()}.
     *
     * @param workerAssignment the workers to assign to; sorted and updated in place, must not be empty
     * @param connectors the connectors to assign
     */
    protected void assignConnectors(List<WorkerCoordinator.WorkerLoad> workerAssignment, Collection<String> connectors) {
        workerAssignment.sort(WorkerCoordinator.WorkerLoad.connectorComparator());
        final WorkerCoordinator.WorkerLoad first = workerAssignment.get(0);
        final Iterator<String> pending = connectors.iterator();
        while (pending.hasNext()) {
            final int firstLoad = first.connectorsSize();
            // Index of the first worker holding more connectors than the first one (or list size).
            int upTo = 0;
            while (upTo < workerAssignment.size() && workerAssignment.get(upTo).connectorsSize() <= firstLoad) {
                upTo++;
            }
            for (int i = 0; i < upTo; i++) {
                WorkerCoordinator.WorkerLoad worker = workerAssignment.get(i);
                String connector = pending.next();
                this.log.debug("Assigning connector {} to {}", connector, worker.worker());
                worker.assign(connector);
                if (!pending.hasNext()) {
                    break;
                }
            }
        }
    }

    /**
     * Assigns tasks to workers using reported load where available.
     *
     * <p>When {@code clusterMetrics} is null or carries no worker or task loads, this delegates
     * to the count-based {@code assignTasks(workerAssignment, tasks, maxTasksPerWorker)}.
     * Otherwise tasks with a reported load are placed first, heaviest first (ties broken by task
     * id), each on the worker with the smallest product of worker load and accumulated task load.
     * Tasks without a reported load are then placed using {@code WorkerLoad.taskComparator()}.
     *
     * <p>NOTE(review): {@code maxTasksPerWorker} is only honoured on the count-based fallback
     * path; the load-based path does not consult it - confirm this is intended.
     *
     * @param workerAssignment the workers to assign to; updated in place
     * @param tasks the tasks to assign
     * @param maxTasksPerWorker the per-worker task limit forwarded to the count-based fallback
     * @param clusterMetrics the reported worker and task loads; may be null
     */
    protected void assignTasks(List<WorkerCoordinator.WorkerLoad> workerAssignment, Collection<ConnectorTaskId> tasks, Integer maxTasksPerWorker, ConnectClusterMetrics clusterMetrics) {
        if (clusterMetrics == null || clusterMetrics.workerLoads().isEmpty() || clusterMetrics.taskLoads().isEmpty()) {
            this.log.debug("No cluster metrics found. Defaulting to num tasks based task assignment");
            this.assignTasks(workerAssignment, tasks, maxTasksPerWorker);
            return;
        }
        Map<String, Double> workerLoads = clusterMetrics.workerLoadsToMap();
        Map<String, Double> workerTaskLoads = this.workerTaskLoads(workerAssignment, clusterMetrics);
        // Looked up once and reused for partitioning and sorting below.
        Map<ConnectorTaskId, Double> taskLoads = clusterMetrics.taskLoadsToMap();
        Comparator<WorkerCoordinator.WorkerLoad> workerLoadComparator = Comparator.comparing(w -> workerLoads.getOrDefault(w.worker(), 1.0) * workerTaskLoads.getOrDefault(w.worker(), 0.0));
        List<ConnectorTaskId> tasksWithLoad = tasks.stream().filter(taskLoads::containsKey).collect(Collectors.toList());
        List<ConnectorTaskId> tasksWithNoLoad = tasks.stream().filter(t -> !taskLoads.containsKey(t)).collect(Collectors.toList());
        this.log.debug("Tasks with load::{}", tasksWithLoad);
        this.log.debug("Tasks with no load::{}", tasksWithNoLoad);
        // Heaviest task first; equal loads fall back to task id order. The second load must be
        // read for t2 - reading it for t1 makes every pair compare as equal on load.
        tasksWithLoad.sort((t1, t2) -> {
            double t1Load = taskLoads.get(t1);
            double t2Load = taskLoads.get(t2);
            return t1Load == t2Load ? t1.compareTo(t2) : Double.compare(t2Load, t1Load);
        });
        this.assignTasksToWorkers(workerAssignment, workerLoadComparator.thenComparing(WorkerCoordinator.WorkerLoad.taskComparator()), tasksWithLoad, clusterMetrics, workerTaskLoads, true);
        this.assignTasksToWorkers(workerAssignment, WorkerCoordinator.WorkerLoad.taskComparator(), tasksWithNoLoad, clusterMetrics, workerTaskLoads, false);
    }

    /**
     * Assigns each task, in the given order, to the worker that currently sorts first under
     * {@code workerLoadComparator}.
     *
     * <p>When {@code updateLoad} is true, a task is not handed back to the worker recorded for it
     * in {@code tasksRevokedFromWorker} if that worker is the current head of the queue and at
     * least one other worker is available; the next worker in order takes it instead. If that
     * worker is the only one available, it receives the task anyway rather than failing. Also
     * when {@code updateLoad} is true, the chosen worker's entry in {@code workerTaskLoads} is
     * increased by the task's reported load (0.0 if none).
     *
     * @param workerAssignment the workers to assign to; updated in place, expected to be non-empty
     * @param workerLoadComparator orders the workers; the first in order is preferred
     * @param tasksToAssign the tasks to assign, in assignment order
     * @param clusterMetrics the source of task loads; only read when {@code updateLoad} is true
     * @param workerTaskLoads accumulated task load per worker; updated when {@code updateLoad} is true
     * @param updateLoad whether to track load and avoid the worker a task was revoked from
     */
    private void assignTasksToWorkers(List<WorkerCoordinator.WorkerLoad> workerAssignment, Comparator<WorkerCoordinator.WorkerLoad> workerLoadComparator, List<ConnectorTaskId> tasksToAssign, ConnectClusterMetrics clusterMetrics, Map<String, Double> workerTaskLoads, boolean updateLoad) {
        PriorityQueue<WorkerCoordinator.WorkerLoad> workersByLoad = new PriorityQueue<>(workerLoadComparator);
        workersByLoad.addAll(workerAssignment);
        for (ConnectorTaskId task : tasksToAssign) {
            WorkerCoordinator.WorkerLoad skippedWorker = null;
            WorkerCoordinator.WorkerLoad leastLoadedWorker = workersByLoad.peek();
            boolean revokedFromLeastLoaded = updateLoad && this.tasksRevokedFromWorker.containsKey(task) && leastLoadedWorker.worker().equals(this.tasksRevokedFromWorker.get(task));
            // Only set the worker aside when someone else can take the task; otherwise the poll
            // below would return null and the assignment would fail with a NullPointerException.
            if (revokedFromLeastLoaded && workersByLoad.size() > 1) {
                skippedWorker = workersByLoad.poll();
            }
            leastLoadedWorker = workersByLoad.poll();
            this.log.debug("Least loaded worker::{}", (Object)leastLoadedWorker);
            if (updateLoad) {
                // Update the load while the worker is out of the queue so that it is re-inserted
                // at the position matching its new load.
                workerTaskLoads.put(leastLoadedWorker.worker(), workerTaskLoads.getOrDefault(leastLoadedWorker.worker(), 0.0) + clusterMetrics.taskLoadsToMap().getOrDefault(task, 0.0));
            }
            this.log.debug("Assigning task {} to {}", (Object)task, (Object)leastLoadedWorker.worker());
            leastLoadedWorker.assign(task);
            workersByLoad.add(leastLoadedWorker);
            if (skippedWorker != null) {
                workersByLoad.add(skippedWorker);
            }
        }
    }

    /**
     * Sums, for every worker, the reported load of the tasks it currently holds.
     *
     * @param workerAssignment the workers and the tasks they hold
     * @param clusterMetrics the source of per-task loads; tasks without an entry count as 0.0
     * @return the total task load per worker id
     */
    private Map<String, Double> workerTaskLoads(Collection<WorkerCoordinator.WorkerLoad> workerAssignment, ConnectClusterMetrics clusterMetrics) {
        final Map<ConnectorTaskId, Double> loadByTask = clusterMetrics.taskLoadsToMap();
        final Map<String, Double> loadByWorker = new HashMap<>();
        workerAssignment.forEach(worker -> {
            double total = worker.tasks()
                    .stream()
                    .mapToDouble(task -> loadByTask.getOrDefault(task, 0.0))
                    .sum();
            loadByWorker.put(worker.worker(), total);
        });
        return loadByWorker;
    }

    /**
     * Distributes the given tasks over the workers by task count, repeatedly filling the leading
     * run of workers that hold no more tasks than the first worker in sorted order.
     *
     * <p>The list is sorted in place once, using {@code WorkerLoad.taskComparator()}. Assignment
     * stops, leaving the remaining tasks unassigned, as soon as the first worker already holds
     * {@code maxTasksPerWorker} tasks or more.
     *
     * @param workerAssignment the workers to assign to; sorted and updated in place, must not be empty
     * @param tasks the tasks to assign
     * @param maxTasksPerWorker the task count at which assignment stops; must not be null
     */
    protected void assignTasks(List<WorkerCoordinator.WorkerLoad> workerAssignment, Collection<ConnectorTaskId> tasks, Integer maxTasksPerWorker) {
        workerAssignment.sort(WorkerCoordinator.WorkerLoad.taskComparator());
        final WorkerCoordinator.WorkerLoad first = workerAssignment.get(0);
        final Iterator<ConnectorTaskId> pending = tasks.iterator();
        while (pending.hasNext()) {
            final int firstLoad = first.tasksSize();
            if (firstLoad >= maxTasksPerWorker) {
                this.log.debug("skipping task assignment due to max tasks per worker limit : {} ", maxTasksPerWorker);
                return;
            }
            // Index of the first worker holding more tasks than the first one (or list size).
            int upTo = 0;
            while (upTo < workerAssignment.size() && workerAssignment.get(upTo).tasksSize() <= firstLoad) {
                upTo++;
            }
            for (int i = 0; i < upTo; i++) {
                WorkerCoordinator.WorkerLoad worker = workerAssignment.get(i);
                ConnectorTaskId task = pending.next();
                this.log.debug("Assigning task {} to {}", task, worker.worker());
                worker.assign(task);
                if (!pending.hasNext()) {
                    break;
                }
            }
        }
    }

    /**
     * Distributes the given tasks over the workers by task count with an effectively unlimited
     * per-worker cap ({@link Integer#MAX_VALUE}).
     *
     * @param workerAssignment the workers to assign to; sorted and updated in place
     * @param tasks the tasks to assign
     */
    protected void assignTasks(List<WorkerCoordinator.WorkerLoad> workerAssignment, Collection<ConnectorTaskId> tasks) {
        final Integer unlimited = Integer.MAX_VALUE;
        this.assignTasks(workerAssignment, tasks, unlimited);
    }

    /**
     * Builds a {@code WorkerLoad} per member from its assignment, leaving out every connector
     * and task contained in {@code toExclude}.
     *
     * @param memberAssignments the connectors and tasks per member
     * @param toExclude the connectors and tasks to drop from every member's load
     * @return one {@code WorkerLoad} per member, in the iteration order of {@code memberAssignments}
     */
    private static List<WorkerCoordinator.WorkerLoad> workerAssignment(Map<String, WorkerCoordinator.ConnectorsAndTasks> memberAssignments, WorkerCoordinator.ConnectorsAndTasks toExclude) {
        final WorkerCoordinator.ConnectorsAndTasks ignore = new WorkerCoordinator.ConnectorsAndTasks.Builder()
                .with(toExclude.connectors(), toExclude.tasks())
                .build();
        List<WorkerCoordinator.WorkerLoad> loads = new ArrayList<>();
        for (Map.Entry<String, WorkerCoordinator.ConnectorsAndTasks> member : memberAssignments.entrySet()) {
            WorkerCoordinator.ConnectorsAndTasks assigned = member.getValue();

            List<String> keptConnectors = new ArrayList<>();
            for (String connector : assigned.connectors()) {
                if (!ignore.connectors().contains(connector)) {
                    keptConnectors.add(connector);
                }
            }
            List<ConnectorTaskId> keptTasks = new ArrayList<>();
            for (ConnectorTaskId task : assigned.tasks()) {
                if (!ignore.tasks().contains(task)) {
                    keptTasks.add(task);
                }
            }
            loads.add(new WorkerCoordinator.WorkerLoad.Builder(member.getKey()).with(keptConnectors, keptTasks).build());
        }
        return loads;
    }

    /**
     * Adds every worker's connectors and tasks from {@code toAdd} to that worker's builder in
     * {@code base}, creating the builder first if the worker has none yet.
     *
     * @param base the builders per worker; updated in place
     * @param toAdd the connectors and tasks to add per worker
     */
    private static void addAll(Map<String, WorkerCoordinator.ConnectorsAndTasks.Builder> base, Map<String, WorkerCoordinator.ConnectorsAndTasks> toAdd) {
        for (Map.Entry<String, WorkerCoordinator.ConnectorsAndTasks> addition : toAdd.entrySet()) {
            WorkerCoordinator.ConnectorsAndTasks.Builder builder =
                    base.computeIfAbsent(addition.getKey(), w -> new WorkerCoordinator.ConnectorsAndTasks.Builder());
            builder.addAll(addition.getValue());
        }
    }

    /**
     * Builds every builder in the given map, keeping the keys.
     *
     * @param builders the builders per key
     * @param <K> the key type
     * @return the built {@code ConnectorsAndTasks} per key
     */
    private static <K> Map<K, WorkerCoordinator.ConnectorsAndTasks> buildAll(Map<K, WorkerCoordinator.ConnectorsAndTasks.Builder> builders) {
        return ConnectUtils.transformValues(builders, builder -> builder.build());
    }

    /**
     * Builds a {@code WorkerLoad} per member from its full assignment.
     *
     * @param memberAssignments the connectors and tasks per member
     * @return one {@code WorkerLoad} per member, in the iteration order of {@code memberAssignments}
     */
    private static List<WorkerCoordinator.WorkerLoad> workerLoads(Map<String, WorkerCoordinator.ConnectorsAndTasks> memberAssignments) {
        List<WorkerCoordinator.WorkerLoad> loads = new ArrayList<>();
        for (Map.Entry<String, WorkerCoordinator.ConnectorsAndTasks> member : memberAssignments.entrySet()) {
            WorkerCoordinator.ConnectorsAndTasks assigned = member.getValue();
            WorkerCoordinator.WorkerLoad.Builder builder = new WorkerCoordinator.WorkerLoad.Builder(member.getKey());
            loads.add(builder.with(assigned.connectors(), assigned.tasks()).build());
        }
        return loads;
    }

    /**
     * Removes from every worker load the connectors and tasks listed for that worker in
     * {@code toRemove}. Workers without an entry are left unchanged.
     *
     * @param workerLoads the worker loads; their connector and task collections are modified in place
     * @param toRemove the connectors and tasks to remove per worker id
     */
    private static void removeAll(List<WorkerCoordinator.WorkerLoad> workerLoads, Map<String, WorkerCoordinator.ConnectorsAndTasks> toRemove) {
        for (WorkerCoordinator.WorkerLoad load : workerLoads) {
            WorkerCoordinator.ConnectorsAndTasks removals =
                    toRemove.getOrDefault(load.worker(), WorkerCoordinator.ConnectorsAndTasks.EMPTY);
            load.connectors().removeAll(removals.connectors());
            load.tasks().removeAll(removals.tasks());
        }
    }

    /**
     * Restricts every worker's assignment to the connectors and tasks that are also present in
     * {@code connectorsAndTasks}.
     *
     * @param connectorsAndTasks the connectors and tasks to keep
     * @param assignments the assignment per worker
     * @return per worker, the part of its assignment that is contained in {@code connectorsAndTasks}
     */
    private static Map<String, WorkerCoordinator.ConnectorsAndTasks> intersection(WorkerCoordinator.ConnectorsAndTasks connectorsAndTasks, Map<String, WorkerCoordinator.ConnectorsAndTasks> assignments) {
        Function<WorkerCoordinator.ConnectorsAndTasks, WorkerCoordinator.ConnectorsAndTasks> restrict = assigned -> {
            HashSet<String> sharedConnectors = new HashSet<>(assigned.connectors());
            sharedConnectors.retainAll(connectorsAndTasks.connectors());
            HashSet<ConnectorTaskId> sharedTasks = new HashSet<>(assigned.tasks());
            sharedTasks.retainAll(connectorsAndTasks.tasks());
            WorkerCoordinator.ConnectorsAndTasks.Builder builder = new WorkerCoordinator.ConnectorsAndTasks.Builder();
            return builder.with(sharedConnectors, sharedTasks).build();
        };
        return ConnectUtils.transformValues(assignments, restrict);
    }

    /**
     * Holder for the outcome of an assignment round: per worker, the connectors and tasks that
     * are newly assigned, newly revoked, and assigned in total. The maps passed to the
     * constructor are stored as given (no copies are made).
     */
    static class ClusterAssignment {
        /** An assignment in which every map is empty. */
        public static final ClusterAssignment EMPTY = new ClusterAssignment(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());

        private final Map<String, Collection<String>> newlyAssignedConnectors;
        private final Map<String, Collection<ConnectorTaskId>> newlyAssignedTasks;
        private final Map<String, Collection<String>> newlyRevokedConnectors;
        private final Map<String, Collection<ConnectorTaskId>> newlyRevokedTasks;
        private final Map<String, Collection<String>> allAssignedConnectors;
        private final Map<String, Collection<ConnectorTaskId>> allAssignedTasks;
        /** Every worker id that appears as a key in any of the six maps. */
        private final Set<String> allWorkers;

        /**
         * @param newlyAssignedConnectors connectors newly assigned, per worker
         * @param newlyAssignedTasks tasks newly assigned, per worker
         * @param newlyRevokedConnectors connectors newly revoked, per worker
         * @param newlyRevokedTasks tasks newly revoked, per worker
         * @param allAssignedConnectors all connectors assigned, per worker
         * @param allAssignedTasks all tasks assigned, per worker
         */
        public ClusterAssignment(Map<String, Collection<String>> newlyAssignedConnectors, Map<String, Collection<ConnectorTaskId>> newlyAssignedTasks, Map<String, Collection<String>> newlyRevokedConnectors, Map<String, Collection<ConnectorTaskId>> newlyRevokedTasks, Map<String, Collection<String>> allAssignedConnectors, Map<String, Collection<ConnectorTaskId>> allAssignedTasks) {
            this.newlyAssignedConnectors = newlyAssignedConnectors;
            this.newlyAssignedTasks = newlyAssignedTasks;
            this.newlyRevokedConnectors = newlyRevokedConnectors;
            this.newlyRevokedTasks = newlyRevokedTasks;
            this.allAssignedConnectors = allAssignedConnectors;
            this.allAssignedTasks = allAssignedTasks;
            this.allWorkers = ConnectUtils.combineCollections(
                    Arrays.asList(newlyAssignedConnectors, newlyAssignedTasks, newlyRevokedConnectors, newlyRevokedTasks, allAssignedConnectors, allAssignedTasks),
                    Map::keySet,
                    Collectors.toSet());
        }

        /** Looks up a worker's entry, falling back to an empty set when the worker has none. */
        private static <T> Collection<T> forWorker(Map<String, Collection<T>> byWorker, String worker) {
            return byWorker.getOrDefault(worker, Collections.emptySet());
        }

        /** @return the newly assigned connectors of every worker */
        public Map<String, Collection<String>> newlyAssignedConnectors() {
            return this.newlyAssignedConnectors;
        }

        /** @return the newly assigned connectors of {@code worker}; empty if it has none */
        public Collection<String> newlyAssignedConnectors(String worker) {
            return forWorker(this.newlyAssignedConnectors, worker);
        }

        /** @return the newly assigned tasks of every worker */
        public Map<String, Collection<ConnectorTaskId>> newlyAssignedTasks() {
            return this.newlyAssignedTasks;
        }

        /** @return the newly assigned tasks of {@code worker}; empty if it has none */
        public Collection<ConnectorTaskId> newlyAssignedTasks(String worker) {
            return forWorker(this.newlyAssignedTasks, worker);
        }

        /** @return the newly revoked connectors of every worker */
        public Map<String, Collection<String>> newlyRevokedConnectors() {
            return this.newlyRevokedConnectors;
        }

        /** @return the newly revoked connectors of {@code worker}; empty if it has none */
        public Collection<String> newlyRevokedConnectors(String worker) {
            return forWorker(this.newlyRevokedConnectors, worker);
        }

        /** @return the newly revoked tasks of every worker */
        public Map<String, Collection<ConnectorTaskId>> newlyRevokedTasks() {
            return this.newlyRevokedTasks;
        }

        /** @return the newly revoked tasks of {@code worker}; empty if it has none */
        public Collection<ConnectorTaskId> newlyRevokedTasks(String worker) {
            return forWorker(this.newlyRevokedTasks, worker);
        }

        /** @return all assigned connectors of every worker */
        public Map<String, Collection<String>> allAssignedConnectors() {
            return this.allAssignedConnectors;
        }

        /** @return all assigned tasks of every worker */
        public Map<String, Collection<ConnectorTaskId>> allAssignedTasks() {
            return this.allAssignedTasks;
        }

        /** @return every worker id that appears in any of the maps */
        public Set<String> allWorkers() {
            return this.allWorkers;
        }

        @Override
        public String toString() {
            return "ClusterAssignment{"
                    + "newlyAssignedConnectors=" + this.newlyAssignedConnectors
                    + ", newlyAssignedTasks=" + this.newlyAssignedTasks
                    + ", newlyRevokedConnectors=" + this.newlyRevokedConnectors
                    + ", newlyRevokedTasks=" + this.newlyRevokedTasks
                    + ", allAssignedConnectors=" + this.allAssignedConnectors
                    + ", allAssignedTasks=" + this.allAssignedTasks
                    + ", allWorkers=" + this.allWorkers
                    + "}";
        }
    }

    /**
     * Target figures for a balanced allocation of one kind of resource across the workers.
     */
    static class AssignmentParams {
        /** Label of the allocated resource; used in log messages. */
        String allocatedResourceName;
        /** Number of workers the resources are spread over. */
        Integer totalWorkers;
        /** Number of resources every worker should hold at least. */
        Integer minAllocatedPerWorker;
        /** Number of workers that should hold one resource more than the minimum. */
        Integer workersToAllocateExtra;

        /**
         * @param allocatedResourceName label of the allocated resource
         * @param totalWorkers number of workers
         * @param minAllocatedPerWorker minimum number of resources per worker
         * @param workersToAllocateExtra number of workers holding one extra resource
         */
        public AssignmentParams(String allocatedResourceName, int totalWorkers, int minAllocatedPerWorker, int workersToAllocateExtra) {
            this.workersToAllocateExtra = Integer.valueOf(workersToAllocateExtra);
            this.minAllocatedPerWorker = Integer.valueOf(minAllocatedPerWorker);
            this.totalWorkers = Integer.valueOf(totalWorkers);
            this.allocatedResourceName = allocatedResourceName;
        }
    }
}

