/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.distributed;

import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.distributed.ConnectAssignor;
import org.apache.kafka.connect.runtime.distributed.ConnectClusterMetrics;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.EagerAssignor;
import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;
import org.apache.kafka.connect.runtime.distributed.ExtendedWorkerState;
import org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor;
import org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol;
import org.apache.kafka.connect.runtime.distributed.WorkerRebalanceListener;
import org.apache.kafka.connect.runtime.distributed.workermanager.WorkerResourceManager;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;

public class WorkerCoordinator
extends AbstractCoordinator
implements Closeable {
    // Logger created from the LogContext handed to the constructor.
    private final Logger log;
    // REST URL of this worker; included in the worker state sent by metadata().
    private final String restUrl;
    // Source of ClusterConfigState snapshots (see metadata() and configFreshSnapshot()).
    private final ConfigBackingStore configStorage;
    // Worker configuration; consulted for the resource-aware scheduling flag.
    private final DistributedConfig distributedConfig;
    // Passed through to the WorkerResourceManager when one is created.
    private final ConnectMetrics connectMetrics;
    // Registers the assigned-connectors / assigned-tasks / worker-poll-timeout-count metrics.
    private final WorkerCoordinatorMetrics workerCoordinatorMetrics;
    // Latest assignment received by this worker; null until the first join completes
    // or after the assignment was revoked because the broker coordinator was unreachable.
    private volatile ExtendedAssignment assignmentSnapshot;
    // Config snapshot captured in metadata() or replaced through configSnapshot(ClusterConfigState).
    private ClusterConfigState configSnapshot;
    // Receives onAssigned / onRevoked / onPollTimeoutExpiry callbacks.
    private final WorkerRebalanceListener listener;
    // Protocol mode configured for this worker; selects the metadata format in metadata().
    private final ConnectProtocolCompatibility protocolCompatibility;
    // Leader-side view of the group; reset to null at the start of every rebalance (onJoinPrepare).
    private LeaderState leaderState;
    // Set by requestRejoin(...); cleared in onJoinComplete.
    private boolean rejoinRequested;
    // Set by requestRejoin(boolean); cleared in onJoinComplete for non-EAGER protocols.
    private boolean preemptScheduledRebalance;
    // Protocol actually selected by the group in the last completed join.
    private volatile ConnectProtocolCompatibility currentConnectProtocol;
    // Generation id of the last completed join; starts at NO_GENERATION.
    private volatile int lastCompletedGenerationId;
    // Assignor used when the group protocol is EAGER.
    private final ConnectAssignor eagerAssignor;
    // Assignor used for every other protocol; cast to IncrementalCooperativeAssignor where needed.
    private final ConnectAssignor incrementalAssignor;
    // Timeout for coordinator discovery in poll(); initialised from the heartbeat interval.
    private final int coordinatorDiscoveryTimeoutMs;
    // Created lazily in maybeStartWorkerResourceManager(); stopped and nulled in onJoinComplete
    // when this worker is not the leader.
    private WorkerResourceManager workerRM = null;
    // Kept so that the WorkerResourceManager can be constructed later.
    private final LogContext logContext;
    // Number of poll-timeout expirations; backs the worker-poll-timeout-count metric.
    private final AtomicLong workerPollTimeoutCount = new AtomicLong(0L);
    // Set in handlePollTimeoutExpiry(); makes the next onJoinPrepare revoke the whole assignment.
    boolean revokeTasksDueToPollTimeout = false;

    /**
     * Creates the coordinator for a Connect worker.
     *
     * @param config group rebalance settings; its heartbeat interval is reused as the coordinator discovery timeout
     * @param logContext context used to create this class's logger and passed on to the assignors
     * @param client network client handed to the parent coordinator
     * @param metrics registry in which the coordinator metrics are registered
     * @param metricGrpPrefix prefix of the metric group name
     * @param time clock handed to the parent coordinator and the incremental assignor
     * @param restUrl REST URL advertised by this worker
     * @param configStorage store from which config snapshots are taken
     * @param listener callback target for assignment and revocation events
     * @param protocolCompatibility protocol mode configured for this worker
     * @param maxDelay passed to the incremental cooperative assignor
     * @param preemptiveScheduledRebalanceEndEnabled passed to the incremental cooperative assignor
     * @param distributedConfig worker configuration
     * @param connectMetrics metrics facade kept for the worker resource manager
     */
    public WorkerCoordinator(GroupRebalanceConfig config, LogContext logContext, ConsumerNetworkClient client, Metrics metrics, String metricGrpPrefix, Time time, String restUrl, ConfigBackingStore configStorage, WorkerRebalanceListener listener, ConnectProtocolCompatibility protocolCompatibility, int maxDelay, boolean preemptiveScheduledRebalanceEndEnabled, DistributedConfig distributedConfig, ConnectMetrics connectMetrics) {
        super(config, logContext, client, metrics, metricGrpPrefix, time);

        // Collaborators supplied by the caller.
        this.log = logContext.logger(WorkerCoordinator.class);
        this.logContext = logContext;
        this.restUrl = restUrl;
        this.configStorage = configStorage;
        this.listener = listener;
        this.distributedConfig = distributedConfig;
        this.connectMetrics = connectMetrics;

        // Initial protocol and membership state.
        this.protocolCompatibility = protocolCompatibility;
        this.currentConnectProtocol = protocolCompatibility;
        this.assignmentSnapshot = null;
        this.rejoinRequested = false;
        this.lastCompletedGenerationId = AbstractCoordinator.Generation.NO_GENERATION.generationId;

        // Objects built here, in the same order as before.
        this.workerCoordinatorMetrics = new WorkerCoordinatorMetrics(metrics, metricGrpPrefix);
        this.incrementalAssignor = new IncrementalCooperativeAssignor(
                logContext,
                time,
                maxDelay,
                preemptiveScheduledRebalanceEndEnabled,
                distributedConfig.getBoolean("confluent.connect.resource.aware.scheduling.enable"));
        this.eagerAssignor = new EagerAssignor(logContext);
        this.coordinatorDiscoveryTimeoutMs = config.heartbeatIntervalMs;
    }

    /**
     * Flags that this worker should rejoin the group; the flag is picked up by
     * {@link #rejoinNeededOrPending()}.
     *
     * @param reason human-readable cause, logged at debug level
     */
    public void requestRejoin(String reason) {
        log.debug("Request joining group due to: {}", reason);
        rejoinRequested = true;
    }

    /**
     * Requests a rejoin on behalf of an external trigger and records whether a scheduled
     * rebalance delay should be pre-empted on the next join preparation.
     *
     * @param preemptScheduledRebalance value stored in the field of the same name
     */
    public void requestRejoin(boolean preemptScheduledRebalance) {
        this.preemptScheduledRebalance = preemptScheduledRebalance;
        final String reason = "Rebalance Triggered Externally";
        requestRejoin(reason);
    }

    /**
     * Returns the protocol type name used by this coordinator.
     *
     * @return the constant string {@code "connect"}
     */
    public String protocolType() {
        return "connect";
    }

    /**
     * Delegates to the parent implementation while holding this object's monitor.
     *
     * @param timer bounds how long the parent call may take
     * @return whatever the parent implementation returns
     */
    protected synchronized boolean ensureCoordinatorReady(Timer timer) {
        return super.ensureCoordinatorReady(timer);
    }

    /**
     * Drives the coordinator for up to {@code timeout} milliseconds: discovers the broker
     * coordinator when it is unknown, (re)joins the group when needed, sends heartbeats and
     * polls the network client. The loop body always runs at least once.
     *
     * <p>If coordinator discovery fails while a non-failed assignment is held, that assignment
     * is revoked through the listener and the local snapshot is cleared.
     *
     * @param timeout maximum time to spend in this call, in milliseconds
     * @param onPoll supplies a closeable that is opened around every potentially blocking call
     *               and closed right after it
     */
    public void poll(long timeout, Supplier<Utils.UncheckedCloseable> onPoll) {
        final long start = time.milliseconds();
        long now = start;
        long remaining;

        do {
            if (coordinatorUnknown()) {
                log.debug("Broker coordinator is marked unknown. Attempting discovery with a timeout of {}ms", coordinatorDiscoveryTimeoutMs);
                boolean coordinatorReady;
                try (Utils.UncheckedCloseable polling = onPoll.get()) {
                    coordinatorReady = ensureCoordinatorReady(time.timer(coordinatorDiscoveryTimeoutMs));
                }
                if (!coordinatorReady) {
                    log.debug("Can not connect to broker coordinator");
                    final ExtendedAssignment current = assignmentSnapshot;
                    if (current != null && !current.failed()) {
                        log.info("Broker coordinator was unreachable for {}ms. Revoking previous assignment {} to avoid running tasks while not being a member the group", coordinatorDiscoveryTimeoutMs, current);
                        listener.onRevoked(current.leader(), current.connectors(), current.tasks());
                        assignmentSnapshot = null;
                    }
                } else {
                    log.debug("Broker coordinator is ready");
                }
                now = time.milliseconds();
            }

            if (rejoinNeededOrPending()) {
                try (Utils.UncheckedCloseable polling = onPoll.get()) {
                    ensureActiveGroup();
                }
                now = time.milliseconds();
            }

            pollHeartbeat(now);

            // Never block past the caller's deadline or past the next heartbeat.
            remaining = timeout - (now - start);
            final long pollTimeout = Math.min(Math.max(0L, remaining), timeToNextHeartbeat(now));
            try (Utils.UncheckedCloseable polling = onPoll.get()) {
                client.poll(time.timer(pollTimeout));
            }

            now = time.milliseconds();
            remaining = timeout - (now - start);
        } while (remaining > 0L);
    }

    /**
     * Builds the join-group metadata for this worker. A fresh config snapshot is taken and
     * stored, then combined with the REST URL and the current assignment into the worker
     * state that is serialized according to the configured protocol mode.
     *
     * @return the protocol collection to send in the join-group request
     * @throws IllegalStateException if the configured mode is none of EAGER, COMPATIBLE or SESSIONED
     */
    public JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() {
        configSnapshot = configStorage.snapshot();
        final ExtendedAssignment current = assignmentSnapshot;
        final ExtendedWorkerState state = new ExtendedWorkerState(restUrl, configSnapshot.offset(), current);

        switch (protocolCompatibility) {
            case EAGER:
                return ConnectProtocol.metadataRequest(state);
            case COMPATIBLE:
                return IncrementalCooperativeConnectProtocol.metadataRequest(state, false);
            case SESSIONED:
                return IncrementalCooperativeConnectProtocol.metadataRequest(state, true);
            default:
                throw new IllegalStateException("Unknown Connect protocol compatibility mode " + protocolCompatibility);
        }
    }

    /**
     * Handles the assignment received at the end of a group join.
     *
     * <p>The assignment is deserialized, the protocol selected by the group is recorded and the
     * rejoin request flag is cleared. For every protocol other than EAGER, explicit revocations
     * are reported to the listener and whatever remains of the previous assignment is merged
     * into the new one. Finally the snapshot and generation id are stored, the worker resource
     * manager is stopped if this worker is not the leader, and the listener is told about the
     * resulting assignment.
     *
     * @param generation generation id of the completed join
     * @param memberId member id (not used in this method body)
     * @param protocol name of the protocol selected by the group
     * @param memberAssignment serialized assignment for this worker
     * @throws RuntimeException if stopping the worker resource manager fails
     */
    protected void onJoinComplete(int generation, String memberId, String protocol, ByteBuffer memberAssignment) {
        ExtendedAssignment newAssignment = IncrementalCooperativeConnectProtocol.deserializeAssignment(memberAssignment);
        this.log.debug("Deserialized new assignment: {}", (Object)newAssignment);
        this.currentConnectProtocol = ConnectProtocolCompatibility.fromProtocol(protocol);
        this.rejoinRequested = false;
        if (this.currentConnectProtocol != ConnectProtocolCompatibility.EAGER) {
            ExtendedAssignment localAssignmentSnapshot;
            // Report explicit revocations before touching the local snapshot.
            if (!newAssignment.revokedConnectors().isEmpty() || !newAssignment.revokedTasks().isEmpty()) {
                this.listener.onRevoked(newAssignment.leader(), newAssignment.revokedConnectors(), newAssignment.revokedTasks());
            }
            if ((localAssignmentSnapshot = this.assignmentSnapshot) != null) {
                // The previous snapshot is modified in place: revoked items are dropped from it...
                localAssignmentSnapshot.connectors().removeAll(newAssignment.revokedConnectors());
                localAssignmentSnapshot.tasks().removeAll(newAssignment.revokedTasks());
                this.log.debug("After revocations snapshot of assignment: {}", (Object)localAssignmentSnapshot);
                // ...and what is left is carried over into the new assignment.
                newAssignment.connectors().addAll(localAssignmentSnapshot.connectors());
                newAssignment.tasks().addAll(localAssignmentSnapshot.tasks());
            }
            this.log.debug("Augmented new assignment: {}", (Object)newAssignment);
            this.preemptScheduledRebalance = false;
        }
        // isLeader() below reads the snapshot stored here.
        this.assignmentSnapshot = newAssignment;
        this.lastCompletedGenerationId = generation;
        if (!this.isLeader() && this.workerRM != null) {
            try {
                this.workerRM.stop();
                this.workerRM = null;
            }
            catch (Exception e) {
                // NOTE(review): on this path workerRM stays non-null and listener.onAssigned is
                // never invoked for this generation — confirm that this is intended.
                this.log.error("Failed to stop resource manager on leader change", (Throwable)e);
                throw new RuntimeException(e);
            }
        }
        this.listener.onAssigned(newAssignment, generation);
    }

    /**
     * Computes the assignments for all group members when this worker is the leader. The eager
     * assignor is used for the EAGER protocol and the incremental assignor otherwise; in the
     * latter case the worker resource manager is started or refreshed afterwards.
     *
     * @param leaderId member id of the leader
     * @param protocol name of the protocol selected by the group
     * @param allMemberMetadata join metadata of every group member
     * @param skipAssignment must be {@code false}
     * @return serialized assignment per member id
     * @throws IllegalStateException if {@code skipAssignment} is {@code true}
     */
    protected Map<String, ByteBuffer> onLeaderElected(String leaderId, String protocol, List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata, boolean skipAssignment) {
        if (skipAssignment) {
            throw new IllegalStateException("Can't skip assignment because Connect does not support static membership.");
        }

        final boolean eager = ConnectProtocolCompatibility.fromProtocol(protocol) == ConnectProtocolCompatibility.EAGER;
        final ConnectAssignor assignor = eager ? eagerAssignor : incrementalAssignor;
        final Map<String, ByteBuffer> assignments = assignor.performAssignment(leaderId, protocol, allMemberMetadata, this);

        if (!eager) {
            maybeStartWorkerResourceManager();
        }
        return assignments;
    }

    /**
     * Prepares for a rebalance. The leader state is cleared; then, if the current protocol is
     * EAGER or a poll timeout has been flagged, the whole (non-failed) assignment is revoked
     * through the listener. Otherwise the assignment is kept and, when requested and this worker
     * is the leader, the incremental assignor is asked to pre-empt its scheduled rebalance delay.
     *
     * @param timer not used in this method body
     * @param generation not used in this method body
     * @param memberId not used in this method body
     * @return always {@code true}
     */
    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
        log.info("Rebalance started");
        leaderState(null);

        final ExtendedAssignment current = assignmentSnapshot;
        final boolean revokeEverything = currentConnectProtocol == ConnectProtocolCompatibility.EAGER || revokeTasksDueToPollTimeout;

        if (!revokeEverything) {
            log.debug("Cooperative rebalance triggered. Keeping assignment {} until it's explicitly revoked.", current);
            if (preemptScheduledRebalance
                    && isLeader()
                    && ((IncrementalCooperativeAssignor) incrementalAssignor).preemptScheduledRebalanceDelay()) {
                log.info("Lost Worker(s) detected and pre-emptive end to scheduled rebalance delay requested. The next round of rebalance will assign resources immediately for them.");
            }
            return true;
        }

        log.debug("Revoking previous assignment {}", current);
        if (current != null && !current.failed()) {
            listener.onRevoked(current.leader(), current.connectors(), current.tasks());
            // The poll-timeout flag is only cleared once a revocation actually happened.
            revokeTasksDueToPollTimeout = false;
        }
        return true;
    }

    /**
     * Tells whether this worker has to (re)join the group.
     *
     * @return {@code true} if the parent coordinator says so, if there is no assignment, if the
     *         assignment is marked failed, or if a rejoin was explicitly requested
     */
    protected boolean rejoinNeededOrPending() {
        final ExtendedAssignment current = assignmentSnapshot;
        if (super.rejoinNeededOrPending()) {
            return true;
        }
        if (current == null || current.failed()) {
            return true;
        }
        return rejoinRequested;
    }

    /**
     * Returns this worker's member id in the current stable generation.
     *
     * @return the member id, or the empty string when {@code generationIfStable()} yields {@code null}
     */
    public String memberId() {
        final AbstractCoordinator.Generation stable = generationIfStable();
        return stable == null ? "" : stable.memberId;
    }

    /**
     * Reacts to an expired poll timeout: notifies the listener, bumps the poll-timeout counter,
     * flags that the next join preparation must revoke the assignment, and asks to leave the
     * group.
     */
    protected void handlePollTimeoutExpiry() {
        listener.onPollTimeoutExpiry();
        workerCoordinatorMetrics.incrementPollTimeoutCount();
        revokeTasksDueToPollTimeout = true;
        maybeLeaveGroup("worker poll timeout has expired.");
    }

    /**
     * Returns the id of the generation currently held by the parent coordinator.
     *
     * @return the current generation id
     */
    public int generationId() {
        final AbstractCoordinator.Generation current = super.generation();
        return current.generationId;
    }

    /**
     * Returns the generation id recorded by the most recent {@code onJoinComplete} call.
     *
     * @return the last completed generation id, or the NO_GENERATION id if no join has completed
     */
    public int lastCompletedGenerationId() {
        return lastCompletedGenerationId;
    }

    /**
     * Reports the leader, connectors and tasks of the given assignment to the listener as revoked.
     *
     * @param assignment the assignment to revoke; must not be {@code null}
     */
    public void revokeAssignment(ExtendedAssignment assignment) {
        listener.onRevoked(
                assignment.leader(),
                assignment.connectors(),
                assignment.tasks());
    }

    /**
     * Tells whether the current assignment names this worker as the group leader.
     *
     * @return {@code false} when there is no assignment; otherwise whether {@link #memberId()}
     *         equals the leader recorded in the assignment
     */
    private boolean isLeader() {
        final ExtendedAssignment current = assignmentSnapshot;
        if (current == null) {
            return false;
        }
        return memberId().equals(current.leader());
    }

    /**
     * Looks up the URL of the worker that owns the given connector.
     *
     * @param connector connector name
     * @return the owner's URL as reported by the leader state, or {@code null} when a rejoin is
     *         needed or pending, or when this worker is not the leader
     */
    public String ownerUrl(String connector) {
        final boolean stableLeader = !rejoinNeededOrPending() && isLeader();
        return stableLeader ? leaderState().ownerUrl(connector) : null;
    }

    /**
     * Looks up the URL of the worker that owns the given task.
     *
     * @param task task id
     * @return the owner's URL as reported by the leader state, or {@code null} when a rejoin is
     *         needed or pending, or when this worker is not the leader
     */
    public String ownerUrl(ConnectorTaskId task) {
        final boolean stableLeader = !rejoinNeededOrPending() && isLeader();
        return stableLeader ? leaderState().ownerUrl(task) : null;
    }

    /**
     * Takes a new snapshot directly from the config backing store; the cached snapshot is not
     * updated.
     *
     * @return the snapshot returned by the backing store
     */
    public ClusterConfigState configFreshSnapshot() {
        return configStorage.snapshot();
    }

    /**
     * Returns the cached config snapshot.
     *
     * @return the snapshot last stored by {@code metadata()} or {@code configSnapshot(ClusterConfigState)};
     *         {@code null} if neither has run yet
     */
    public ClusterConfigState configSnapshot() {
        return configSnapshot;
    }

    /**
     * Returns the cluster metrics gathered by the worker resource manager.
     *
     * <p>The manager reference is read once into a local: {@code onJoinComplete} may set the
     * field to {@code null}, so checking and dereferencing the field separately could hit a
     * {@code NullPointerException} if the two run concurrently.
     *
     * @return the manager's snapshot, or an empty metrics object when no manager exists
     */
    public ConnectClusterMetrics metricsSnapshot() {
        final WorkerResourceManager manager = workerRM;
        if (manager == null) {
            return new ConnectClusterMetrics(Collections.emptyList(), Collections.emptyList());
        }
        return manager.snapshot();
    }

    /**
     * Asks the worker resource manager whether any worker in the cluster is under high load.
     *
     * <p>The manager reference is read once into a local: {@code onJoinComplete} may set the
     * field to {@code null}, so checking and dereferencing the field separately could hit a
     * {@code NullPointerException} if the two run concurrently.
     *
     * @return the manager's answer, or {@code false} when no manager exists
     */
    public boolean isAnyClusterWorkerLoadHigh() {
        final WorkerResourceManager manager = workerRM;
        return manager != null && manager.IsAnyWorkerLoadHigh();
    }

    /**
     * Creates and starts the worker resource manager if resource-aware scheduling is enabled and
     * no manager exists yet, then pushes the latest task assignment to the manager (if any).
     *
     * <p>A failure to start is logged and otherwise ignored.
     */
    private void maybeStartWorkerResourceManager() {
        if (distributedConfig.getBoolean("confluent.connect.resource.aware.scheduling.enable").booleanValue() && workerRM == null) {
            workerRM = new WorkerResourceManager(logContext, this, distributedConfig, time, connectMetrics);
            try {
                workerRM.start();
            } catch (Exception e) {
                // The exception is passed as the throwable argument, so the message must not
                // contain a "{}" placeholder: it would never be substituted and would show up
                // literally in the log line.
                // NOTE(review): workerRM stays assigned after a failed start, so it is still
                // used below and start() is not retried on later elections — confirm intent.
                log.error("Failed to start WorkerResourceManager, task assignment would be round robin", e);
            }
        }
        if (workerRM != null) {
            updateWorkerResourceManagerAssignment();
        }
    }

    /**
     * Hands the incremental assignor's next assignment to the worker resource manager, copying
     * each worker's task collection into a new list. Nothing is sent while the assignor still
     * reports a previous revocation or when the next assignment is empty.
     */
    private void updateWorkerResourceManagerAssignment() {
        final IncrementalCooperativeAssignor assignor = (IncrementalCooperativeAssignor) incrementalAssignor;

        final ConnectorsAndTasks previousRevoked = assignor.getPreviousRevocation();
        if (!previousRevoked.isEmpty()) {
            return;
        }

        // Only queried once the revocation check has passed, as before.
        final Map<String, Collection<ConnectorTaskId>> nextAssignment = assignor.getNextAssignment();
        if (nextAssignment.isEmpty()) {
            return;
        }

        final Map<String, List<ConnectorTaskId>> tasksByWorker = nextAssignment.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> new ArrayList<ConnectorTaskId>(entry.getValue())));
        workerRM.setLatestTaskAssignment(tasksByWorker);
    }

    /**
     * Replaces the cached config snapshot.
     *
     * @param update the snapshot to cache
     */
    public void configSnapshot(ClusterConfigState update) {
        configSnapshot = update;
    }

    /**
     * Returns the current leader state.
     *
     * @return the leader state, or {@code null} if none has been set since the last rebalance started
     */
    private LeaderState leaderState() {
        return leaderState;
    }

    /**
     * Replaces the leader state.
     *
     * @param update the new leader state; {@code null} clears it
     */
    public void leaderState(LeaderState update) {
        leaderState = update;
    }

    /**
     * Returns the version of the protocol currently in use by this worker.
     *
     * @return the protocol version reported by the current protocol compatibility mode
     */
    public short currentProtocolVersion() {
        final ConnectProtocolCompatibility current = currentConnectProtocol;
        return current.protocolVersion();
    }

    /**
     * Turns an owner-to-items map into an item-to-owner map.
     *
     * <p>If the same item appears under several owners, the owner visited last in the map's
     * iteration order wins.
     *
     * @param assignment items grouped by owner; neither the map nor its collections may be {@code null}
     * @param <K> owner type
     * @param <V> item type
     * @return a new {@link HashMap} from each item to its owner
     */
    public static <K, V> Map<V, K> invertAssignment(Map<K, Collection<V>> assignment) {
        final Map<V, K> ownerByItem = new HashMap<>();
        assignment.forEach((owner, items) -> items.forEach(item -> ownerByItem.put(item, owner)));
        return ownerByItem;
    }

    /**
     * Registers this coordinator's gauges (assigned connectors, assigned tasks, poll-timeout
     * count) and offers a way to bump the poll-timeout counter.
     */
    private class WorkerCoordinatorMetrics {
        public final String metricGrpName;

        /**
         * Registers the three gauges in the given registry.
         *
         * @param metrics registry to register in
         * @param metricGrpPrefix prefix; the group name is this prefix plus {@code "-coordinator-metrics"}
         */
        public WorkerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
            final String group = metricGrpPrefix + "-coordinator-metrics";
            this.metricGrpName = group;

            // Each gauge reads the volatile snapshot once and reports 0 when there is none.
            final Measurable assignedConnectors = (config, now) -> {
                final ExtendedAssignment current = assignmentSnapshot;
                return current == null ? 0.0 : current.connectors().size();
            };
            final Measurable assignedTasks = (config, now) -> {
                final ExtendedAssignment current = assignmentSnapshot;
                return current == null ? 0.0 : current.tasks().size();
            };
            final Measurable pollTimeouts = (config, now) -> workerPollTimeoutCount.get();

            metrics.addMetric(
                    metrics.metricName("assigned-connectors", group, "The number of connector instances currently assigned to this worker"),
                    assignedConnectors);
            metrics.addMetric(
                    metrics.metricName("assigned-tasks", group, "The number of tasks currently assigned to this worker"),
                    assignedTasks);
            metrics.addMetric(
                    metrics.metricName("worker-poll-timeout-count", group, "The number of times poll timeout has occurred"),
                    pollTimeouts);
        }

        /** Adds one to the poll-timeout counter of the enclosing coordinator. */
        public void incrementPollTimeoutCount() {
            workerPollTimeoutCount.incrementAndGet();
        }
    }

    /**
     * The leader's view of the group: every member's worker state plus, for each connector and
     * task, the id of the member that owns it.
     */
    public static class LeaderState {
        private final Map<String, ExtendedWorkerState> allMembers;
        private final Map<String, String> connectorOwners;
        private final Map<ConnectorTaskId, String> taskOwners;

        /**
         * @param allMembers worker state per member id (stored by reference)
         * @param connectorAssignment connectors per member id; inverted into connector-to-owner
         * @param taskAssignment tasks per member id; inverted into task-to-owner
         */
        public LeaderState(Map<String, ExtendedWorkerState> allMembers, Map<String, Collection<String>> connectorAssignment, Map<String, Collection<ConnectorTaskId>> taskAssignment) {
            this.allMembers = allMembers;
            this.connectorOwners = invertAssignment(connectorAssignment);
            this.taskOwners = invertAssignment(taskAssignment);
        }

        /** Returns the URL of the member owning the task, or {@code null} if the task has no owner. */
        private String ownerUrl(ConnectorTaskId id) {
            return urlOf(taskOwners.get(id));
        }

        /** Returns the URL of the member owning the connector, or {@code null} if it has no owner. */
        private String ownerUrl(String connector) {
            return urlOf(connectorOwners.get(connector));
        }

        /** Resolves a member id to its URL; a {@code null} id yields {@code null}. */
        private String urlOf(String ownerId) {
            return ownerId == null ? null : allMembers.get(ownerId).url();
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof LeaderState)) {
                return false;
            }
            final LeaderState other = (LeaderState) o;
            return Objects.equals(allMembers, other.allMembers)
                    && Objects.equals(connectorOwners, other.connectorOwners)
                    && Objects.equals(taskOwners, other.taskOwners);
        }

        @Override
        public int hashCode() {
            return Objects.hash(allMembers, connectorOwners, taskOwners);
        }

        @Override
        public String toString() {
            return "LeaderState{allMembers=" + allMembers
                    + ", connectorOwners=" + connectorOwners
                    + ", taskOwners=" + taskOwners + "}";
        }
    }

    /**
     * A pair of connector names and task ids. Instances are created through {@link Builder} or
     * taken from {@link #EMPTY}; the collections are exposed by reference.
     */
    public static class ConnectorsAndTasks {
        /** Shared instance holding two empty lists. */
        public static final ConnectorsAndTasks EMPTY = new ConnectorsAndTasks(Collections.emptyList(), Collections.emptyList());
        private final Collection<String> connectors;
        private final Collection<ConnectorTaskId> tasks;

        private ConnectorsAndTasks(Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
            this.connectors = connectors;
            this.tasks = tasks;
        }

        /** @return the connector names held by this instance */
        public Collection<String> connectors() {
            return connectors;
        }

        /** @return the task ids held by this instance */
        public Collection<ConnectorTaskId> tasks() {
            return tasks;
        }

        /** @return number of connectors plus number of tasks */
        public int size() {
            final int connectorCount = connectors.size();
            final int taskCount = tasks.size();
            return connectorCount + taskCount;
        }

        /** @return {@code true} when there are neither connectors nor tasks */
        public boolean isEmpty() {
            if (!connectors.isEmpty()) {
                return false;
            }
            return tasks.isEmpty();
        }

        @Override
        public String toString() {
            return "{ connectorIds=" + connectors + ", taskIds=" + tasks + "}";
        }

        /** Accumulates connectors and tasks in insertion-ordered sets without duplicates. */
        public static class Builder {
            private Set<String> withConnectors = new LinkedHashSet<String>();
            private Set<ConnectorTaskId> withTasks = new LinkedHashSet<ConnectorTaskId>();

            /** Replaces everything collected so far with copies of the given collections. */
            public Builder with(Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
                withConnectors = new LinkedHashSet<String>(connectors);
                withTasks = new LinkedHashSet<ConnectorTaskId>(tasks);
                return this;
            }

            /** Adds the given connectors to those already collected. */
            public Builder addConnectors(Collection<String> connectors) {
                withConnectors.addAll(connectors);
                return this;
            }

            /** Adds the given tasks to those already collected. */
            public Builder addTasks(Collection<ConnectorTaskId> tasks) {
                withTasks.addAll(tasks);
                return this;
            }

            /** Adds the connectors, then the tasks, of another instance. */
            public Builder addAll(ConnectorsAndTasks connectorsAndTasks) {
                addConnectors(connectorsAndTasks.connectors());
                return addTasks(connectorsAndTasks.tasks());
            }

            /** Creates an instance backed by this builder's current sets (no copy is made). */
            public ConnectorsAndTasks build() {
                return new ConnectorsAndTasks(withConnectors, withTasks);
            }
        }
    }

    /**
     * The connectors and tasks attributed to one worker. Instances are created through
     * {@link Builder}; the collections are exposed by reference and grow through
     * {@code assign(...)}.
     */
    public static class WorkerLoad {
        private final String worker;
        private final Collection<String> connectors;
        private final Collection<ConnectorTaskId> tasks;

        private WorkerLoad(String worker, Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
            this.worker = worker;
            this.connectors = connectors;
            this.tasks = tasks;
        }

        /** @return the worker this load belongs to */
        public String worker() {
            return this.worker;
        }

        /** @return the connectors of this worker (live collection) */
        public Collection<String> connectors() {
            return this.connectors;
        }

        /** @return the tasks of this worker (live collection) */
        public Collection<ConnectorTaskId> tasks() {
            return this.tasks;
        }

        /** @return number of connectors */
        public int connectorsSize() {
            return this.connectors.size();
        }

        /** @return number of tasks */
        public int tasksSize() {
            return this.tasks.size();
        }

        /** Adds a connector to this worker's load. */
        public void assign(String connector) {
            this.connectors.add(connector);
        }

        /** Adds a task to this worker's load. */
        public void assign(ConnectorTaskId task) {
            this.tasks.add(task);
        }

        /** @return number of connectors plus number of tasks */
        public int size() {
            return this.connectors.size() + this.tasks.size();
        }

        /** @return {@code true} when there are neither connectors nor tasks */
        public boolean isEmpty() {
            return this.connectors.isEmpty() && this.tasks.isEmpty();
        }

        /**
         * Orders loads by connector count, then by worker name with a {@code null} name first.
         *
         * <p>Built from the standard comparator combinators so that null names are handled
         * symmetrically on both sides; the earlier hand-written version threw a
         * {@code NullPointerException} when only the right-hand name was {@code null}.
         *
         * @return comparator on connector count, ties broken by worker name
         */
        public static Comparator<WorkerLoad> connectorComparator() {
            return Comparator.<WorkerLoad>comparingInt(WorkerLoad::connectorsSize)
                    .thenComparing(WorkerLoad::worker, Comparator.nullsFirst(Comparator.<String>naturalOrder()));
        }

        /**
         * Orders loads by task count, then by worker name with a {@code null} name first.
         *
         * @return comparator on task count, ties broken by worker name
         */
        public static Comparator<WorkerLoad> taskComparator() {
            return Comparator.<WorkerLoad>comparingInt(WorkerLoad::tasksSize)
                    .thenComparing(WorkerLoad::worker, Comparator.nullsFirst(Comparator.<String>naturalOrder()));
        }

        @Override
        public String toString() {
            return "{ worker=" + this.worker + ", connectorIds=" + this.connectors + ", taskIds=" + this.tasks + "}";
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof WorkerLoad)) {
                return false;
            }
            WorkerLoad that = (WorkerLoad) o;
            // Objects.equals keeps equals as null-tolerant as hashCode (Objects.hash) already is.
            return Objects.equals(this.worker, that.worker)
                    && this.connectors.equals(that.connectors)
                    && this.tasks.equals(that.tasks);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.worker, this.connectors, this.tasks);
        }

        /** Builds a {@link WorkerLoad} for a non-null worker name. */
        public static class Builder {
            private final String withWorker;
            private Collection<String> withConnectors;
            private Collection<ConnectorTaskId> withTasks;

            /**
             * @param worker worker name
             * @throws NullPointerException if {@code worker} is {@code null}
             */
            public Builder(String worker) {
                this.withWorker = Objects.requireNonNull(worker, "worker cannot be null");
            }

            /**
             * Uses copies of the given collections, so later {@code assign(...)} calls do not
             * touch the caller's collections.
             *
             * @throws NullPointerException if either argument is {@code null}
             */
            public Builder withCopies(Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
                this.withConnectors = new ArrayList<String>(Objects.requireNonNull(connectors, "connectors may be empty but not null"));
                this.withTasks = new ArrayList<ConnectorTaskId>(Objects.requireNonNull(tasks, "tasks may be empty but not null"));
                return this;
            }

            /**
             * Uses the given collections directly, so later {@code assign(...)} calls modify them.
             *
             * @throws NullPointerException if either argument is {@code null}
             */
            public Builder with(Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
                this.withConnectors = Objects.requireNonNull(connectors, "connectors may be empty but not null");
                this.withTasks = Objects.requireNonNull(tasks, "tasks may be empty but not null");
                return this;
            }

            /** Creates the load; collections never supplied default to new empty lists. */
            public WorkerLoad build() {
                return new WorkerLoad(this.withWorker, this.withConnectors != null ? this.withConnectors : new ArrayList<String>(), this.withTasks != null ? this.withTasks : new ArrayList<ConnectorTaskId>());
            }
        }
    }
}

