/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.distributed;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.HerderConnectorContext;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
import org.apache.kafka.connect.runtime.distributed.WorkerGroupMember;
import org.apache.kafka.connect.runtime.distributed.WorkerRebalanceListener;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.storage.KafkaConfigStorage;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Herder} implementation for distributed mode.
 *
 * <p>The herder's work is driven by one thread started in {@link #start()}, which runs
 * {@link #run()} and repeatedly calls {@link #tick()}. Most {@link Herder} operations do not do
 * their work on the calling thread; they enqueue a {@link HerderRequest} that is executed by
 * {@link #tick()}.
 */
public class DistributedHerder
implements Herder,
Runnable {
    private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class);
    // Backoff, in milliseconds, between attempts to reconfigure a connector's tasks.
    private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250L;
    // Starts and stops the connectors and tasks handled by this herder.
    private final Worker worker;
    // Source of configuration snapshots; also written to when connector/task configs are stored.
    private final KafkaConfigStorage configStorage;
    // Most recent snapshot taken from configStorage.
    private ClusterConfigState configState;
    // Clock used to timestamp and schedule requests.
    private final Time time;
    // Read-to-end timeout used when this worker is the leader and the assignment failed.
    private final int workerSyncTimeoutMs;
    // Time the leader sleeps after failing to reach the end of the config log in time.
    private final int workerUnsyncBackoffMs;
    // Group membership handle: polled each tick, woken up when new work arrives.
    private final WorkerGroupMember member;
    // Set by stop(); makes the run() loop exit.
    private final AtomicBoolean stopping;
    // Counted down when run() exits; stop() waits on it.
    private final CountDownLatch stopLatch = new CountDownLatch(1);
    // False from the moment a new assignment arrives until handleRebalanceCompleted() accepts it.
    private boolean rebalanceResolved;
    // Latest assignment delivered by the rebalance listener; null until the first one arrives.
    private ConnectProtocol.Assignment assignment;
    // Pending requests, ordered by the time at which they become due.
    private final Queue<HerderRequest> requests = new PriorityQueue<HerderRequest>();
    // Names of connectors whose configuration changed since tick() last processed updates.
    private Set<String> connectorConfigUpdates = new HashSet<String>();
    // Set when task configurations changed; makes the next tick() request a rejoin.
    private boolean needsReconfigRebalance;
    // Single-threaded executor used to forward task reconfiguration requests to the leader.
    private final ExecutorService forwardRequestExecutor;

    /**
     * Creates a herder that builds its own config storage and group member.
     *
     * @param config  distributed worker configuration
     * @param worker  worker that runs the connectors and tasks handled by this herder
     * @param restUrl REST URL handed to the group member created for this herder
     */
    public DistributedHerder(DistributedConfig config, Worker worker, String restUrl) {
        this(config, worker, null, null, restUrl, new SystemTime());
    }

    /**
     * Creates a herder, optionally with externally supplied collaborators.
     *
     * @param config        distributed worker configuration
     * @param worker        worker that runs the connectors and tasks handled by this herder
     * @param configStorage config storage to use, or {@code null} to create and configure one
     * @param member        group member to use, or {@code null} to create one
     * @param restUrl       REST URL handed to a newly created group member
     * @param time          clock used for request scheduling
     */
    public DistributedHerder(DistributedConfig config, Worker worker, KafkaConfigStorage configStorage, WorkerGroupMember member, String restUrl, Time time) {
        this.worker = worker;
        if (configStorage == null) {
            // No storage supplied: create one wired to this herder's update callbacks and configure it.
            KafkaConfigStorage ownStorage = new KafkaConfigStorage(worker.getInternalValueConverter(), connectorConfigCallback(), taskConfigCallback());
            ownStorage.configure(config.originals());
            this.configStorage = ownStorage;
        } else {
            this.configStorage = configStorage;
        }
        this.configState = ClusterConfigState.EMPTY;
        this.time = time;
        this.workerSyncTimeoutMs = config.getInt("worker.sync.timeout.ms");
        this.workerUnsyncBackoffMs = config.getInt("worker.unsync.backoff.ms");
        if (member == null) {
            this.member = new WorkerGroupMember(config, restUrl, this.configStorage, rebalanceListener());
        } else {
            this.member = member;
        }
        this.stopping = new AtomicBoolean(false);
        this.rebalanceResolved = true;
        this.needsReconfigRebalance = false;
        this.forwardRequestExecutor = Executors.newSingleThreadExecutor();
    }

    /**
     * Starts the herder's work thread, named "DistributedHerder", which executes {@link #run()}.
     */
    @Override
    public void start() {
        new Thread(this, "DistributedHerder").start();
    }

    /**
     * Body of the herder work thread: starts the config storage, calls {@link #tick()} until
     * {@link #stop()} sets the stopping flag, then shuts everything down via {@link #halt()}.
     *
     * <p>Any uncaught {@link Throwable} is logged and terminates the JVM with exit status 1.
     */
    @Override
    public void run() {
        try {
            log.info("Herder starting");
            this.configStorage.start();
            log.info("Herder started");
            while (!this.stopping.get()) {
                this.tick();
            }
            this.halt();
            log.info("Herder stopped");
        }
        catch (Throwable t) {
            log.error("Uncaught exception in herder work thread, exiting: ", t);
            // Release anyone blocked in stop() before exiting the JVM.
            this.stopLatch.countDown();
            System.exit(1);
        }
        finally {
            // Normal exit path: let stop() return.
            this.stopLatch.countDown();
        }
    }

    /**
     * Runs one iteration of the herder loop.
     *
     * <ol>
     *   <li>Ensures group membership is active and that the latest rebalance has been resolved;
     *       returns early if it has not, or if the member was woken up.</li>
     *   <li>Executes every queued request that is already due and notes how long to wait for
     *       the next one.</li>
     *   <li>If connector or task configs changed, takes a new config snapshot. A change in the
     *       set of connectors or in task configs requests a rejoin and returns; otherwise
     *       locally assigned connectors whose config changed are stopped and, if they still
     *       exist, restarted.</li>
     *   <li>Polls the group member until the next request is due, then checks again whether a
     *       rebalance completed.</li>
     * </ol>
     */
    public void tick() {
        try {
            this.member.ensureActive();
            if (!this.handleRebalanceCompleted()) {
                return;
            }
        } catch (WakeupException e) {
            // Woken up while joining; the caller's loop re-checks the stopping flag and retries.
            return;
        }

        long now = this.time.milliseconds();
        long nextRequestTimeoutMs = Long.MAX_VALUE;
        while (true) {
            HerderRequest next;
            // Only the queue inspection is done under the lock; the request itself runs outside it.
            synchronized (this) {
                next = this.requests.peek();
                if (next == null) {
                    break;
                }
                if (now < next.at) {
                    nextRequestTimeoutMs = next.at - now;
                    break;
                }
                this.requests.poll();
            }
            try {
                next.action().call();
                next.callback().onCompletion(null, null);
            } catch (Throwable t) {
                next.callback().onCompletion(t, null);
            }
        }

        Set<String> connectorConfigUpdatesCopy = null;
        synchronized (this) {
            if (this.needsReconfigRebalance || !this.connectorConfigUpdates.isEmpty()) {
                ClusterConfigState newConfigState = this.configStorage.snapshot();
                // Connectors were added or removed, so work has to be redistributed.
                if (!newConfigState.connectors().equals(this.configState.connectors())) {
                    this.needsReconfigRebalance = true;
                }
                this.configState = newConfigState;
                if (this.needsReconfigRebalance) {
                    this.member.requestRejoin();
                    this.connectorConfigUpdates.clear();
                    this.needsReconfigRebalance = false;
                    return;
                }
                if (!this.connectorConfigUpdates.isEmpty()) {
                    // Hand the pending updates to this tick and start collecting into a fresh set.
                    connectorConfigUpdatesCopy = this.connectorConfigUpdates;
                    this.connectorConfigUpdates = new HashSet<String>();
                }
            }
        }

        if (connectorConfigUpdatesCopy != null) {
            // Declared as Set: Collections.emptySet() is not a HashSet.
            Set<String> localConnectors = this.assignment == null
                    ? Collections.<String>emptySet()
                    : new HashSet<String>(this.assignment.connectors());
            for (String connectorName : connectorConfigUpdatesCopy) {
                if (!localConnectors.contains(connectorName)) {
                    continue;
                }
                boolean remains = this.configState.connectors().contains(connectorName);
                log.info("Handling connector-only config update by {} connector {}", remains ? "restarting" : "stopping", connectorName);
                this.worker.stopConnector(connectorName);
                if (remains) {
                    this.startConnector(connectorName);
                }
            }
        }

        try {
            this.member.poll(nextRequestTimeoutMs);
            // Resolve a rebalance that finished during the poll; the result does not matter
            // here because the tick ends either way.
            this.handleRebalanceCompleted();
        } catch (WakeupException e) {
            // Expected: new work arrived or the herder is stopping.
        }
    }

    /**
     * Shuts down everything owned by this herder: stops all connectors and tasks still running
     * on the worker, stops the group member, fails every pending request with a
     * {@link ConnectException}, and stops the config storage. Failures while stopping an
     * individual connector or task are logged and do not abort the shutdown.
     */
    public void halt() {
        synchronized (this) {
            log.info("Stopping connectors and tasks that are still assigned to this worker.");
            // Iterate over copies so the worker can update its own collections while stopping.
            Set<String> runningConnectors = new HashSet<String>(worker.connectorNames());
            for (String connectorName : runningConnectors) {
                try {
                    worker.stopConnector(connectorName);
                } catch (Throwable failure) {
                    log.error("Failed to shut down connector " + connectorName, failure);
                }
            }
            Set<ConnectorTaskId> runningTasks = new HashSet<ConnectorTaskId>(worker.taskIds());
            for (ConnectorTaskId id : runningTasks) {
                try {
                    worker.stopTask(id);
                } catch (Throwable failure) {
                    log.error("Failed to shut down task " + id, failure);
                }
            }
            member.stop();
            // Drain the queue, telling every waiting caller that the worker is going away.
            HerderRequest pending;
            while ((pending = requests.poll()) != null) {
                pending.callback().onCompletion(new ConnectException("Worker is shutting down"), null);
            }
            if (configStorage != null) {
                configStorage.stop();
            }
        }
    }

    /**
     * Stops the herder and blocks until its work thread has exited and the request-forwarding
     * executor has terminated (waiting at most 10 seconds for the latter before forcing it).
     *
     * <p>The wait for the work thread is not abandoned when the calling thread is interrupted;
     * instead the interruption is remembered and the thread's interrupt status is restored
     * before this method returns, so callers can still observe it.
     */
    @Override
    public void stop() {
        log.info("Herder stopping");
        this.stopping.set(true);
        this.member.wakeup();

        boolean interrupted = false;
        try {
            while (this.stopLatch.getCount() > 0L) {
                try {
                    this.stopLatch.await();
                } catch (InterruptedException e) {
                    // Keep waiting for the work thread; re-assert the interrupt on the way out.
                    interrupted = true;
                }
            }
            this.forwardRequestExecutor.shutdown();
            try {
                if (!this.forwardRequestExecutor.awaitTermination(10000L, TimeUnit.MILLISECONDS)) {
                    this.forwardRequestExecutor.shutdownNow();
                }
            } catch (InterruptedException e) {
                // We can no longer wait for a graceful shutdown, so cancel outstanding work.
                this.forwardRequestExecutor.shutdownNow();
                interrupted = true;
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        log.info("Herder stopped");
    }

    /**
     * Queues a request that reports the names of all connectors in the current config state.
     *
     * @param callback receives the connector names, or an error if the config is not in sync
     *                 or the request fails
     */
    @Override
    public synchronized void connectors(final Callback<Collection<String>> callback) {
        log.trace("Submitting connector listing request");
        Callable<Void> listing = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                // checkConfigSynced reports the error itself when the config is stale.
                if (checkConfigSynced(callback)) {
                    callback.onCompletion(null, configState.connectors());
                }
                return null;
            }
        };
        addRequest(listing, forwardErrorCallback(callback));
    }

    /**
     * Queues a request that looks up a connector's name, configuration and tasks.
     *
     * @param connName name of the connector
     * @param callback receives the {@link ConnectorInfo}, a {@link NotFoundException} if the
     *                 connector is unknown, or an error if the config is not in sync
     */
    @Override
    public synchronized void connectorInfo(final String connName, final Callback<ConnectorInfo> callback) {
        log.trace("Submitting connector info request {}", connName);
        Callable<Void> lookup = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                if (!checkConfigSynced(callback)) {
                    return null;
                }
                if (configState.connectors().contains(connName)) {
                    ConnectorInfo info = new ConnectorInfo(connName, configState.connectorConfig(connName), configState.tasks(connName));
                    callback.onCompletion(null, info);
                    return null;
                }
                callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
                return null;
            }
        };
        addRequest(lookup, forwardErrorCallback(callback));
    }

    /**
     * Fetches a connector's configuration by delegating to
     * {@link #connectorInfo(String, Callback)} and extracting the config from the result.
     *
     * @param connName name of the connector
     * @param callback receives the connector's config map, or the error reported by the lookup
     */
    @Override
    public void connectorConfig(String connName, final Callback<Map<String, String>> callback) {
        log.trace("Submitting connector config read request {}", connName);
        Callback<ConnectorInfo> extractConfig = new Callback<ConnectorInfo>() {
            @Override
            public void onCompletion(Throwable error, ConnectorInfo result) {
                if (error == null) {
                    callback.onCompletion(null, result.config());
                    return;
                }
                callback.onCompletion(error, null);
            }
        };
        connectorInfo(connName, extractConfig);
    }

    /**
     * Queues a request that creates, replaces or deletes a connector configuration.
     *
     * <p>The request is only honoured on the leader; other workers answer with a
     * {@link NotLeaderException}. A {@code null} config deletes the connector.
     *
     * @param connName     name of the connector
     * @param config       new configuration, or {@code null} to delete; a missing "name" entry
     *                     is filled in with {@code connName} on a copy of the map
     * @param allowReplace whether an existing connector may be overwritten
     * @param callback     receives the outcome: whether the connector was created and its info
     *                     ({@code null} info on deletion), or the error
     */
    @Override
    public void putConnectorConfig(final String connName, Map<String, String> config, final boolean allowReplace, final Callback<Herder.Created<ConnectorInfo>> callback) {
        final Map<String, String> connConfig;
        if (config == null || config.containsKey("name")) {
            connConfig = config;
        } else {
            // Work on a copy so the caller's map is not modified.
            connConfig = new HashMap<String, String>(config);
            connConfig.put("name", connName);
        }
        log.trace("Submitting connector config write request {}", connName);
        Callable<Void> write = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                log.trace("Handling connector config request {}", connName);
                if (!isLeader()) {
                    callback.onCompletion(new NotLeaderException("Only the leader can set connector configs.", leaderUrl()), null);
                    return null;
                }
                boolean exists = configState.connectors().contains(connName);
                if (exists && !allowReplace) {
                    callback.onCompletion(new AlreadyExistsException("Connector " + connName + " already exists"), null);
                    return null;
                }
                boolean deleting = connConfig == null;
                if (deleting && !exists) {
                    callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
                    return null;
                }
                log.trace("Submitting connector config {} {} {}", connName, allowReplace, configState.connectors());
                configStorage.putConnectorConfig(connName, connConfig);
                boolean created = !exists && !deleting;
                ConnectorInfo info = null;
                if (!deleting) {
                    info = new ConnectorInfo(connName, connConfig, configState.tasks(connName));
                }
                callback.onCompletion(null, new Herder.Created<ConnectorInfo>(created, info));
                return null;
            }
        };
        addRequest(write, forwardErrorCallback(callback));
    }

    /**
     * Queues a request to recompute the task configurations of a connector.
     *
     * <p>The reconfiguration retries on its own; the completion callback here only reports
     * failures that escape that retry logic. It is invoked on success as well (with a
     * {@code null} error), so it must log only when an error is actually present.
     *
     * @param connName name of the connector whose tasks should be reconfigured
     */
    @Override
    public synchronized void requestTaskReconfiguration(final String connName) {
        log.trace("Submitting connector task reconfiguration request {}", (Object)connName);
        this.addRequest(new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                DistributedHerder.this.reconfigureConnectorTasksWithRetry(connName);
                return null;
            }
        }, new Callback<Void>(){

            @Override
            public void onCompletion(Throwable error, Void result) {
                if (error == null) {
                    // Successful completion: nothing to report.
                    return;
                }
                log.error("Unexpected error during task reconfiguration: ", error);
                log.error("Task reconfiguration for {} failed unexpectedly, this connector will not be properly reconfigured unless manually triggered.", (Object)connName);
            }
        });
    }

    /**
     * Queues a request that reports the configuration of every task of a connector.
     *
     * @param connName name of the connector
     * @param callback receives one {@link TaskInfo} per task (index 0 up to the task count), a
     *                 {@link NotFoundException} if the connector is unknown, or an error if the
     *                 config is not in sync
     */
    @Override
    public synchronized void taskConfigs(final String connName, final Callback<List<TaskInfo>> callback) {
        log.trace("Submitting get task configuration request {}", connName);
        Callable<Void> lookup = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                if (!checkConfigSynced(callback)) {
                    return null;
                }
                if (!configState.connectors().contains(connName)) {
                    callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
                    return null;
                }
                List<TaskInfo> infos = new ArrayList<TaskInfo>();
                int taskIndex = 0;
                while (taskIndex < configState.taskCount(connName)) {
                    ConnectorTaskId taskId = new ConnectorTaskId(connName, taskIndex);
                    infos.add(new TaskInfo(taskId, configState.taskConfig(taskId)));
                    taskIndex++;
                }
                callback.onCompletion(null, infos);
                return null;
            }
        };
        addRequest(lookup, forwardErrorCallback(callback));
    }

    /**
     * Queues a request that stores new task configurations for a connector.
     *
     * @param connName name of the connector
     * @param configs  task configurations; the list position becomes the task index
     * @param callback completed with no error on success, a {@link NotLeaderException} when
     *                 this worker is not the leader, or a {@link NotFoundException} when the
     *                 connector is unknown
     */
    @Override
    public synchronized void putTaskConfigs(final String connName, final List<Map<String, String>> configs, final Callback<Void> callback) {
        log.trace("Submitting put task configuration request {}", connName);
        Callable<Void> write = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                if (!isLeader()) {
                    callback.onCompletion(new NotLeaderException("Only the leader may write task configurations.", leaderUrl()), null);
                    return null;
                }
                if (!configState.connectors().contains(connName)) {
                    callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
                    return null;
                }
                configStorage.putTaskConfigs(taskConfigListAsMap(connName, configs));
                callback.onCompletion(null, null);
                return null;
            }
        };
        addRequest(write, forwardErrorCallback(callback));
    }

    /**
     * @return {@code true} if an assignment has been received and it names this member as leader
     */
    private boolean isLeader() {
        if (assignment == null) {
            return false;
        }
        return member.memberId().equals(assignment.leader());
    }

    /**
     * @return the leader URL from the current assignment, or {@code null} if no assignment has
     *         been received yet
     */
    private String leaderUrl() {
        return assignment == null ? null : assignment.leaderUrl();
    }

    /**
     * Resolves a newly received assignment, if there is one.
     *
     * <p>Depending on whether the assignment failed and whether the local config snapshot is
     * behind the assignment's offset, this reads the config log to its end, requests another
     * rejoin, or starts the assigned connectors and tasks.
     *
     * @return {@code true} if no rebalance is pending or the assignment was accepted and work
     *         was started; {@code false} if a rejoin was requested instead
     */
    private boolean handleRebalanceCompleted() {
        // Nothing to do unless the rebalance listener delivered a new assignment.
        if (this.rebalanceResolved) {
            return true;
        }
        boolean needsReadToEnd = false;
        long syncConfigsTimeoutMs = Long.MAX_VALUE;
        boolean needsRejoin = false;
        if (this.assignment.failed()) {
            // A failed assignment always leads to a rejoin; the branches only decide whether
            // to catch up on the config log first.
            needsRejoin = true;
            if (this.isLeader()) {
                log.warn("Join group completed, but assignment failed and we are the leader. Reading to end of config and retrying.");
                needsReadToEnd = true;
                // Only the leader bounds the read with the sync timeout.
                syncConfigsTimeoutMs = this.workerSyncTimeoutMs;
            } else if (this.configState.offset() < this.assignment.offset()) {
                log.warn("Join group completed, but assignment failed and we lagging. Reading to end of config and retrying.");
                needsReadToEnd = true;
            } else {
                log.warn("Join group completed, but assignment failed. We were up to date, so just retrying.");
            }
        } else if (this.configState.offset() < this.assignment.offset()) {
            log.warn("Catching up to assignment's config offset.");
            needsReadToEnd = true;
        }
        // Failing to reach the end of the config log in time also forces a rejoin.
        if (needsReadToEnd && !this.readConfigToEnd(syncConfigsTimeoutMs)) {
            needsRejoin = true;
        }
        if (needsRejoin) {
            this.member.requestRejoin();
            return false;
        }
        // Work is only started when the snapshot is at exactly the offset the assignment used.
        if (this.configState.offset() != this.assignment.offset()) {
            log.info("Current config state offset {} does not match group assignment {}. Forcing rebalance.", (Object)this.configState.offset(), (Object)this.assignment.offset());
            this.member.requestRejoin();
            return false;
        }
        this.startWork();
        this.rebalanceResolved = true;
        return true;
    }

    /**
     * Reads the config log to its end and refreshes the local config snapshot.
     *
     * @param timeoutMs maximum time to wait for the read to finish, in milliseconds
     * @return {@code true} if the end was reached and the snapshot updated; {@code false} on
     *         timeout (the leader additionally sleeps for the unsync backoff before returning)
     * @throws ConnectException if the read is interrupted or fails
     */
    private boolean readConfigToEnd(long timeoutMs) {
        log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset());
        try {
            configStorage.readToEnd().get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timeout) {
            log.warn("Didn't reach end of config log quickly enough", timeout);
            if (isLeader()) {
                backoff(workerUnsyncBackoffMs);
            }
            return false;
        } catch (InterruptedException | ExecutionException failure) {
            throw new ConnectException("Error trying to catch up after assignment", failure);
        }
        configState = configStorage.snapshot();
        log.info("Finished reading to end of log and updated config snapshot, new config log offset: {}", configState.offset());
        return true;
    }

    /**
     * Pauses the calling thread via {@link Utils#sleep(long)}.
     *
     * @param ms time to sleep, in milliseconds
     */
    private void backoff(long ms) {
        Utils.sleep(ms);
    }

    /**
     * Starts every connector and task in the current assignment. A connector or task whose
     * configuration is rejected with a {@link ConfigException} is logged and skipped; the
     * remaining ones are still started.
     */
    private void startWork() {
        log.info("Starting connectors and tasks using config offset {}", assignment.offset());
        for (String name : assignment.connectors()) {
            try {
                startConnector(name);
            } catch (ConfigException invalid) {
                log.error("Couldn't instantiate connector " + name + " because it has an invalid connector configuration. This connector will not execute until reconfigured.", invalid);
            }
        }
        for (ConnectorTaskId id : assignment.tasks()) {
            try {
                log.info("Starting task {}", id);
                TaskConfig taskConfig = new TaskConfig(configState.taskConfig(id));
                worker.addTask(id, taskConfig);
            } catch (ConfigException invalid) {
                log.error("Couldn't instantiate task " + id + " because it has an invalid task configuration. This task will not execute until reconfigured.", invalid);
            }
        }
        log.info("Finished starting connectors and tasks");
    }

    /**
     * Starts a connector on the worker using its configuration from the current snapshot, then
     * triggers a (retrying) reconfiguration of its tasks.
     *
     * @param connectorName name of the connector to start
     */
    private void startConnector(String connectorName) {
        log.info("Starting connector {}", connectorName);
        ConnectorConfig connConfig = new ConnectorConfig(configState.connectorConfig(connectorName));
        // The name stored in the config is used for the context and the reconfiguration.
        String connName = connConfig.getString("name");
        worker.addConnector(connConfig, new HerderConnectorContext(this, connName));
        reconfigureConnectorTasksWithRetry(connName);
    }

    /**
     * Reconfigures a connector's tasks and, if that fails, schedules another attempt after
     * {@link #RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS} milliseconds. Retries continue until an
     * attempt succeeds.
     *
     * @param connName name of the connector whose tasks should be reconfigured
     */
    private void reconfigureConnectorTasksWithRetry(final String connName) {
        this.reconfigureConnector(connName, new Callback<Void>(){

            @Override
            public void onCompletion(Throwable error, Void result) {
                if (error != null) {
                    log.error("Failed to reconfigure connector's tasks, retrying after backoff:", error);
                    DistributedHerder.this.addRequest(RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS, new Callable<Void>(){

                        @Override
                        public Void call() throws Exception {
                            DistributedHerder.this.reconfigureConnectorTasksWithRetry(connName);
                            return null;
                        }
                    }, new Callback<Void>(){

                        @Override
                        public void onCompletion(Throwable error, Void result) {
                            // This callback also runs when the queued retry completes normally
                            // (error == null); only a real failure is worth an error log.
                            if (error == null) {
                                return;
                            }
                            log.error("Unexpected error during connector task reconfiguration: ", error);
                            log.error("Task reconfiguration for {} failed unexpectedly, this connector will not be properly reconfigured unless manually triggered.", (Object)connName);
                        }
                    });
                }
            }
        });
    }

    /**
     * Computes the task configurations a connector currently wants and, if they differ from
     * the stored ones, writes them: directly when this worker is the leader, otherwise by
     * POSTing them to the leader's REST endpoint on the forwarding executor.
     *
     * <p>The callback is completed with {@code null} after a successful write or forward and
     * with the error on any failure. It is not invoked when nothing changed.
     *
     * @param connName name of the connector
     * @param cb       notified about the outcome as described above
     */
    private void reconfigureConnector(final String connName, final Callback<Void> cb) {
        try {
            Map<String, String> configs = this.configState.connectorConfig(connName);
            ConnectorConfig connConfig = new ConnectorConfig(configs);
            // Only sink connectors are given the topic list.
            List<String> sinkTopics = null;
            if (SinkConnector.class.isAssignableFrom(connConfig.getClass("connector.class"))) {
                sinkTopics = connConfig.getList("topics");
            }
            final List<Map<String, String>> taskProps = this.worker.connectorTaskConfigs(connName, connConfig.getInt("tasks.max"), sinkTopics);

            boolean changed = false;
            int currentNumTasks = this.configState.taskCount(connName);
            if (taskProps.size() != currentNumTasks) {
                log.debug("Change in connector task count from {} to {}, writing updated task configurations", (Object)currentNumTasks, (Object)taskProps.size());
                changed = true;
            } else {
                // Same number of tasks: compare each config with the stored one at that index.
                int index = 0;
                for (Map<String, String> taskConfig : taskProps) {
                    if (!taskConfig.equals(this.configState.taskConfig(new ConnectorTaskId(connName, index)))) {
                        log.debug("Change in task configurations, writing updated task configurations");
                        changed = true;
                        break;
                    }
                    ++index;
                }
            }

            if (changed) {
                if (this.isLeader()) {
                    this.configStorage.putTaskConfigs(DistributedHerder.taskConfigListAsMap(connName, taskProps));
                    cb.onCompletion(null, null);
                } else {
                    this.forwardRequestExecutor.submit(new Runnable(){

                        @Override
                        public void run() {
                            try {
                                String reconfigUrl = RestServer.urlJoin(DistributedHerder.this.leaderUrl(), "/connectors/" + connName + "/tasks");
                                RestServer.httpRequest(reconfigUrl, "POST", taskProps, null);
                                cb.onCompletion(null, null);
                            }
                            catch (Throwable t) {
                                // Catch everything, as the enclosing method does: an exception
                                // escaping this task would be swallowed by the executor's Future
                                // and the callback would never learn that the forward failed.
                                log.error("Request to leader to reconfigure connector tasks failed", t);
                                cb.onCompletion(t, null);
                            }
                        }
                    });
                }
            }
        }
        catch (Throwable t) {
            cb.onCompletion(t, null);
        }
    }

    /**
     * Checks that the local config snapshot is at the offset of the current assignment and
     * reports an error through the callback when it is not.
     *
     * @param callback completed with a {@link ConnectException} (on the leader) or a
     *                 {@link NotLeaderException} (elsewhere) if the config is out of sync
     * @return {@code true} if the config is in sync; {@code false} if an error was reported
     */
    private boolean checkConfigSynced(Callback<?> callback) {
        boolean inSync = assignment != null && configState.offset() == assignment.offset();
        if (inSync) {
            return true;
        }
        if (isLeader()) {
            callback.onCompletion(new ConnectException("Cannot get config data because this is the leader node, but it does not have the most up to date configs"), null);
        } else {
            callback.onCompletion(new NotLeaderException("Cannot get config data because config is not in sync and this is not the leader", leaderUrl()), null);
        }
        return false;
    }

    /**
     * Queues a request that is due immediately.
     *
     * @param action   work to run on the herder thread
     * @param callback completed with {@code null} after the action returns, or with the
     *                 exception it threw
     */
    private void addRequest(Callable<Void> action, Callback<Void> callback) {
        this.addRequest(0L, action, callback);
    }

    /**
     * Queues a request that becomes due {@code delayMs} milliseconds from now and wakes up the
     * group member if the new request is the next one to run.
     *
     * <p>This method is reached from threads that do not hold the herder's monitor (for
     * example REST calls to {@code putConnectorConfig} and retries scheduled from the
     * forwarding executor), while the herder thread inspects the same, non-thread-safe
     * {@link PriorityQueue} under {@code synchronized (this)}. The queue update is therefore
     * guarded by the same monitor; it is reentrant, so synchronized callers are unaffected.
     *
     * @param delayMs  delay before the request becomes due, in milliseconds
     * @param action   work to run on the herder thread
     * @param callback completed with {@code null} after the action returns, or with the
     *                 exception it threw
     */
    private void addRequest(long delayMs, Callable<Void> action, Callback<Void> callback) {
        HerderRequest req = new HerderRequest(this.time.milliseconds() + delayMs, action, callback);
        boolean isNextToRun;
        synchronized (this) {
            this.requests.add(req);
            isNextToRun = this.requests.peek() == req;
        }
        if (isNextToRun) {
            // The herder thread may be blocked in poll() with a timeout computed for a later
            // request; wake it so it picks up the new head of the queue.
            this.member.wakeup();
        }
    }

    /**
     * Wraps a callback so that only failures are passed on: an error is forwarded with a
     * {@code null} result, while a successful completion is ignored.
     *
     * @param callback callback that should receive errors
     * @return a {@code Callback<Void>} forwarding errors to {@code callback}
     */
    private static final Callback<Void> forwardErrorCallback(final Callback<?> callback) {
        return new Callback<Void>() {
            @Override
            public void onCompletion(Throwable error, Void result) {
                if (error == null) {
                    return;
                }
                callback.onCompletion(error, null);
            }
        };
    }

    /**
     * @return the callback handed to the config storage for connector config changes; it
     *         records the connector name as updated and wakes up the group member
     */
    private Callback<String> connectorConfigCallback() {
        return new Callback<String>() {
            @Override
            public void onCompletion(Throwable error, String connector) {
                log.info("Connector {} config updated", connector);
                // Guard the update set with the herder's monitor, as tick() does when reading it.
                synchronized (DistributedHerder.this) {
                    connectorConfigUpdates.add(connector);
                }
                member.wakeup();
            }
        };
    }

    /**
     * @return the callback handed to the config storage for task config changes; it flags that
     *         a rebalance is needed and wakes up the group member
     */
    private Callback<List<ConnectorTaskId>> taskConfigCallback() {
        return new Callback<List<ConnectorTaskId>>() {
            @Override
            public void onCompletion(Throwable error, List<ConnectorTaskId> tasks) {
                log.info("Tasks {} configs updated", tasks);
                // Guard the flag with the herder's monitor, as tick() does when reading it.
                synchronized (DistributedHerder.this) {
                    needsReconfigRebalance = true;
                }
                member.wakeup();
            }
        };
    }

    /**
     * @return the listener registered with the group member. On assignment it stores the new
     *         assignment and marks the rebalance as unresolved; on revocation it stops the
     *         revoked connectors and tasks, unless the previous rebalance was never resolved
     */
    private WorkerRebalanceListener rebalanceListener() {
        return new WorkerRebalanceListener() {
            @Override
            public void onAssigned(ConnectProtocol.Assignment newAssignment) {
                log.info("Joined group and got assignment: {}", newAssignment);
                synchronized (DistributedHerder.this) {
                    assignment = newAssignment;
                    rebalanceResolved = false;
                }
                member.wakeup();
            }

            @Override
            public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
                log.info("Rebalance started");
                if (!rebalanceResolved) {
                    // Work was never started for the last assignment, so nothing needs stopping.
                    log.info("Wasn't unable to resume work after last rebalance, can skip stopping connectors and tasks");
                    return;
                }
                for (String revokedConnector : connectors) {
                    worker.stopConnector(revokedConnector);
                }
                for (ConnectorTaskId revokedTask : tasks) {
                    worker.stopTask(revokedTask);
                }
                log.info("Finished stopping tasks in preparation for rebalance");
            }
        };
    }

    /**
     * Converts an ordered list of task configurations into a map keyed by task id, using each
     * config's position in the list as the task index.
     *
     * @param connName name of the connector the tasks belong to
     * @param configs  task configurations in task-index order
     * @return map from {@link ConnectorTaskId} to the corresponding task configuration
     */
    private static Map<ConnectorTaskId, Map<String, String>> taskConfigListAsMap(String connName, List<Map<String, String>> configs) {
        Map<ConnectorTaskId, Map<String, String>> byTaskId = new HashMap<ConnectorTaskId, Map<String, String>>();
        ListIterator<Map<String, String>> it = configs.listIterator();
        while (it.hasNext()) {
            // nextIndex() must be read before next() advances the iterator.
            int taskIndex = it.nextIndex();
            byTaskId.put(new ConnectorTaskId(connName, taskIndex), it.next());
        }
        return byTaskId;
    }

    private class HerderRequest
    implements Comparable<HerderRequest> {
        private final long at;
        private final Callable<Void> action;
        private final Callback<Void> callback;

        public HerderRequest(long at, Callable<Void> action, Callback<Void> callback) {
            this.at = at;
            this.action = action;
            this.callback = callback;
        }

        public Callable<Void> action() {
            return this.action;
        }

        public Callback<Void> callback() {
            return this.callback;
        }

        @Override
        public int compareTo(HerderRequest o) {
            return Long.compare(this.at, o.at);
        }
    }
}

