/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.distributed;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.ws.rs.core.Response;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.AbstractHerder;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.HerderConnectorContext;
import org.apache.kafka.connect.runtime.HerderRequest;
import org.apache.kafka.connect.runtime.SessionKey;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;
import org.apache.kafka.connect.runtime.distributed.NotAssignedException;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
import org.apache.kafka.connect.runtime.distributed.WorkerGroupMember;
import org.apache.kafka.connect.runtime.distributed.WorkerRebalanceListener;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.SinkUtils;
import org.slf4j.Logger;

public class DistributedHerder
extends AbstractHerder
implements Runnable {
    private static final AtomicInteger CONNECT_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    private final Logger log;
    private static final long FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10L);
    private static final long START_AND_STOP_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(1L);
    private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250L;
    private static final int START_STOP_THREAD_POOL_SIZE = 8;
    private static final short BACKOFF_RETRIES = 5;
    private final AtomicLong requestSeqNum = new AtomicLong();
    private final Time time;
    private final HerderMetrics herderMetrics;
    private final String workerGroupId;
    private final int workerSyncTimeoutMs;
    private final long workerTasksShutdownTimeoutMs;
    private final int workerUnsyncBackoffMs;
    private final int keyRotationIntervalMs;
    private final String requestSignatureAlgorithm;
    private final List<String> keySignatureVerificationAlgorithms;
    private final KeyGenerator keyGenerator;
    private final ExecutorService herderExecutor;
    private final ExecutorService forwardRequestExecutor;
    private final ExecutorService startAndStopExecutor;
    private final WorkerGroupMember member;
    private final AtomicBoolean stopping;
    private final boolean isTopicTrackingEnabled;
    private boolean rebalanceResolved;
    private ExtendedAssignment runningAssignment = ExtendedAssignment.empty();
    private Set<ConnectorTaskId> tasksToRestart = new HashSet<ConnectorTaskId>();
    private ExtendedAssignment assignment;
    private boolean canReadConfigs;
    protected ClusterConfigState configState;
    final NavigableSet<DistributedHerderRequest> requests = new ConcurrentSkipListSet<DistributedHerderRequest>();
    private Set<String> connectorConfigUpdates = new HashSet<String>();
    private Set<ConnectorTaskId> taskConfigUpdates = new HashSet<ConnectorTaskId>();
    private Set<String> connectorTargetStateChanges = new HashSet<String>();
    private boolean needsReconfigRebalance;
    private volatile int generation;
    private volatile long scheduledRebalance;
    private volatile SecretKey sessionKey;
    private volatile long keyExpiration;
    private short currentProtocolVersion;
    private short backoffRetries;
    private final DistributedConfig config;

    public DistributedHerder(DistributedConfig config, Time time, Worker worker, String kafkaClusterId, StatusBackingStore statusBackingStore, ConfigBackingStore configBackingStore, String restUrl, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
        this(config, worker, worker.workerId(), kafkaClusterId, statusBackingStore, configBackingStore, null, restUrl, worker.metrics(), time, connectorClientConfigOverridePolicy);
        configBackingStore.setUpdateListener(new ConfigUpdateListener());
    }

    DistributedHerder(DistributedConfig config, Worker worker, String workerId, String kafkaClusterId, StatusBackingStore statusBackingStore, ConfigBackingStore configBackingStore, WorkerGroupMember member, String restUrl, ConnectMetrics metrics, Time time, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
        super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore, connectorClientConfigOverridePolicy);
        this.time = time;
        this.herderMetrics = new HerderMetrics(metrics);
        this.workerGroupId = config.getString("group.id");
        this.workerSyncTimeoutMs = config.getInt("worker.sync.timeout.ms");
        this.workerTasksShutdownTimeoutMs = config.getLong("task.shutdown.graceful.timeout.ms");
        this.workerUnsyncBackoffMs = config.getInt("worker.unsync.backoff.ms");
        this.requestSignatureAlgorithm = config.getString("inter.worker.signature.algorithm");
        this.keyRotationIntervalMs = config.getInt("inter.worker.key.ttl.ms");
        this.keySignatureVerificationAlgorithms = config.getList("inter.worker.verification.algorithms");
        this.keyGenerator = config.getInternalRequestKeyGenerator();
        this.isTopicTrackingEnabled = config.getBoolean("topic.tracking.enable");
        String clientIdConfig = config.getString("client.id");
        String clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
        LogContext logContext = new LogContext("[Worker clientId=" + clientId + ", groupId=" + this.workerGroupId + "] ");
        this.log = logContext.logger(DistributedHerder.class);
        this.member = member != null ? member : new WorkerGroupMember(config, restUrl, this.configBackingStore, new RebalanceListener(time), time, clientId, logContext);
        this.herderExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>(1), ThreadUtils.createThreadFactory((String)(this.getClass().getSimpleName() + "-" + clientId + "-%d"), (boolean)false));
        this.forwardRequestExecutor = Executors.newFixedThreadPool(1, ThreadUtils.createThreadFactory((String)("ForwardRequestExecutor-" + clientId + "-%d"), (boolean)false));
        this.startAndStopExecutor = Executors.newFixedThreadPool(8, ThreadUtils.createThreadFactory((String)("StartAndStopExecutor-" + clientId + "-%d"), (boolean)false));
        this.config = config;
        this.stopping = new AtomicBoolean(false);
        this.configState = ClusterConfigState.EMPTY;
        this.rebalanceResolved = true;
        this.needsReconfigRebalance = false;
        this.canReadConfigs = true;
        this.scheduledRebalance = Long.MAX_VALUE;
        this.keyExpiration = Long.MAX_VALUE;
        this.sessionKey = null;
        this.backoffRetries = (short)5;
        this.currentProtocolVersion = ConnectProtocolCompatibility.compatibility(config.getString("connect.protocol")).protocolVersion();
        if (!DistributedHerder.internalRequestValidationEnabled(this.currentProtocolVersion)) {
            this.log.warn("Internal request verification will be disabled for this cluster as this worker's {} configuration has been set to '{}'. If this is not intentional, either remove the '{}' configuration from the worker config file or change its value to '{}'. If this configuration is left as-is, the cluster will be insecure; for more information, see KIP-507: https://cwiki.apache.org/confluence/display/KAFKA/KIP-507%3A+Securing+Internal+Connect+REST+Endpoints", new Object[]{"connect.protocol", config.getString("connect.protocol"), "connect.protocol", ConnectProtocolCompatibility.SESSIONED.name()});
        }
    }

    @Override
    public void start() {
        this.herderExecutor.submit(this);
    }

    @Override
    public void run() {
        try {
            this.log.info("Herder starting");
            this.startServices();
            this.log.info("Herder started");
            this.running = true;
            while (!this.stopping.get()) {
                this.tick();
            }
            this.halt();
            this.log.info("Herder stopped");
            this.herderMetrics.close();
        }
        catch (Throwable t) {
            this.log.error("Uncaught exception in herder work thread, exiting: ", t);
            Exit.exit((int)1);
        }
        finally {
            this.running = false;
        }
    }

    public void tick() {
        DistributedHerderRequest next;
        try {
            if (!this.canReadConfigs) {
                if (this.readConfigToEnd(this.workerSyncTimeoutMs)) {
                    this.canReadConfigs = true;
                } else {
                    return;
                }
            }
            this.log.debug("Ensuring group membership is still active");
            this.member.ensureActive();
            if (!this.handleRebalanceCompleted()) {
                return;
            }
        }
        catch (WakeupException e) {
            this.log.trace("Woken up while ensure group membership is still active");
            return;
        }
        long now = this.time.milliseconds();
        if (this.checkForKeyRotation(now)) {
            this.log.debug("Distributing new session key");
            this.keyExpiration = Long.MAX_VALUE;
            this.configBackingStore.putSessionKey(new SessionKey(this.keyGenerator.generateKey(), now));
        }
        long nextRequestTimeoutMs = Long.MAX_VALUE;
        while ((next = this.peekWithoutException()) != null) {
            if (now < next.at) {
                nextRequestTimeoutMs = next.at - now;
                break;
            }
            this.requests.pollFirst();
            try {
                next.action().call();
                next.callback().onCompletion(null, null);
            }
            catch (Throwable t) {
                next.callback().onCompletion(t, null);
            }
        }
        if (this.scheduledRebalance < Long.MAX_VALUE) {
            nextRequestTimeoutMs = Math.min(nextRequestTimeoutMs, Math.max(this.scheduledRebalance - now, 0L));
            this.rebalanceResolved = false;
            this.log.debug("Scheduled rebalance at: {} (now: {} nextRequestTimeoutMs: {}) ", new Object[]{this.scheduledRebalance, now, nextRequestTimeoutMs});
        }
        if (this.internalRequestValidationEnabled() && this.keyExpiration < Long.MAX_VALUE) {
            nextRequestTimeoutMs = Math.min(nextRequestTimeoutMs, Math.max(this.keyExpiration - now, 0L));
            this.log.debug("Scheduled next key rotation at: {} (now: {} nextRequestTimeoutMs: {}) ", new Object[]{this.keyExpiration, now, nextRequestTimeoutMs});
        }
        AtomicReference<Set<String>> connectorConfigUpdatesCopy = new AtomicReference<Set<String>>();
        AtomicReference<Set<String>> connectorTargetStateChangesCopy = new AtomicReference<Set<String>>();
        AtomicReference<Set<ConnectorTaskId>> taskConfigUpdatesCopy = new AtomicReference<Set<ConnectorTaskId>>();
        if (this.member.currentProtocolVersion() == 0) {
            boolean shouldReturn = this.updateConfigsWithEager(connectorConfigUpdatesCopy, connectorTargetStateChangesCopy);
            if (shouldReturn) {
                return;
            }
            if (connectorConfigUpdatesCopy.get() != null) {
                this.processConnectorConfigUpdates(connectorConfigUpdatesCopy.get());
            }
            if (connectorTargetStateChangesCopy.get() != null) {
                this.processTargetStateChanges(connectorTargetStateChangesCopy.get());
            }
        } else {
            boolean shouldReturn = this.updateConfigsWithIncrementalCooperative(connectorConfigUpdatesCopy, connectorTargetStateChangesCopy, taskConfigUpdatesCopy);
            if (connectorConfigUpdatesCopy.get() != null) {
                this.processConnectorConfigUpdates(connectorConfigUpdatesCopy.get());
            }
            if (connectorTargetStateChangesCopy.get() != null) {
                this.processTargetStateChanges(connectorTargetStateChangesCopy.get());
            }
            if (taskConfigUpdatesCopy.get() != null) {
                this.processTaskConfigUpdatesWithIncrementalCooperative(taskConfigUpdatesCopy.get());
            }
            if (shouldReturn) {
                return;
            }
        }
        try {
            this.log.trace("Polling for group activity; will wait for {}ms or until poll is interrupted by either config backing store updates or a new external request", (Object)nextRequestTimeoutMs);
            this.member.poll(nextRequestTimeoutMs);
            this.handleRebalanceCompleted();
        }
        catch (WakeupException e) {
            this.log.trace("Woken up while polling for group activity");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean checkForKeyRotation(long now) {
        long expiration;
        SecretKey key;
        DistributedHerder distributedHerder = this;
        synchronized (distributedHerder) {
            key = this.sessionKey;
            expiration = this.keyExpiration;
        }
        if (this.internalRequestValidationEnabled()) {
            if (this.isLeader()) {
                if (key == null) {
                    this.log.debug("Internal request signing is enabled but no session key has been distributed yet. Distributing new key now.");
                    return true;
                }
                if (expiration <= now) {
                    this.log.debug("Existing key has expired. Distributing new key now.");
                    return true;
                }
                if (!key.getAlgorithm().equals(this.keyGenerator.getAlgorithm()) || key.getEncoded().length != this.keyGenerator.generateKey().getEncoded().length) {
                    this.log.debug("Previously-distributed key uses different algorithm/key size than required by current worker configuration. Distributing new key now.");
                    return true;
                }
            } else if (key == null && this.configState.sessionKey() != null) {
                this.sessionKey = this.configState.sessionKey().key();
            }
        }
        return false;
    }

    private synchronized boolean updateConfigsWithEager(AtomicReference<Set<String>> connectorConfigUpdatesCopy, AtomicReference<Set<String>> connectorTargetStateChangesCopy) {
        if (this.needsReconfigRebalance || !this.connectorConfigUpdates.isEmpty() || !this.connectorTargetStateChanges.isEmpty()) {
            this.log.trace("Handling config updates with eager rebalancing");
            this.configState = this.configBackingStore.snapshot();
            if (this.needsReconfigRebalance) {
                this.log.debug("Requesting rebalance due to reconfiguration of tasks (needsReconfigRebalance: {})", (Object)this.needsReconfigRebalance);
                this.member.requestRejoin();
                this.needsReconfigRebalance = false;
                this.connectorConfigUpdates.clear();
                this.connectorTargetStateChanges.clear();
                return true;
            }
            if (!this.connectorConfigUpdates.isEmpty()) {
                connectorConfigUpdatesCopy.set(this.connectorConfigUpdates);
                this.connectorConfigUpdates = new HashSet<String>();
            }
            if (!this.connectorTargetStateChanges.isEmpty()) {
                connectorTargetStateChangesCopy.set(this.connectorTargetStateChanges);
                this.connectorTargetStateChanges = new HashSet<String>();
            }
        } else {
            this.log.trace("Skipping config updates with eager rebalancing since no config rebalance is required and there are no connector config, task config, or target state changes pending");
        }
        return false;
    }

    private synchronized boolean updateConfigsWithIncrementalCooperative(AtomicReference<Set<String>> connectorConfigUpdatesCopy, AtomicReference<Set<String>> connectorTargetStateChangesCopy, AtomicReference<Set<ConnectorTaskId>> taskConfigUpdatesCopy) {
        boolean retValue = false;
        if (this.needsReconfigRebalance || !this.connectorConfigUpdates.isEmpty() || !this.connectorTargetStateChanges.isEmpty() || !this.taskConfigUpdates.isEmpty()) {
            this.log.trace("Handling config updates with incremental cooperative rebalancing");
            this.configState = this.configBackingStore.snapshot();
            if (this.needsReconfigRebalance) {
                this.log.debug("Requesting rebalance due to reconfiguration of tasks (needsReconfigRebalance: {})", (Object)this.needsReconfigRebalance);
                this.member.requestRejoin();
                this.needsReconfigRebalance = false;
                retValue = true;
            }
            if (!this.connectorConfigUpdates.isEmpty()) {
                connectorConfigUpdatesCopy.set(this.connectorConfigUpdates);
                this.connectorConfigUpdates = new HashSet<String>();
            }
            if (!this.connectorTargetStateChanges.isEmpty()) {
                connectorTargetStateChangesCopy.set(this.connectorTargetStateChanges);
                this.connectorTargetStateChanges = new HashSet<String>();
            }
            if (!this.taskConfigUpdates.isEmpty()) {
                taskConfigUpdatesCopy.set(this.taskConfigUpdates);
                this.taskConfigUpdates = new HashSet<ConnectorTaskId>();
            }
        } else {
            this.log.trace("Skipping config updates with incremental cooperative rebalancing since no config rebalance is required and there are no connector config, task config, or target state changes pending");
        }
        return retValue;
    }

    private void processConnectorConfigUpdates(Set<String> connectorConfigUpdates) {
        HashSet<String> localConnectors = this.assignment == null ? Collections.emptySet() : new HashSet<String>(this.assignment.connectors());
        this.log.trace("Processing connector config updates; currently-owned connectors are {}, and to-be-updated connectors are {}", localConnectors, connectorConfigUpdates);
        for (String connectorName : connectorConfigUpdates) {
            if (!localConnectors.contains(connectorName)) {
                this.log.trace("Skipping config update for connector {} as it is not owned by this worker", (Object)connectorName);
                continue;
            }
            boolean remains = this.configState.contains(connectorName);
            this.log.info("Handling connector-only config update by {} connector {}", (Object)(remains ? "restarting" : "stopping"), (Object)connectorName);
            this.worker.stopAndAwaitConnector(connectorName);
            if (!remains) continue;
            this.startConnector(connectorName, (error, result) -> {
                if (error != null) {
                    this.log.error("Failed to start connector '" + connectorName + "'", error);
                }
            });
        }
    }

    private void processTargetStateChanges(Set<String> connectorTargetStateChanges) {
        this.log.trace("Processing target state updates; currently-known connectors are {}, and to-be-updated connectors are {}", this.configState.connectors(), connectorTargetStateChanges);
        for (String connector : connectorTargetStateChanges) {
            TargetState targetState = this.configState.targetState(connector);
            if (!this.configState.connectors().contains(connector)) {
                this.log.debug("Received target state change for unknown connector: {}", (Object)connector);
                continue;
            }
            this.worker.setTargetState(connector, targetState, (error, newState) -> {
                if (error != null) {
                    this.log.error("Failed to transition connector to target state", error);
                    return;
                }
                if (newState == TargetState.STARTED) {
                    this.requestTaskReconfiguration(connector);
                }
            });
        }
    }

    private void processTaskConfigUpdatesWithIncrementalCooperative(Set<ConnectorTaskId> taskConfigUpdates) {
        HashSet<ConnectorTaskId> localTasks = this.assignment == null ? Collections.emptySet() : new HashSet<ConnectorTaskId>(this.assignment.tasks());
        this.log.trace("Processing task config updates with incremental cooperative rebalance protocol; currently-owned tasks are {}, and to-be-updated tasks are {}", localTasks, taskConfigUpdates);
        Set connectorsWhoseTasksToStop = taskConfigUpdates.stream().map(ConnectorTaskId::connector).collect(Collectors.toSet());
        List<ConnectorTaskId> tasksToStop = localTasks.stream().filter(taskId -> connectorsWhoseTasksToStop.contains(taskId.connector())).collect(Collectors.toList());
        this.log.info("Handling task config update by restarting tasks {}", tasksToStop);
        this.worker.stopAndAwaitTasks(tasksToStop);
        this.tasksToRestart.addAll(tasksToStop);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void halt() {
        DistributedHerder distributedHerder = this;
        synchronized (distributedHerder) {
            this.log.info("Stopping connectors and tasks that are still assigned to this worker.");
            ArrayList<Callable<Void>> callables = new ArrayList<Callable<Void>>();
            for (String connectorName : new ArrayList<String>(this.worker.connectorNames())) {
                callables.add(this.getConnectorStoppingCallable(connectorName));
            }
            for (ConnectorTaskId taskId : new ArrayList<ConnectorTaskId>(this.worker.taskIds())) {
                callables.add(this.getTaskStoppingCallable(taskId));
            }
            this.startAndStop(callables);
            this.member.stop();
            DistributedHerderRequest request = this.requests.pollFirst();
            while (request != null) {
                request.callback().onCompletion(new ConnectException("Worker is shutting down"), null);
                request = this.requests.pollFirst();
            }
            this.stopServices();
        }
    }

    @Override
    public void stop() {
        this.log.info("Herder stopping");
        this.stopping.set(true);
        this.member.wakeup();
        this.herderExecutor.shutdown();
        try {
            if (!this.herderExecutor.awaitTermination(this.workerTasksShutdownTimeoutMs, TimeUnit.MILLISECONDS)) {
                this.herderExecutor.shutdownNow();
            }
            this.forwardRequestExecutor.shutdown();
            this.startAndStopExecutor.shutdown();
            if (!this.forwardRequestExecutor.awaitTermination(FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                this.forwardRequestExecutor.shutdownNow();
            }
            if (!this.startAndStopExecutor.awaitTermination(START_AND_STOP_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                this.startAndStopExecutor.shutdownNow();
            }
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        this.log.info("Herder stopped");
        this.running = false;
    }

    @Override
    public void connectors(final Callback<Collection<String>> callback) {
        this.log.trace("Submitting connector listing request");
        this.addRequest(new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                if (DistributedHerder.this.checkRebalanceNeeded(callback)) {
                    return null;
                }
                callback.onCompletion(null, DistributedHerder.this.configState.connectors());
                return null;
            }
        }, DistributedHerder.forwardErrorCallback(callback));
    }

    @Override
    public void connectorInfo(final String connName, final Callback<ConnectorInfo> callback) {
        this.log.trace("Submitting connector info request {}", (Object)connName);
        this.addRequest(new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                if (DistributedHerder.this.checkRebalanceNeeded(callback)) {
                    return null;
                }
                if (!DistributedHerder.this.configState.contains(connName)) {
                    callback.onCompletion((Throwable)new NotFoundException("Connector " + connName + " not found"), null);
                } else {
                    callback.onCompletion(null, DistributedHerder.this.connectorInfo(connName));
                }
                return null;
            }
        }, DistributedHerder.forwardErrorCallback(callback));
    }

    @Override
    protected Map<String, String> config(String connName) {
        return this.configState.connectorConfig(connName);
    }

    @Override
    public void connectorConfig(String connName, Callback<Map<String, String>> callback) {
        this.log.trace("Submitting connector config read request {}", (Object)connName);
        this.connectorInfo(connName, (error, result) -> {
            if (error != null) {
                callback.onCompletion(error, null);
            } else {
                callback.onCompletion(null, result.config());
            }
        });
    }

    @Override
    public void deleteConnectorConfig(final String connName, final Callback<Herder.Created<ConnectorInfo>> callback) {
        this.addRequest(new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                DistributedHerder.this.log.trace("Handling connector config request {}", (Object)connName);
                if (!DistributedHerder.this.isLeader()) {
                    callback.onCompletion((Throwable)((Object)new NotLeaderException("Only the leader can delete connector configs.", DistributedHerder.this.leaderUrl())), null);
                    return null;
                }
                if (!DistributedHerder.this.configState.contains(connName)) {
                    callback.onCompletion((Throwable)new NotFoundException("Connector " + connName + " not found"), null);
                } else {
                    DistributedHerder.this.log.trace("Removing connector config {} {}", (Object)connName, DistributedHerder.this.configState.connectors());
                    DistributedHerder.this.configBackingStore.removeConnectorConfig(connName);
                    callback.onCompletion(null, new Herder.Created<Object>(false, null));
                }
                return null;
            }
        }, DistributedHerder.forwardErrorCallback(callback));
    }

    @Override
    protected Map<String, ConfigValue> validateBasicConnectorConfig(Connector connector, ConfigDef configDef, Map<String, String> config) {
        ConfigValue validatedName;
        String name;
        Map<String, ConfigValue> validatedConfig = super.validateBasicConnectorConfig(connector, configDef, config);
        if (connector instanceof SinkConnector && this.workerGroupId.equals(SinkUtils.consumerGroupId(name = (String)(validatedName = validatedConfig.get("name")).value()))) {
            validatedName.addErrorMessage("Consumer group for sink connector named " + name + " conflicts with Connect worker group " + this.workerGroupId);
        }
        return validatedConfig;
    }

    @Override
    public void putConnectorConfig(final String connName, final Map<String, String> config, final boolean allowReplace, final Callback<Herder.Created<ConnectorInfo>> callback) {
        this.log.trace("Submitting connector config write request {}", (Object)connName);
        this.addRequest(new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                DistributedHerder.this.validateConnectorConfig(config, (error, configInfos) -> {
                    if (error != null) {
                        callback.onCompletion(error, null);
                        return;
                    }
                    DistributedHerder.this.addRequest(new Callable<Void>(){

                        @Override
                        public Void call() {
                            if (DistributedHerder.this.maybeAddConfigErrors(configInfos, callback)) {
                                return null;
                            }
                            DistributedHerder.this.log.trace("Handling connector config request {}", (Object)connName);
                            if (!DistributedHerder.this.isLeader()) {
                                callback.onCompletion((Throwable)((Object)new NotLeaderException("Only the leader can set connector configs.", DistributedHerder.this.leaderUrl())), null);
                                return null;
                            }
                            boolean exists = DistributedHerder.this.configState.contains(connName);
                            if (!allowReplace && exists) {
                                callback.onCompletion((Throwable)new AlreadyExistsException("Connector " + connName + " already exists"), null);
                                return null;
                            }
                            DistributedHerder.this.log.trace("Submitting connector config {} {} {}", new Object[]{connName, allowReplace, DistributedHerder.this.configState.connectors()});
                            DistributedHerder.this.configBackingStore.putConnectorConfig(connName, config);
                            ConnectorInfo info = new ConnectorInfo(connName, config, DistributedHerder.this.configState.tasks(connName), DistributedHerder.this.connectorTypeForClass((String)config.get("connector.class")));
                            callback.onCompletion(null, new Herder.Created<ConnectorInfo>(!exists, info));
                            return null;
                        }
                    }, DistributedHerder.forwardErrorCallback(callback));
                });
                return null;
            }
        }, DistributedHerder.forwardErrorCallback(callback));
    }

    @Override
    public void requestTaskReconfiguration(final String connName) {
        this.log.trace("Submitting connector task reconfiguration request {}", (Object)connName);
        this.addRequest(new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                DistributedHerder.this.reconfigureConnectorTasksWithRetry(DistributedHerder.this.time.milliseconds(), connName);
                return null;
            }
        }, (error, result) -> {
            if (error != null) {
                this.log.error("Unexpected error during task reconfiguration: ", error);
                this.log.error("Task reconfiguration for {} failed unexpectedly, this connector will not be properly reconfigured unless manually triggered.", (Object)connName);
            }
        });
    }

    @Override
    public void taskConfigs(final String connName, final Callback<List<TaskInfo>> callback) {
        this.log.trace("Submitting get task configuration request {}", (Object)connName);
        this.addRequest(new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                if (DistributedHerder.this.checkRebalanceNeeded(callback)) {
                    return null;
                }
                if (!DistributedHerder.this.configState.contains(connName)) {
                    callback.onCompletion((Throwable)new NotFoundException("Connector " + connName + " not found"), null);
                } else {
                    ArrayList<TaskInfo> result = new ArrayList<TaskInfo>();
                    for (int i = 0; i < DistributedHerder.this.configState.taskCount(connName); ++i) {
                        ConnectorTaskId id = new ConnectorTaskId(connName, i);
                        result.add(new TaskInfo(id, DistributedHerder.this.configState.rawTaskConfig(id)));
                    }
                    callback.onCompletion(null, result);
                }
                return null;
            }
        }, DistributedHerder.forwardErrorCallback(callback));
    }

    @Override
    public void putTaskConfigs(final String connName, final List<Map<String, String>> configs, final Callback<Void> callback, InternalRequestSignature requestSignature) {
        this.log.trace("Submitting put task configuration request {}", (Object)connName);
        if (this.internalRequestValidationEnabled()) {
            ConnectRestException requestValidationError = null;
            if (requestSignature == null) {
                requestValidationError = new BadRequestException("Internal request missing required signature");
            } else if (!this.keySignatureVerificationAlgorithms.contains(requestSignature.keyAlgorithm())) {
                requestValidationError = new BadRequestException(String.format("This worker does not support the '%s' key signing algorithm used by other workers. This worker is currently configured to use: %s. Check that all workers' configuration files permit the same set of signature algorithms, and correct any misconfigured worker and restart it.", requestSignature.keyAlgorithm(), this.keySignatureVerificationAlgorithms));
            } else if (!requestSignature.isValid(this.sessionKey)) {
                requestValidationError = new ConnectRestException(Response.Status.FORBIDDEN, "Internal request contained invalid signature.");
            }
            if (requestValidationError != null) {
                callback.onCompletion((Throwable)((Object)requestValidationError), null);
                return;
            }
        }
        this.addRequest(new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                if (!DistributedHerder.this.isLeader()) {
                    callback.onCompletion((Throwable)((Object)new NotLeaderException("Only the leader may write task configurations.", DistributedHerder.this.leaderUrl())), null);
                } else if (!DistributedHerder.this.configState.contains(connName)) {
                    callback.onCompletion((Throwable)new NotFoundException("Connector " + connName + " not found"), null);
                } else {
                    DistributedHerder.this.configBackingStore.putTaskConfigs(connName, configs);
                    callback.onCompletion(null, null);
                }
                return null;
            }
        }, DistributedHerder.forwardErrorCallback(callback));
    }

    @Override
    public void restartConnector(String connName, Callback<Void> callback) {
        this.restartConnector(0L, connName, callback);
    }

    @Override
    public HerderRequest restartConnector(long delayMs, final String connName, final Callback<Void> callback) {
        return this.addRequest(delayMs, new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                if (DistributedHerder.this.checkRebalanceNeeded(callback)) {
                    return null;
                }
                if (!DistributedHerder.this.configState.connectors().contains(connName)) {
                    callback.onCompletion((Throwable)new NotFoundException("Unknown connector: " + connName), null);
                    return null;
                }
                if (DistributedHerder.this.assignment.connectors().contains(connName)) {
                    try {
                        DistributedHerder.this.worker.stopAndAwaitConnector(connName);
                        DistributedHerder.this.startConnector(connName, callback);
                    }
                    catch (Throwable t) {
                        callback.onCompletion(t, null);
                    }
                } else if (DistributedHerder.this.isLeader()) {
                    callback.onCompletion((Throwable)((Object)new NotAssignedException("Cannot restart connector since it is not assigned to this member", DistributedHerder.this.member.ownerUrl(connName))), null);
                } else {
                    callback.onCompletion((Throwable)((Object)new NotLeaderException("Cannot restart connector since it is not assigned to this member", DistributedHerder.this.leaderUrl())), null);
                }
                return null;
            }
        }, DistributedHerder.forwardErrorCallback(callback));
    }

    @Override
    public void restartTask(final ConnectorTaskId id, final Callback<Void> callback) {
        this.addRequest(new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                block10: {
                    if (DistributedHerder.this.checkRebalanceNeeded(callback)) {
                        return null;
                    }
                    if (!DistributedHerder.this.configState.connectors().contains(id.connector())) {
                        callback.onCompletion((Throwable)new NotFoundException("Unknown connector: " + id.connector()), null);
                        return null;
                    }
                    if (DistributedHerder.this.configState.taskConfig(id) == null) {
                        callback.onCompletion((Throwable)new NotFoundException("Unknown task: " + id), null);
                        return null;
                    }
                    if (DistributedHerder.this.assignment.tasks().contains(id)) {
                        try {
                            DistributedHerder.this.worker.stopAndAwaitTask(id);
                            if (DistributedHerder.this.startTask(id)) {
                                callback.onCompletion(null, null);
                                break block10;
                            }
                            callback.onCompletion(new ConnectException("Failed to start task: " + id), null);
                        }
                        catch (Throwable t) {
                            callback.onCompletion(t, null);
                        }
                    } else if (DistributedHerder.this.isLeader()) {
                        callback.onCompletion((Throwable)((Object)new NotAssignedException("Cannot restart task since it is not assigned to this member", DistributedHerder.this.member.ownerUrl(id))), null);
                    } else {
                        callback.onCompletion((Throwable)((Object)new NotLeaderException("Cannot restart task since it is not assigned to this member", DistributedHerder.this.leaderUrl())), null);
                    }
                }
                return null;
            }
        }, DistributedHerder.forwardErrorCallback(callback));
    }

    @Override
    public int generation() {
        return this.generation;
    }

    private boolean isLeader() {
        return this.assignment != null && this.member.memberId().equals(this.assignment.leader());
    }

    private String leaderUrl() {
        if (this.assignment == null) {
            return null;
        }
        return this.assignment.leaderUrl();
    }

    private boolean handleRebalanceCompleted() {
        if (this.rebalanceResolved) {
            this.log.trace("Returning early because rebalance is marked as resolved (rebalanceResolved: true)");
            return true;
        }
        this.log.debug("Handling completed but unresolved rebalance");
        boolean needsReadToEnd = false;
        boolean needsRejoin = false;
        if (this.assignment.failed()) {
            needsRejoin = true;
            if (this.isLeader()) {
                this.log.warn("Join group completed, but assignment failed and we are the leader. Reading to end of config and retrying.");
                needsReadToEnd = true;
            } else if (this.configState.offset() < this.assignment.offset()) {
                this.log.warn("Join group completed, but assignment failed and we lagging. Reading to end of config and retrying.");
                needsReadToEnd = true;
            } else {
                this.log.warn("Join group completed, but assignment failed. We were up to date, so just retrying.");
            }
        } else if (this.configState.offset() < this.assignment.offset()) {
            this.log.warn("Catching up to assignment's config offset.");
            needsReadToEnd = true;
        }
        long now = this.time.milliseconds();
        if (this.scheduledRebalance <= now) {
            this.log.debug("Requesting rebalance because scheduled rebalance timeout has been reached (now: {} scheduledRebalance: {}", (Object)this.scheduledRebalance, (Object)now);
            needsRejoin = true;
            this.scheduledRebalance = Long.MAX_VALUE;
        }
        if (needsReadToEnd) {
            if (this.readConfigToEnd(this.workerSyncTimeoutMs)) {
                this.canReadConfigs = true;
            } else {
                this.canReadConfigs = false;
                needsRejoin = true;
            }
        }
        if (needsRejoin) {
            this.member.requestRejoin();
            return false;
        }
        if (this.configState.offset() != this.assignment.offset()) {
            this.log.info("Current config state offset {} does not match group assignment {}. Forcing rebalance.", (Object)this.configState.offset(), (Object)this.assignment.offset());
            this.member.requestRejoin();
            return false;
        }
        this.startWork();
        this.herderMetrics.rebalanceSucceeded(this.time.milliseconds());
        this.rebalanceResolved = true;
        if (!this.assignment.revokedConnectors().isEmpty() || !this.assignment.revokedTasks().isEmpty()) {
            this.assignment.revokedConnectors().clear();
            this.assignment.revokedTasks().clear();
            this.member.requestRejoin();
            return false;
        }
        return true;
    }

    private boolean readConfigToEnd(long timeoutMs) {
        this.log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", (Object)this.configState.offset(), (Object)this.assignment.offset());
        try {
            this.configBackingStore.refresh(timeoutMs, TimeUnit.MILLISECONDS);
            this.configState = this.configBackingStore.snapshot();
            this.log.info("Finished reading to end of log and updated config snapshot, new config log offset: {}", (Object)this.configState.offset());
            this.backoffRetries = (short)5;
            return true;
        }
        catch (TimeoutException e) {
            this.log.warn("Didn't reach end of config log quickly enough", (Throwable)e);
            this.member.maybeLeaveGroup("taking too long to read the log");
            this.backoff(this.workerUnsyncBackoffMs);
            return false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void backoff(long ms) {
        ExtendedAssignment runningAssignmentSnapshot;
        if (ConnectProtocolCompatibility.fromProtocolVersion(this.currentProtocolVersion) == ConnectProtocolCompatibility.EAGER) {
            this.time.sleep(ms);
            return;
        }
        if (this.backoffRetries > 0) {
            int rebalanceDelayFraction = this.config.getInt("scheduled.rebalance.max.delay.ms") / 10 / this.backoffRetries;
            this.time.sleep((long)rebalanceDelayFraction);
            this.backoffRetries = (short)(this.backoffRetries - 1);
            return;
        }
        DistributedHerder distributedHerder = this;
        synchronized (distributedHerder) {
            runningAssignmentSnapshot = ExtendedAssignment.duplicate(this.runningAssignment);
        }
        this.log.info("Revoking current running assignment {} because after {} retries the worker has not caught up with the latest Connect cluster updates", (Object)runningAssignmentSnapshot, (Object)5);
        this.member.revokeAssignment(runningAssignmentSnapshot);
        this.backoffRetries = (short)5;
    }

    private void startAndStop(Collection<Callable<Void>> callables) {
        try {
            this.startAndStopExecutor.invokeAll(callables);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void startWork() {
        ArrayList<Callable<Void>> callables = new ArrayList<Callable<Void>>();
        DistributedHerder distributedHerder = this;
        synchronized (distributedHerder) {
            this.log.info("Starting connectors and tasks using config offset {}", (Object)this.assignment.offset());
            this.log.debug("Received assignment: {}", (Object)this.assignment);
            this.log.debug("Currently running assignment: {}", (Object)this.runningAssignment);
            for (String connectorName : DistributedHerder.assignmentDifference(this.assignment.connectors(), this.runningAssignment.connectors())) {
                callables.add(this.getConnectorStartingCallable(connectorName));
            }
            this.log.debug("Tasks to restart from currently running assignment: {}", this.tasksToRestart);
            this.runningAssignment.tasks().removeAll(this.tasksToRestart);
            this.tasksToRestart.clear();
            for (ConnectorTaskId taskId : DistributedHerder.assignmentDifference(this.assignment.tasks(), this.runningAssignment.tasks())) {
                callables.add(this.getTaskStartingCallable(taskId));
            }
        }
        this.startAndStop(callables);
        distributedHerder = this;
        synchronized (distributedHerder) {
            this.runningAssignment = this.member.currentProtocolVersion() == 0 ? ExtendedAssignment.empty() : this.assignment;
        }
        this.log.info("Finished starting connectors and tasks");
    }

    private static <T> Collection<T> assignmentDifference(Collection<T> update, Collection<T> running) {
        if (running.isEmpty()) {
            return update;
        }
        HashSet<T> diff = new HashSet<T>(update);
        diff.removeAll(running);
        return diff;
    }

    private boolean startTask(ConnectorTaskId taskId) {
        this.log.info("Starting task {}", (Object)taskId);
        return this.worker.startTask(taskId, this.configState, this.configState.connectorConfig(taskId.connector()), this.configState.taskConfig(taskId), this, this.configState.targetState(taskId.connector()));
    }

    private Callable<Void> getTaskStartingCallable(final ConnectorTaskId taskId) {
        return new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                try {
                    DistributedHerder.this.startTask(taskId);
                }
                catch (Throwable t) {
                    DistributedHerder.this.log.error("Couldn't instantiate task {} because it has an invalid task configuration. This task will not execute until reconfigured.", (Object)taskId, (Object)t);
                    DistributedHerder.this.onFailure(taskId, t);
                }
                return null;
            }
        };
    }

    private Callable<Void> getTaskStoppingCallable(final ConnectorTaskId taskId) {
        return new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                DistributedHerder.this.worker.stopAndAwaitTask(taskId);
                return null;
            }
        };
    }

    private void startConnector(final String connectorName, final Callback<Void> callback) {
        this.log.info("Starting connector {}", (Object)connectorName);
        Map<String, String> configProps = this.configState.connectorConfig(connectorName);
        HerderConnectorContext ctx = new HerderConnectorContext(this, connectorName);
        TargetState initialState = this.configState.targetState(connectorName);
        Callback<TargetState> onInitialStateChange = (error, newState) -> {
            if (error != null) {
                callback.onCompletion(new ConnectException("Failed to start connector: " + connectorName), null);
                return;
            }
            if (newState == TargetState.STARTED) {
                this.addRequest(new Callable<Void>(){

                    @Override
                    public Void call() {
                        DistributedHerder.this.reconfigureConnectorTasksWithRetry(DistributedHerder.this.time.milliseconds(), connectorName);
                        callback.onCompletion(null, null);
                        return null;
                    }
                }, DistributedHerder.forwardErrorCallback(callback));
            } else {
                callback.onCompletion(null, null);
            }
        };
        this.worker.startConnector(connectorName, configProps, ctx, this, initialState, onInitialStateChange);
    }

    private Callable<Void> getConnectorStartingCallable(final String connectorName) {
        return new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                try {
                    DistributedHerder.this.startConnector(connectorName, (error, result) -> {
                        if (error != null) {
                            DistributedHerder.this.log.error("Failed to start connector '" + connectorName + "'", error);
                        }
                    });
                }
                catch (Throwable t) {
                    DistributedHerder.this.log.error("Unexpected error while trying to start connector " + connectorName, t);
                    DistributedHerder.this.onFailure(connectorName, t);
                }
                return null;
            }
        };
    }

    private Callable<Void> getConnectorStoppingCallable(final String connectorName) {
        return new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                try {
                    DistributedHerder.this.worker.stopAndAwaitConnector(connectorName);
                }
                catch (Throwable t) {
                    DistributedHerder.this.log.error("Failed to shut down connector " + connectorName, t);
                }
                return null;
            }
        };
    }

    private void reconfigureConnectorTasksWithRetry(final long initialRequestTime, final String connName) {
        this.reconfigureConnector(connName, new Callback<Void>(){

            @Override
            public void onCompletion(Throwable error, Void result) {
                if (error != null) {
                    if (DistributedHerder.this.isPossibleExpiredKeyException(initialRequestTime, error)) {
                        DistributedHerder.this.log.debug("Failed to reconfigure connector's tasks, possibly due to expired session key. Retrying after backoff");
                    } else {
                        DistributedHerder.this.log.error("Failed to reconfigure connector's tasks, retrying after backoff:", error);
                    }
                    DistributedHerder.this.addRequest(250L, new Callable<Void>(){

                        @Override
                        public Void call() throws Exception {
                            DistributedHerder.this.reconfigureConnectorTasksWithRetry(initialRequestTime, connName);
                            return null;
                        }
                    }, new Callback<Void>(){

                        @Override
                        public void onCompletion(Throwable error, Void result) {
                            if (error != null) {
                                DistributedHerder.this.log.error("Unexpected error during connector task reconfiguration: ", error);
                                DistributedHerder.this.log.error("Task reconfiguration for {} failed unexpectedly, this connector will not be properly reconfigured unless manually triggered.", (Object)connName);
                            }
                        }
                    });
                }
            }
        });
    }

    boolean isPossibleExpiredKeyException(long initialRequestTime, Throwable error) {
        if (error instanceof ConnectRestException) {
            ConnectRestException connectError = (ConnectRestException)((Object)error);
            return connectError.statusCode() == Response.Status.FORBIDDEN.getStatusCode() && initialRequestTime + TimeUnit.MINUTES.toMillis(1L) >= this.time.milliseconds();
        }
        return false;
    }

    private void reconfigureConnector(String connName, Callback<Void> cb) {
        try {
            if (!this.worker.isRunning(connName)) {
                this.log.info("Skipping reconfiguration of connector {} since it is not running", (Object)connName);
                return;
            }
            Map<String, String> configs = this.configState.connectorConfig(connName);
            ConnectorConfig connConfig = this.worker.isSinkConnector(connName) ? new SinkConnectorConfig(this.plugins(), configs) : new SourceConnectorConfig(this.plugins(), configs, this.worker.isTopicCreationEnabled());
            List<Map<String, String>> taskProps = this.worker.connectorTaskConfigs(connName, connConfig);
            boolean changed = false;
            int currentNumTasks = this.configState.taskCount(connName);
            if (taskProps.size() != currentNumTasks) {
                this.log.debug("Change in connector task count from {} to {}, writing updated task configurations", (Object)currentNumTasks, (Object)taskProps.size());
                changed = true;
            } else {
                int index = 0;
                for (Map<String, String> taskConfig : taskProps) {
                    if (!taskConfig.equals(this.configState.taskConfig(new ConnectorTaskId(connName, index)))) {
                        this.log.debug("Change in task configurations, writing updated task configurations");
                        changed = true;
                        break;
                    }
                    ++index;
                }
            }
            if (changed) {
                List<Map<String, String>> rawTaskProps = DistributedHerder.reverseTransform(connName, this.configState, taskProps);
                if (this.isLeader()) {
                    this.configBackingStore.putTaskConfigs(connName, rawTaskProps);
                    cb.onCompletion(null, null);
                } else {
                    this.forwardRequestExecutor.submit(() -> {
                        try {
                            String leaderUrl = this.leaderUrl();
                            if (leaderUrl == null || leaderUrl.trim().isEmpty()) {
                                cb.onCompletion(new ConnectException("Request to leader to reconfigure connector tasks failed because the URL of the leader's REST interface is empty!"), null);
                                return;
                            }
                            String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + connName + "/tasks");
                            this.log.trace("Forwarding task configurations for connector {} to leader", (Object)connName);
                            RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, this.config, this.sessionKey, this.requestSignatureAlgorithm);
                            cb.onCompletion(null, null);
                        }
                        catch (ConnectException e) {
                            this.log.error("Request to leader to reconfigure connector tasks failed", (Throwable)e);
                            cb.onCompletion(e, null);
                        }
                    });
                }
            }
        }
        catch (Throwable t) {
            cb.onCompletion(t, null);
        }
    }

    private boolean checkRebalanceNeeded(Callback<?> callback) {
        if (this.needsReconfigRebalance) {
            callback.onCompletion((Throwable)((Object)new RebalanceNeededException("Request cannot be completed because a rebalance is expected")), null);
            return true;
        }
        return false;
    }

    DistributedHerderRequest addRequest(Callable<Void> action, Callback<Void> callback) {
        return this.addRequest(0L, action, callback);
    }

    DistributedHerderRequest addRequest(long delayMs, Callable<Void> action, Callback<Void> callback) {
        DistributedHerderRequest req = new DistributedHerderRequest(this.time.milliseconds() + delayMs, this.requestSeqNum.incrementAndGet(), action, callback);
        this.requests.add(req);
        if (this.peekWithoutException() == req) {
            this.member.wakeup();
        }
        return req;
    }

    private boolean internalRequestValidationEnabled() {
        return DistributedHerder.internalRequestValidationEnabled(this.member.currentProtocolVersion());
    }

    private static boolean internalRequestValidationEnabled(short protocolVersion) {
        return protocolVersion >= 2;
    }

    private DistributedHerderRequest peekWithoutException() {
        try {
            return this.requests.isEmpty() ? null : (DistributedHerderRequest)this.requests.first();
        }
        catch (NoSuchElementException noSuchElementException) {
            return null;
        }
    }

    private static Callback<Void> forwardErrorCallback(Callback<?> callback) {
        return (error, result) -> {
            if (error != null) {
                callback.onCompletion(error, null);
            }
        };
    }

    private void updateDeletedConnectorStatus() {
        ClusterConfigState snapshot = this.configBackingStore.snapshot();
        Set<String> connectors = snapshot.connectors();
        for (String connector : this.statusBackingStore.connectors()) {
            if (connectors.contains(connector)) continue;
            this.log.debug("Cleaning status information for connector {}", (Object)connector);
            this.onDeletion(connector);
        }
    }

    private void updateDeletedTaskStatus() {
        ClusterConfigState snapshot = this.configBackingStore.snapshot();
        for (String connector : this.statusBackingStore.connectors()) {
            HashSet<ConnectorTaskId> remainingTasks = new HashSet<ConnectorTaskId>(snapshot.tasks(connector));
            this.statusBackingStore.getAll(connector).stream().map(AbstractStatus::id).filter(task -> !remainingTasks.contains(task)).forEach(this::onDeletion);
        }
    }

    protected HerderMetrics herderMetrics() {
        return this.herderMetrics;
    }

    class HerderMetrics {
        private final ConnectMetrics.MetricGroup metricGroup;
        private final Sensor rebalanceCompletedCounts;
        private final Sensor rebalanceTime;
        private volatile long lastRebalanceCompletedAtMillis = Long.MIN_VALUE;
        private volatile boolean rebalancing = false;
        private volatile long rebalanceStartedAtMillis = 0L;

        public HerderMetrics(ConnectMetrics connectMetrics) {
            ConnectMetricsRegistry registry = connectMetrics.registry();
            this.metricGroup = connectMetrics.group(registry.workerRebalanceGroupName(), new String[0]);
            this.metricGroup.addValueMetric(registry.connectProtocol, new ConnectMetrics.LiteralSupplier<String>(){

                @Override
                public String metricValue(long now) {
                    return ConnectProtocolCompatibility.fromProtocolVersion(DistributedHerder.this.member.currentProtocolVersion()).name();
                }
            });
            this.metricGroup.addValueMetric(registry.leaderName, new ConnectMetrics.LiteralSupplier<String>(){

                @Override
                public String metricValue(long now) {
                    return DistributedHerder.this.leaderUrl();
                }
            });
            this.metricGroup.addValueMetric(registry.epoch, new ConnectMetrics.LiteralSupplier<Double>(){

                @Override
                public Double metricValue(long now) {
                    return DistributedHerder.this.generation;
                }
            });
            this.metricGroup.addValueMetric(registry.rebalanceMode, new ConnectMetrics.LiteralSupplier<Double>(){

                @Override
                public Double metricValue(long now) {
                    return HerderMetrics.this.rebalancing ? 1.0 : 0.0;
                }
            });
            this.rebalanceCompletedCounts = this.metricGroup.sensor("completed-rebalance-count");
            this.rebalanceCompletedCounts.add(this.metricGroup.metricName(registry.rebalanceCompletedTotal), (MeasurableStat)new CumulativeSum());
            this.rebalanceTime = this.metricGroup.sensor("rebalance-time");
            this.rebalanceTime.add(this.metricGroup.metricName(registry.rebalanceTimeMax), (MeasurableStat)new Max());
            this.rebalanceTime.add(this.metricGroup.metricName(registry.rebalanceTimeAvg), (MeasurableStat)new Avg());
            this.metricGroup.addValueMetric(registry.rebalanceTimeSinceLast, new ConnectMetrics.LiteralSupplier<Double>(){

                @Override
                public Double metricValue(long now) {
                    return HerderMetrics.this.lastRebalanceCompletedAtMillis == Long.MIN_VALUE ? Double.POSITIVE_INFINITY : (double)(now - HerderMetrics.this.lastRebalanceCompletedAtMillis);
                }
            });
        }

        void close() {
            this.metricGroup.close();
        }

        void rebalanceStarted(long now) {
            this.rebalanceStartedAtMillis = now;
            this.rebalancing = true;
        }

        void rebalanceSucceeded(long now) {
            long duration = Math.max(0L, now - this.rebalanceStartedAtMillis);
            this.rebalancing = false;
            this.rebalanceCompletedCounts.record(1.0);
            this.rebalanceTime.record((double)duration);
            this.lastRebalanceCompletedAtMillis = now;
        }

        protected ConnectMetrics.MetricGroup metricGroup() {
            return this.metricGroup;
        }
    }

    public class RebalanceListener
    implements WorkerRebalanceListener {
        private final Time time;

        RebalanceListener(Time time) {
            this.time = time;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onAssigned(ExtendedAssignment assignment, int generation) {
            short priorProtocolVersion = DistributedHerder.this.currentProtocolVersion;
            DistributedHerder.this.currentProtocolVersion = DistributedHerder.this.member.currentProtocolVersion();
            DistributedHerder.this.log.info("Joined group at generation {} with protocol version {} and got assignment: {} with rebalance delay: {}", new Object[]{generation, DistributedHerder.this.currentProtocolVersion, assignment, assignment.delay()});
            DistributedHerder distributedHerder = DistributedHerder.this;
            synchronized (distributedHerder) {
                DistributedHerder.this.assignment = assignment;
                DistributedHerder.this.generation = generation;
                int delay = assignment.delay();
                DistributedHerder.this.scheduledRebalance = delay > 0 ? this.time.milliseconds() + (long)delay : Long.MAX_VALUE;
                boolean requestValidationWasEnabled = DistributedHerder.internalRequestValidationEnabled(priorProtocolVersion);
                boolean requestValidationNowEnabled = DistributedHerder.internalRequestValidationEnabled(DistributedHerder.this.currentProtocolVersion);
                if (requestValidationNowEnabled != requestValidationWasEnabled) {
                    if (requestValidationNowEnabled) {
                        DistributedHerder.this.log.info("Internal request validation has been re-enabled");
                    } else {
                        DistributedHerder.this.log.warn("The protocol used by this Connect cluster has been downgraded from '{}' to '{}' and internal request validation is now disabled. This is most likely caused by a new worker joining the cluster with an older protocol specified for the {} configuration; if this is not intentional, either remove the {} configuration from that worker's config file, or change its value to '{}'. If this configuration is left as-is, the cluster will be insecure; for more information, see KIP-507: https://cwiki.apache.org/confluence/display/KAFKA/KIP-507%3A+Securing+Internal+Connect+REST+Endpoints", new Object[]{ConnectProtocolCompatibility.fromProtocolVersion(priorProtocolVersion), ConnectProtocolCompatibility.fromProtocolVersion(DistributedHerder.this.currentProtocolVersion), "connect.protocol", "connect.protocol", ConnectProtocolCompatibility.SESSIONED.name()});
                    }
                }
                DistributedHerder.this.rebalanceResolved = false;
                DistributedHerder.this.herderMetrics.rebalanceStarted(this.time.milliseconds());
            }
            if (DistributedHerder.this.isLeader()) {
                DistributedHerder.this.updateDeletedConnectorStatus();
                DistributedHerder.this.updateDeletedTaskStatus();
            }
            DistributedHerder.this.member.wakeup();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
            if (DistributedHerder.this.rebalanceResolved) {
                ArrayList<Callable> callables = new ArrayList<Callable>();
                for (String connectorName : connectors) {
                    callables.add(DistributedHerder.this.getConnectorStoppingCallable(connectorName));
                }
                for (ConnectorTaskId taskId : tasks) {
                    callables.add(DistributedHerder.this.getTaskStoppingCallable(taskId));
                }
                DistributedHerder.this.startAndStop(callables);
                DistributedHerder.this.log.info("Finished stopping tasks in preparation for rebalance");
                DistributedHerder distributedHerder = DistributedHerder.this;
                synchronized (distributedHerder) {
                    DistributedHerder.this.log.debug("Removing connectors from running assignment {}", connectors);
                    DistributedHerder.this.runningAssignment.connectors().removeAll(connectors);
                    DistributedHerder.this.log.debug("Removing tasks from running assignment {}", tasks);
                    DistributedHerder.this.runningAssignment.tasks().removeAll(tasks);
                }
                if (DistributedHerder.this.isTopicTrackingEnabled) {
                    this.resetActiveTopics(connectors, tasks);
                }
                DistributedHerder.this.statusBackingStore.flush();
                DistributedHerder.this.log.info("Finished flushing status backing store in preparation for rebalance");
            } else {
                DistributedHerder.this.log.info("Wasn't able to resume work after last rebalance, can skip stopping connectors and tasks");
            }
        }

        private void resetActiveTopics(Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
            connectors.stream().filter(connectorName -> !DistributedHerder.this.configState.contains((String)connectorName)).forEach(DistributedHerder.this::resetConnectorActiveTopics);
            tasks.stream().map(ConnectorTaskId::connector).distinct().filter(connectorName -> !DistributedHerder.this.configState.contains((String)connectorName)).forEach(DistributedHerder.this::resetConnectorActiveTopics);
        }
    }

    class DistributedHerderRequest
    implements HerderRequest,
    Comparable<DistributedHerderRequest> {
        private final long at;
        private final long seq;
        private final Callable<Void> action;
        private final Callback<Void> callback;

        public DistributedHerderRequest(long at, long seq, Callable<Void> action, Callback<Void> callback) {
            this.at = at;
            this.seq = seq;
            this.action = action;
            this.callback = callback;
        }

        public Callable<Void> action() {
            return this.action;
        }

        public Callback<Void> callback() {
            return this.callback;
        }

        @Override
        public void cancel() {
            DistributedHerder.this.requests.remove(this);
        }

        @Override
        public int compareTo(DistributedHerderRequest o) {
            int cmp = Long.compare(this.at, o.at);
            return cmp == 0 ? Long.compare(this.seq, o.seq) : cmp;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof DistributedHerderRequest)) {
                return false;
            }
            DistributedHerderRequest other = (DistributedHerderRequest)o;
            return this.compareTo(other) == 0;
        }

        public int hashCode() {
            return Objects.hash(this.at, this.seq);
        }
    }

    public class ConfigUpdateListener
    implements ConfigBackingStore.UpdateListener {
        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onConnectorConfigRemove(String connector) {
            DistributedHerder.this.log.info("Connector {} config removed", (Object)connector);
            DistributedHerder distributedHerder = DistributedHerder.this;
            synchronized (distributedHerder) {
                if (DistributedHerder.this.configState.contains(connector)) {
                    DistributedHerder.this.needsReconfigRebalance = true;
                }
                DistributedHerder.this.connectorConfigUpdates.add(connector);
            }
            DistributedHerder.this.member.wakeup();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onConnectorConfigUpdate(String connector) {
            DistributedHerder.this.log.info("Connector {} config updated", (Object)connector);
            DistributedHerder distributedHerder = DistributedHerder.this;
            synchronized (distributedHerder) {
                if (!DistributedHerder.this.configState.contains(connector)) {
                    DistributedHerder.this.needsReconfigRebalance = true;
                }
                DistributedHerder.this.connectorConfigUpdates.add(connector);
            }
            DistributedHerder.this.member.wakeup();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onTaskConfigUpdate(Collection<ConnectorTaskId> tasks) {
            DistributedHerder.this.log.info("Tasks {} configs updated", tasks);
            DistributedHerder distributedHerder = DistributedHerder.this;
            synchronized (distributedHerder) {
                DistributedHerder.this.needsReconfigRebalance = true;
                DistributedHerder.this.taskConfigUpdates.addAll(tasks);
            }
            DistributedHerder.this.member.wakeup();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onConnectorTargetStateChange(String connector) {
            DistributedHerder.this.log.info("Connector {} target state change", (Object)connector);
            DistributedHerder distributedHerder = DistributedHerder.this;
            synchronized (distributedHerder) {
                DistributedHerder.this.connectorTargetStateChanges.add(connector);
            }
            DistributedHerder.this.member.wakeup();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onSessionKeyUpdate(SessionKey sessionKey) {
            DistributedHerder.this.log.info("Session key updated");
            DistributedHerder distributedHerder = DistributedHerder.this;
            synchronized (distributedHerder) {
                DistributedHerder.this.sessionKey = sessionKey.key();
                if (DistributedHerder.this.isLeader() && DistributedHerder.this.keyRotationIntervalMs > 0) {
                    DistributedHerder.this.keyExpiration = sessionKey.creationTimestamp() + (long)DistributedHerder.this.keyRotationIntervalMs;
                }
            }
        }
    }
}

