/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime;

import com.sun.management.ThreadMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.common.Confluent;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.metrics.CompoundStat;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricValueProvider;
import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.internals.PluginMetricsImpl;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Frequencies;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
import org.apache.kafka.connect.runtime.StateTracker;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.TaskPluginsMetadata;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.TopicStatus;
import org.apache.kafka.connect.runtime.TransformationChain;
import org.apache.kafka.connect.runtime.TransformationStage;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.runtime.tracing.Tracer;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.LoggingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class WorkerTask<T, R extends ConnectRecord<R>>
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(WorkerTask.class);
    private static final String THREAD_NAME_PREFIX = "task-thread-";
    private final TaskStatus.Listener statusListener;
    private final StatusBackingStore statusBackingStore;
    protected final ConnectorTaskId id;
    protected final ClassLoader loader;
    protected final Time time;
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private final TaskMetricsGroup taskMetricsGroup;
    protected long taskThreadId;
    final Map<Long, Long> threadCpuTimeBaselines;
    private volatile TargetState targetState;
    private volatile boolean failed;
    private volatile boolean stopping;
    private volatile boolean cancelled;
    private final ErrorHandlingMetrics errorMetrics;
    protected final RetryWithToleranceOperator<T> retryWithToleranceOperator;
    protected final TransformationChain<T, R> transformationChain;
    private final Supplier<List<ErrorReporter<T>>> errorReportersSupplier;
    protected final Function<ClassLoader, LoaderSwap> pluginLoaderSwapper;
    protected final PluginMetricsImpl pluginMetrics;
    protected Optional<Tracer> tracer = Optional.empty();
    private long taskLatestCpuUsageRecordTime = 0L;
    ScheduledExecutorService scheduler;

    public WorkerTask(ConnectorTaskId id, TaskStatus.Listener statusListener, TargetState initialState, ClassLoader loader, ConnectMetrics connectMetrics, ErrorHandlingMetrics errorMetrics, RetryWithToleranceOperator<T> retryWithToleranceOperator, TransformationChain<T, R> transformationChain, Supplier<List<ErrorReporter<T>>> errorReportersSupplier, Time time, StatusBackingStore statusBackingStore, TaskPluginsMetadata pluginsMetadata, Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) {
        this.id = id;
        this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, statusListener, pluginsMetadata);
        this.errorMetrics = errorMetrics;
        this.statusListener = this.taskMetricsGroup;
        this.loader = loader;
        this.targetState = initialState;
        this.failed = false;
        this.stopping = false;
        this.cancelled = false;
        this.taskMetricsGroup.recordState(this.targetState);
        this.retryWithToleranceOperator = retryWithToleranceOperator;
        this.transformationChain = transformationChain;
        this.errorReportersSupplier = errorReportersSupplier;
        this.time = time;
        this.statusBackingStore = statusBackingStore;
        this.pluginLoaderSwapper = pluginLoaderSwapper;
        this.pluginMetrics = connectMetrics.taskPluginMetrics(id);
        this.taskLatestCpuUsageRecordTime = Instant.now().toEpochMilli();
        this.scheduler = Executors.newSingleThreadScheduledExecutor();
        this.threadCpuTimeBaselines = new HashMap<Long, Long>();
    }

    public ConnectorTaskId id() {
        return this.id;
    }

    public ClassLoader loader() {
        return this.loader;
    }

    public PluginMetrics pluginMetrics() {
        return this.pluginMetrics;
    }

    public abstract void initialize(TaskConfig var1);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void triggerStop() {
        WorkerTask workerTask = this;
        synchronized (workerTask) {
            this.stopping = true;
            this.notifyAll();
        }
        ThreadUtils.shutdownExecutorServiceQuietly((ExecutorService)this.scheduler, (long)60L, (TimeUnit)TimeUnit.SECONDS);
    }

    public void stop() {
        this.triggerStop();
    }

    public void cancel() {
        this.cancelled = true;
        this.retryWithToleranceOperator.triggerStop();
    }

    public boolean awaitStop(long timeoutMs) {
        try {
            return this.shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            return false;
        }
    }

    public void removeMetrics() {
        Utils.closeQuietly(this.taskMetricsGroup::close, (String)"Task metrics group");
        Utils.closeQuietly((AutoCloseable)this.errorMetrics, (String)"Error handling metrics");
    }

    void doStart() {
        this.retryWithToleranceOperator.reporters(this.errorReportersSupplier.get());
        this.initializeAndStart();
        this.statusListener.onStartup(this.id);
    }

    protected abstract void initializeAndStart();

    protected abstract void execute();

    protected abstract void close();

    protected abstract String taskVersion();

    protected boolean isFailed() {
        return this.failed;
    }

    protected boolean isStopping() {
        return this.stopping || this.targetState == TargetState.STOPPED;
    }

    protected boolean isCancelled() {
        return this.cancelled;
    }

    protected boolean isTaskConnectorMetricsEnabled(WorkerConfig workerConfig) {
        try {
            return workerConfig.getBoolean("confluent.task.thread.load.metrics.enable");
        }
        catch (Exception e) {
            return false;
        }
    }

    void doClose() {
        try {
            this.tracer.ifPresent(t -> Utils.closeQuietly((AutoCloseable)t, (String)("tracer id:" + t.tracingContext().traceID().toString())));
            this.close();
        }
        catch (Throwable t2) {
            log.error("{} Task threw an uncaught and unrecoverable exception during shutdown", (Object)this, (Object)t2);
            throw t2;
        }
        finally {
            Utils.closeQuietly(this.transformationChain, (String)"transformation chain");
            Utils.closeQuietly(this.retryWithToleranceOperator, (String)"retry operator");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private void doRun() throws InterruptedException {
        try {
            WorkerTask workerTask = this;
            synchronized (workerTask) {
                if (this.isStopping()) {
                    // MONITOREXIT @DISABLED, blocks:[0, 6, 14] lbl5 : MonitorExitStatement: MONITOREXIT : var1_1
                    this.doClose();
                    return;
                }
                if (this.targetState == TargetState.PAUSED) {
                    this.onPause();
                    if (!this.awaitUnpause()) {
                        // MONITOREXIT @DISABLED, blocks:[0, 6, 12, 13] lbl11 : MonitorExitStatement: MONITOREXIT : var1_1
                        this.doClose();
                        return;
                    }
                }
            }
            this.tracer.ifPresent(Tracer::start);
            this.doStart();
            this.execute();
            return;
        }
        catch (Throwable t) {
            this.failed = true;
            if (this.cancelled) {
                log.warn("{} After being scheduled for shutdown, the orphan task threw an uncaught exception. A newer instance of this task might be already running", (Object)this, (Object)t);
                return;
            }
            if (this.isStopping()) {
                log.warn("{} After being scheduled for shutdown, task threw an uncaught exception.", (Object)this, (Object)t);
                return;
            }
            log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted", (Object)this, (Object)t);
            throw t;
        }
        catch (Throwable throwable) {
            throw throwable;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void onShutdown() {
        WorkerTask workerTask = this;
        synchronized (workerTask) {
            this.triggerStop();
            if (!this.cancelled) {
                this.statusListener.onShutdown(this.id);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void onFailure(Throwable t) {
        WorkerTask workerTask = this;
        synchronized (workerTask) {
            this.triggerStop();
            if (!this.cancelled) {
                this.statusListener.onFailure(this.id, t);
            }
        }
    }

    protected synchronized void onPause() {
        log.info("task {} is out of poll loop, pausing now", (Object)this.id);
        this.statusListener.onPause(this.id);
    }

    protected synchronized void onResume() {
        this.statusListener.onResume(this.id);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        LoggingContext.clear();
        try (LoggingContext loggingContext = LoggingContext.forTask(this.id());){
            String savedName = Thread.currentThread().getName();
            try {
                Thread.currentThread().setName(THREAD_NAME_PREFIX + String.valueOf(this.id));
                this.doRun();
                this.onShutdown();
                Thread.currentThread().setName(savedName);
                this.shutdownLatch.countDown();
            }
            catch (Throwable t) {
                try {
                    this.onFailure(t);
                    if (t instanceof Error) {
                        throw (Error)t;
                    }
                }
                catch (Throwable throwable) {
                    throw throwable;
                }
                finally {
                    Thread.currentThread().setName(savedName);
                    this.shutdownLatch.countDown();
                }
            }
        }
    }

    public boolean shouldPause() {
        return this.targetState == TargetState.PAUSED;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected boolean awaitUnpause() throws InterruptedException {
        WorkerTask workerTask = this;
        synchronized (workerTask) {
            while (this.targetState == TargetState.PAUSED) {
                if (this.isStopping()) {
                    return false;
                }
                this.wait();
            }
            return true;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void transitionTo(TargetState state) {
        WorkerTask workerTask = this;
        synchronized (workerTask) {
            if (this.isStopping()) {
                log.debug("{} Ignoring request to transition stopped task {} to state {}", new Object[]{this, this.id, state});
                return;
            }
            if (this.targetState == TargetState.STOPPED) {
                log.warn("{} Received unexpected request to transition task {} to state {}; will shut down in response", new Object[]{this, this.id, TargetState.STOPPED});
            }
            this.targetState = state;
            this.notifyAll();
        }
    }

    @Confluent
    public WorkerTask useTracer(Tracer tracer) {
        this.tracer = Optional.of(tracer);
        return this;
    }

    protected void recordActiveTopic(String topic) {
        if (this.statusBackingStore.getTopic(this.id.connector(), topic) != null) {
            return;
        }
        this.statusBackingStore.put(new TopicStatus(topic, this.id, this.time.milliseconds()));
    }

    protected void recordCommitSuccess(long duration) {
        this.taskMetricsGroup.recordCommit(duration, true);
    }

    protected void recordCommitFailure(long duration) {
        this.taskMetricsGroup.recordCommit(duration, false);
    }

    protected void recordBatch(int size) {
        this.taskMetricsGroup.recordBatch(size);
    }

    protected void recordCPUUsage(long cpuCyclesUsed) {
        if (this.taskLatestCpuUsageRecordTime == 0L) {
            this.taskLatestCpuUsageRecordTime = Instant.now().toEpochMilli();
            return;
        }
        long elapsedTime = Instant.now().toEpochMilli() - this.taskLatestCpuUsageRecordTime;
        if (elapsedTime != 0L) {
            log.debug("CPU cycles used by task thread {} is {}", (Object)this.taskThreadId, (Object)cpuCyclesUsed);
            double taskCPULoad = (double)cpuCyclesUsed / (double)elapsedTime;
            this.taskMetricsGroup.recordCPULoadRatio(taskCPULoad);
        }
        this.taskLatestCpuUsageRecordTime = Instant.now().toEpochMilli();
    }

    protected void recordMemUsage(long memBytes) {
        this.taskMetricsGroup.recordMemLoad(memBytes);
    }

    protected void recordTaskMetrics() {
        ThreadInfoManager threadInfoManager = ThreadInfoManager.getInstance();
        ThreadMXBean threadMXBean = (ThreadMXBean)ManagementFactory.getThreadMXBean();
        if (!threadMXBean.isThreadCpuTimeSupported()) {
            return;
        }
        long totalCpuTimeDelta = 0L;
        HashSet<Long> currentThreadIds = new HashSet<Long>();
        Map<String, Long> threadNameToIdMap = threadInfoManager.getThreadInfoMap();
        for (Map.Entry<String, Long> entry : threadNameToIdMap.entrySet()) {
            if (!entry.getKey().startsWith(this.id().toString())) continue;
            log.debug("Found a matching thread with name: {}, id {}", (Object)entry.getKey(), (Object)entry.getValue());
            currentThreadIds.add(entry.getValue());
        }
        currentThreadIds.add(this.taskThreadId);
        for (Long threadId : currentThreadIds) {
            long currentCpuTime = threadMXBean.getThreadCpuTime(threadId);
            if (this.threadCpuTimeBaselines.containsKey(threadId)) {
                long previousCpuTime = this.threadCpuTimeBaselines.get(threadId);
                long cpuDelta = currentCpuTime - previousCpuTime;
                log.debug("CPU cycle of thread {} is {}", (Object)threadId, (Object)currentCpuTime);
                if (cpuDelta > 0L) {
                    totalCpuTimeDelta += cpuDelta;
                }
            }
            this.threadCpuTimeBaselines.put(threadId, currentCpuTime);
        }
        this.threadCpuTimeBaselines.keySet().retainAll(currentThreadIds);
        this.recordCPUUsage(totalCpuTimeDelta);
    }

    TaskMetricsGroup taskMetricsGroup() {
        return this.taskMetricsGroup;
    }

    static class TaskMetricsGroup
    implements TaskStatus.Listener {
        private final TaskStatus.Listener delegateListener;
        private final ConnectMetrics.MetricGroup metricGroup;
        private final List<ConnectMetrics.MetricGroup> transformationGroups = new ArrayList<ConnectMetrics.MetricGroup>();
        private final List<ConnectMetrics.MetricGroup> predicateGroups = new ArrayList<ConnectMetrics.MetricGroup>();
        private final Time time;
        private final StateTracker taskStateTimer;
        private final Sensor commitTime;
        private final Sensor batchSize;
        private final Sensor commitAttempts;
        private final ConnectMetrics connectMetrics;
        private final ConnectorTaskId id;
        protected final Sensor taskMemUsage;
        protected final Sensor taskCPULoadRatio;

        public TaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics, TaskStatus.Listener statusListener) {
            this(id, connectMetrics, statusListener, null);
        }

        public TaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics, TaskStatus.Listener statusListener, TaskPluginsMetadata pluginsMetadata) {
            this.delegateListener = statusListener;
            this.connectMetrics = connectMetrics;
            this.id = id;
            this.time = connectMetrics.time();
            this.taskStateTimer = new StateTracker();
            ConnectMetricsRegistry registry = connectMetrics.registry();
            this.metricGroup = connectMetrics.group(registry.taskGroupName(), registry.connectorTagName(), id.connector(), registry.taskTagName(), Integer.toString(id.task()));
            this.metricGroup.close();
            this.metricGroup.addValueMetric(registry.taskStatus, now -> this.taskStateTimer.currentState().toString().toLowerCase(Locale.getDefault()));
            this.addRatioMetric(AbstractStatus.State.RUNNING, registry.taskRunningRatio);
            this.addRatioMetric(AbstractStatus.State.PAUSED, registry.taskPauseRatio);
            this.commitTime = this.metricGroup.sensor("commit-time");
            this.commitTime.add(this.metricGroup.metricName(registry.taskCommitTimeMax), (MeasurableStat)new Max());
            this.commitTime.add(this.metricGroup.metricName(registry.taskCommitTimeAvg), (MeasurableStat)new Avg());
            this.batchSize = this.metricGroup.sensor("batch-size");
            this.batchSize.add(this.metricGroup.metricName(registry.taskBatchSizeMax), (MeasurableStat)new Max());
            this.batchSize.add(this.metricGroup.metricName(registry.taskBatchSizeAvg), (MeasurableStat)new Avg());
            this.taskMemUsage = this.metricGroup.sensor("task-mem-usages");
            this.taskMemUsage.add(this.metricGroup.metricName(registry.taskMemoryUsageAvg), (MeasurableStat)new Avg());
            this.taskMemUsage.add(this.metricGroup.metricName(registry.taskMemoryUsageMax), (MeasurableStat)new Max());
            this.taskCPULoadRatio = this.metricGroup.sensor("task-cpu-usage");
            this.taskCPULoadRatio.add(this.metricGroup.metricName(registry.taskCPULoadRatioAvg), (MeasurableStat)new Avg());
            this.taskCPULoadRatio.add(this.metricGroup.metricName(registry.taskCPULoadRatioMax), (MeasurableStat)new Max());
            MetricName offsetCommitFailures = this.metricGroup.metricName(registry.taskCommitFailurePercentage);
            MetricName offsetCommitSucceeds = this.metricGroup.metricName(registry.taskCommitSuccessPercentage);
            Frequencies commitFrequencies = Frequencies.forBooleanValues((MetricName)offsetCommitFailures, (MetricName)offsetCommitSucceeds);
            this.commitAttempts = this.metricGroup.sensor("offset-commit-completion");
            this.commitAttempts.add((CompoundStat)commitFrequencies);
            this.addPluginInfoMetric(pluginsMetadata);
        }

        private void addRatioMetric(AbstractStatus.State matchingState, MetricNameTemplate template) {
            MetricName metricName = this.metricGroup.metricName(template);
            this.metricGroup.metrics().addMetricIfAbsent(metricName, null, (MetricValueProvider)((Gauge)(config, now) -> this.taskStateTimer.durationRatio(matchingState, now)));
        }

        private void addPluginInfoMetric(TaskPluginsMetadata pluginsMetadata) {
            if (pluginsMetadata == null) {
                return;
            }
            ConnectMetricsRegistry registry = this.connectMetrics.registry();
            this.metricGroup.addValueMetric(registry.taskConnectorClass, now -> pluginsMetadata.connectorClass());
            this.metricGroup.addValueMetric(registry.taskConnectorClassVersion, now -> pluginsMetadata.connectorVersion());
            this.metricGroup.addValueMetric(registry.taskConnectorType, now -> pluginsMetadata.connectorType());
            this.metricGroup.addValueMetric(registry.taskClass, now -> pluginsMetadata.taskClass());
            this.metricGroup.addValueMetric(registry.taskVersion, now -> pluginsMetadata.taskVersion());
            this.metricGroup.addValueMetric(registry.taskKeyConverterClass, now -> pluginsMetadata.keyConverterClass());
            this.metricGroup.addValueMetric(registry.taskKeyConverterVersion, now -> pluginsMetadata.keyConverterVersion());
            this.metricGroup.addValueMetric(registry.taskValueConverterClass, now -> pluginsMetadata.valueConverterClass());
            this.metricGroup.addValueMetric(registry.taskValueConverterVersion, now -> pluginsMetadata.valueConverterVersion());
            this.metricGroup.addValueMetric(registry.taskHeaderConverterClass, now -> pluginsMetadata.headerConverterClass());
            this.metricGroup.addValueMetric(registry.taskHeaderConverterVersion, now -> pluginsMetadata.headerConverterVersion());
            if (!pluginsMetadata.transformations().isEmpty()) {
                for (TransformationStage.AliasedPluginInfo entry : pluginsMetadata.transformations()) {
                    ConnectMetrics.MetricGroup transformationGroup = this.connectMetrics.group(registry.transformsGroupName(), registry.connectorTagName(), this.id.connector(), registry.taskTagName(), Integer.toString(this.id.task()), registry.transformsTagName(), entry.alias());
                    transformationGroup.addValueMetric(registry.transformClass, now -> entry.className());
                    transformationGroup.addValueMetric(registry.transformVersion, now -> entry.version());
                    this.transformationGroups.add(transformationGroup);
                }
            }
            if (!pluginsMetadata.predicates().isEmpty()) {
                for (TransformationStage.AliasedPluginInfo entry : pluginsMetadata.predicates()) {
                    ConnectMetrics.MetricGroup predicateGroup = this.connectMetrics.group(registry.predicatesGroupName(), registry.connectorTagName(), this.id.connector(), registry.taskTagName(), Integer.toString(this.id.task()), registry.predicateTagName(), entry.alias());
                    predicateGroup.addValueMetric(registry.predicateClass, now -> entry.className());
                    predicateGroup.addValueMetric(registry.predicateVersion, now -> entry.version());
                    this.predicateGroups.add(predicateGroup);
                }
            }
        }

        void close() {
            this.metricGroup.close();
            this.transformationGroups.forEach(ConnectMetrics.MetricGroup::close);
            this.predicateGroups.forEach(ConnectMetrics.MetricGroup::close);
        }

        void recordCommit(long duration, boolean success) {
            if (success) {
                this.commitTime.record((double)duration);
                this.commitAttempts.record(1.0);
            } else {
                this.commitAttempts.record(0.0);
            }
        }

        void recordBatch(int size) {
            this.batchSize.record((double)size);
        }

        void recordCPULoadRatio(double ratio) {
            this.taskCPULoadRatio.record(ratio);
        }

        void recordMemLoad(long memLoad) {
            this.taskMemUsage.record((double)memLoad);
        }

        @Override
        public void onStartup(ConnectorTaskId id) {
            this.taskStateTimer.changeState(AbstractStatus.State.RUNNING, this.time.milliseconds());
            this.delegateListener.onStartup(id);
        }

        @Override
        public void onFailure(ConnectorTaskId id, Throwable cause) {
            this.taskStateTimer.changeState(AbstractStatus.State.FAILED, this.time.milliseconds());
            this.delegateListener.onFailure(id, cause);
        }

        @Override
        public void onPause(ConnectorTaskId id) {
            this.taskStateTimer.changeState(AbstractStatus.State.PAUSED, this.time.milliseconds());
            this.delegateListener.onPause(id);
        }

        @Override
        public void onResume(ConnectorTaskId id) {
            this.taskStateTimer.changeState(AbstractStatus.State.RUNNING, this.time.milliseconds());
            this.delegateListener.onResume(id);
        }

        @Override
        public void onShutdown(ConnectorTaskId id) {
            this.taskStateTimer.changeState(AbstractStatus.State.UNASSIGNED, this.time.milliseconds());
            this.delegateListener.onShutdown(id);
        }

        @Override
        public void onDeletion(ConnectorTaskId id) {
            this.taskStateTimer.changeState(AbstractStatus.State.DESTROYED, this.time.milliseconds());
            this.delegateListener.onDeletion(id);
        }

        @Override
        public void onRestart(ConnectorTaskId id) {
            this.taskStateTimer.changeState(AbstractStatus.State.RESTARTING, this.time.milliseconds());
            this.delegateListener.onRestart(id);
        }

        public void recordState(TargetState state) {
            switch (state) {
                case STARTED: {
                    this.taskStateTimer.changeState(AbstractStatus.State.RUNNING, this.time.milliseconds());
                    break;
                }
                case PAUSED: {
                    this.taskStateTimer.changeState(AbstractStatus.State.PAUSED, this.time.milliseconds());
                    break;
                }
            }
        }

        public AbstractStatus.State state() {
            return this.taskStateTimer.currentState();
        }

        protected ConnectMetrics.MetricGroup metricGroup() {
            return this.metricGroup;
        }
    }

    static class ThreadInfoManager {
        private static ThreadInfoManager instance;
        private static final long REFRESH_INTERVAL_MS = 60000L;
        private final ThreadMXBean threadMXBean;
        private Map<String, Long> threadInfoMap;
        private long lastRefreshTime;

        private ThreadInfoManager(ThreadMXBean threadMXBean) {
            this.threadMXBean = threadMXBean;
            this.threadInfoMap = new HashMap<String, Long>();
            this.lastRefreshTime = 0L;
            this.refreshThreadInfo();
        }

        public static ThreadInfoManager getInstance() {
            if (instance == null) {
                instance = new ThreadInfoManager((ThreadMXBean)ManagementFactory.getThreadMXBean());
            }
            return instance;
        }

        public static ThreadInfoManager getTestInstance(ThreadMXBean mockThreadMXBean) {
            return new ThreadInfoManager(mockThreadMXBean);
        }

        public static void setInstance(ThreadInfoManager testInstance) {
            instance = testInstance;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public Map<String, Long> getThreadInfoMap() {
            ThreadInfoManager threadInfoManager = this;
            synchronized (threadInfoManager) {
                long currentTime = System.currentTimeMillis();
                if (currentTime - this.lastRefreshTime > 60000L) {
                    this.refreshThreadInfo();
                }
                return this.threadInfoMap;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void refreshThreadInfo() {
            ThreadInfoManager threadInfoManager = this;
            synchronized (threadInfoManager) {
                HashMap<String, Long> newThreadInfoMap = new HashMap<String, Long>();
                for (long threadId : this.threadMXBean.getAllThreadIds()) {
                    ThreadInfo threadInfo = this.threadMXBean.getThreadInfo(threadId);
                    if (threadInfo == null || threadInfo.getThreadName() == null) continue;
                    newThreadInfoMap.put(threadInfo.getThreadName(), threadId);
                }
                this.threadInfoMap = newThreadInfoMap;
                this.lastRefreshTime = System.currentTimeMillis();
            }
        }
    }
}

