/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime;

import io.confluent.logevents.connect.LogEventsEmitter;
import io.confluent.logevents.connect.LogEventsKafkaEmitter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Confluent;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.health.ConnectorType;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.CloseableConnectorContext;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.TracingTransformationChain;
import org.apache.kafka.connect.runtime.TransformationChain;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerConfigDecorator;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.WorkerConnector;
import org.apache.kafka.connect.runtime.WorkerMetricsGroup;
import org.apache.kafka.connect.runtime.WorkerSinkTask;
import org.apache.kafka.connect.runtime.WorkerSourceTask;
import org.apache.kafka.connect.runtime.WorkerTask;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.LogReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.TraceReporter;
import org.apache.kafka.connect.runtime.errors.TraceWorkerErrantRecordReporter;
import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.tracing.ConnectTracer;
import org.apache.kafka.connect.runtime.tracing.Tracer;
import org.apache.kafka.connect.runtime.tracing.TracerConfig;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.LoggingContext;
import org.apache.kafka.connect.util.SinkUtils;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Worker {
    /** Five seconds expressed in milliseconds; used as the shared deadline when awaiting connector shutdown. */
    public static final long CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(5L);
    private static final Logger log = LoggerFactory.getLogger(Worker.class);
    /** Key looked up in the worker's original configuration when a task is built; "false" is used when it is absent. */
    public static final String FAIL_NON_TOLERATED_EXCEPTION = "fail.non.tolerated.exception";
    // NOTE(review): never assigned in the visible part of this class; presumably set elsewhere before start() - confirm.
    protected Herder herder;
    // Runs the WorkerConnector and WorkerTask instances submitted by startConnector/startTask.
    private final ExecutorService executor;
    private final Time time;
    private final String workerId;
    // Looked up from the worker configuration in the constructor.
    private final String kafkaClusterId;
    private final Plugins plugins;
    private final ConnectMetrics metrics;
    private final WorkerMetricsGroup workerMetricsGroup;
    // Created in start(), closed in stop(); null until start() has run.
    private ConnectorStatusMetricsGroup connectorStatusMetricsGroup;
    private final WorkerConfig config;
    // JSON converters created with "schemas.enable" set to "false"; handed to the offset reader/writer.
    private final Converter internalKeyConverter;
    private final Converter internalValueConverter;
    private final OffsetBackingStore offsetBackingStore;
    // Connectors currently owned by this worker, keyed by connector name.
    private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<String, WorkerConnector>();
    // Tasks currently owned by this worker, keyed by task id.
    private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<ConnectorTaskId, WorkerTask>();
    // Created in start(), closed in stop(); null until start() has run.
    private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
    private final WorkerConfigTransformer workerConfigTransformer;
    private final ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
    private final WorkerConfigDecorator workerConfigDecorator;
    // Started in start() with the worker's original configuration and stopped in stop().
    private final LogEventsKafkaEmitter logEventsKafkaEmitter;

    /**
     * Creates a worker whose connectors and tasks run on a freshly created cached thread pool.
     * All other arguments are forwarded unchanged to the package-private constructor.
     */
    public Worker(
            String workerId,
            Time time,
            Plugins plugins,
            WorkerConfig config,
            OffsetBackingStore offsetBackingStore,
            ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy
    ) {
        this(
                workerId,
                time,
                plugins,
                config,
                offsetBackingStore,
                Executors.newCachedThreadPool(),
                connectorClientConfigOverridePolicy
        );
    }

    /**
     * Creates a worker that submits its connectors and tasks to the supplied executor.
     *
     * <p>Besides storing its arguments, the constructor looks up the Kafka cluster id, creates the
     * metrics objects and the internal JSON converters, configures the offset backing store with the
     * worker configuration, and builds the config transformer, config decorator and log-events emitter.
     */
    Worker(
            String workerId,
            Time time,
            Plugins plugins,
            WorkerConfig config,
            OffsetBackingStore offsetBackingStore,
            ExecutorService executorService,
            ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy
    ) {
        // The cluster id is needed by the metrics, so it is resolved before anything else.
        this.kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
        this.metrics = new ConnectMetrics(workerId, config, time, kafkaClusterId);

        this.executor = executorService;
        this.workerId = workerId;
        this.time = time;
        this.plugins = plugins;
        this.config = config;
        this.connectorClientConfigOverridePolicy = connectorClientConfigOverridePolicy;
        this.workerMetricsGroup = new WorkerMetricsGroup(connectors, tasks, metrics);

        // Internal converters are JSON converters with schemas switched off.
        final Map<String, String> schemalessJson = Collections.singletonMap("schemas.enable", "false");
        this.internalKeyConverter =
                plugins.newInternalConverter(true, JsonConverter.class.getName(), schemalessJson);
        this.internalValueConverter =
                plugins.newInternalConverter(false, JsonConverter.class.getName(), schemalessJson);

        this.offsetBackingStore = offsetBackingStore;
        offsetBackingStore.configure(config);

        this.workerConfigTransformer = initConfigTransformer();
        this.workerConfigDecorator = WorkerConfigDecorator.initialize(this.config, workerConfigTransformer);
        this.logEventsKafkaEmitter = new LogEventsKafkaEmitter();
    }

    /**
     * Builds the {@link WorkerConfigTransformer} for this worker.
     *
     * <p>One {@link ConfigProvider} is instantiated for every name listed under the
     * {@code config.providers} worker setting, using the {@code config.providers.<name>} prefix.
     *
     * @return a transformer backed by the instantiated providers, keyed by provider name
     */
    private WorkerConfigTransformer initConfigTransformer() {
        // Typed list: the raw List produced by decompilation cannot be iterated as String.
        List<String> providerNames = this.config.getList("config.providers");
        Map<String, ConfigProvider> providerMap = new HashMap<>();
        for (String providerName : providerNames) {
            ConfigProvider configProvider = this.plugins.newConfigProvider(
                    this.config, "config.providers." + providerName, Plugins.ClassLoaderUsage.PLUGINS);
            providerMap.put(providerName, configProvider);
        }
        return new WorkerConfigTransformer(this, providerMap);
    }

    /**
     * @return the config transformer created when this worker was constructed
     */
    public WorkerConfigTransformer configTransformer() {
        return workerConfigTransformer;
    }

    /**
     * @return the config decorator created when this worker was constructed
     */
    public WorkerConfigDecorator configDecorator() {
        return workerConfigDecorator;
    }

    /**
     * @return the current value of the {@code herder} field
     */
    protected Herder herder() {
        return herder;
    }

    /**
     * @return this worker's log-events emitter, exposed through the {@link LogEventsEmitter} type
     */
    public LogEventsEmitter logEventsEmitter() {
        return logEventsKafkaEmitter;
    }

    /**
     * Starts the worker: starts the offset backing store, creates the source-task offset committer
     * and the connector status metrics group, and starts the log-events emitter with the worker's
     * original configuration.
     */
    public void start() {
        log.info("Worker starting");

        offsetBackingStore.start();
        sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(config);
        connectorStatusMetricsGroup = new ConnectorStatusMetricsGroup(metrics, tasks, herder);
        logEventsKafkaEmitter.start(config.originals());

        log.info("Worker started");
    }

    /**
     * Stops the worker.
     *
     * <p>Any connectors or tasks still registered are stopped first (with a warning, since the herder
     * is expected to have stopped them already). The offset committer is then closed with whatever
     * remains of the {@code task.shutdown.graceful.timeout.ms} budget, followed by the offset store,
     * metrics, log-events emitter, metrics groups and config transformer.
     */
    public void stop() {
        log.info("Worker stopping");

        final long startedMs = time.milliseconds();
        final long deadlineMs = startedMs + config.getLong("task.shutdown.graceful.timeout.ms");

        if (!connectors.isEmpty()) {
            log.warn("Shutting down connectors {} uncleanly; herder should have shut down connectors before the Worker is stopped", connectors.keySet());
            stopAndAwaitConnectors();
        }

        if (!tasks.isEmpty()) {
            log.warn("Shutting down tasks {} uncleanly; herder should have shut down tasks before the Worker is stopped", tasks.keySet());
            stopAndAwaitTasks();
        }

        // Whatever is left of the graceful-shutdown budget goes to the offset committer.
        final long remainingMs = deadlineMs - time.milliseconds();
        sourceTaskOffsetCommitter.close(remainingMs);

        offsetBackingStore.stop();
        metrics.stop();
        logEventsKafkaEmitter.stop();

        log.info("Worker stopped");

        workerMetricsGroup.close();
        connectorStatusMetricsGroup.close();
        workerConfigTransformer.close();
    }

    /**
     * Creates a connector, registers it with this worker and submits it to the executor.
     *
     * <p>Failures are never thrown to the caller; they are reported through
     * {@code onConnectorStateChange} (and, for instantiation failures, through the status listener).
     *
     * @param connName               name of the connector
     * @param connProps              connector properties; must contain {@code connector.class}
     * @param ctx                    connector context handed to the {@link WorkerConnector}
     * @param statusListener         listener for connector status changes; wrapped by the worker metrics group
     * @param initialState           state the connector is transitioned to after creation
     * @param onConnectorStateChange callback invoked with the outcome of the initial transition, or
     *                               with the error if the connector already exists or cannot be created
     */
    public void startConnector(String connName, Map<String, String> connProps, CloseableConnectorContext ctx, ConnectorStatus.Listener statusListener, TargetState initialState, Callback<TargetState> onConnectorStateChange) {
        final ConnectorStatus.Listener connectorStatusListener = this.workerMetricsGroup.wrapStatusListener(statusListener);
        // try-with-resources closes the logging context on every exit path, including the early returns.
        try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
            if (this.connectors.containsKey(connName)) {
                onConnectorStateChange.onCompletion(new ConnectException("Connector with name " + connName + " already exists"), null);
                return;
            }

            final WorkerConnector workerConnector;
            ClassLoader savedLoader = this.plugins.currentThreadLoader();
            try {
                final String connClass = connProps.get("connector.class");
                final ClassLoader connectorLoader = this.plugins.delegatingLoader().connectorLoader(connClass);
                savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);

                log.info("Creating connector {} of type {}", connName, connClass);
                final Connector connector = this.plugins.newConnector(connClass);
                final Map<String, String> decoratedProps =
                        this.configDecorator().decorateConnectorConfig(connName, connector, connector.config(), connProps);

                final ConnectorConfig connConfig;
                if (ConnectUtils.isSinkConnector(connector)) {
                    connConfig = new SinkConnectorConfig(this.plugins, decoratedProps);
                } else {
                    connConfig = new SourceConnectorConfig(this.plugins, decoratedProps, this.config.topicCreationEnable());
                }

                final OffsetStorageReaderImpl offsetReader = new OffsetStorageReaderImpl(
                        this.offsetBackingStore, connName, this.internalKeyConverter, this.internalValueConverter);
                workerConnector = new WorkerConnector(
                        connName, connector, connConfig, ctx, this.metrics, connectorStatusListener, offsetReader, connectorLoader);
                if (this.config.taskStatusMetricsEnabled()) {
                    workerConnector.metrics().addHerderMetrics(this.herder);
                }
                log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
                workerConnector.transitionTo(initialState, onConnectorStateChange);
                Plugins.compareAndSwapLoaders(savedLoader);
            } catch (Throwable t) {
                log.error("Failed to start connector {}", connName, t);
                // Not in a finally block: the loader is restored before the listener and callback run.
                Plugins.compareAndSwapLoaders(savedLoader);
                connectorStatusListener.onFailure(connName, t);
                onConnectorStateChange.onCompletion(t, null);
                return;
            }

            // Re-check under putIfAbsent in case another thread registered the same name meanwhile.
            final WorkerConnector existing = this.connectors.putIfAbsent(connName, workerConnector);
            if (existing != null) {
                onConnectorStateChange.onCompletion(new ConnectException("Connector with name " + connName + " already exists"), null);
                return;
            }

            this.executor.submit(workerConnector);
            log.info("Finished creating connector {}", connName);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reports whether the named connector is a sink connector.
     *
     * <p>The check runs with the connector's own class loader swapped in; the previous loader is
     * restored afterwards.
     *
     * @param connName name of a connector owned by this worker
     * @return the result of {@code WorkerConnector.isSinkConnector()} for that connector
     * @throws ConnectException if this worker does not own a connector with that name
     */
    public boolean isSinkConnector(String connName) {
        final WorkerConnector target = connectors.get(connName);
        if (target == null) {
            throw new ConnectException("Connector " + connName + " not found in this worker.");
        }

        ClassLoader previousLoader = plugins.currentThreadLoader();
        try {
            previousLoader = Plugins.compareAndSwapLoaders(target.loader());
            return target.isSinkConnector();
        } finally {
            Plugins.compareAndSwapLoaders(previousLoader);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Asks a connector for its task configurations and completes each one with worker-managed keys.
     *
     * <p>Each returned map is a copy of what the connector produced, with {@code task.class} set to
     * the connector's task class name and, when present in the connector's original properties,
     * {@code topics} and {@code topics.regex} copied over.
     *
     * @param connName   name of a connector owned by this worker
     * @param connConfig connector configuration; supplies {@code tasks.max} and the original properties
     * @return one configuration map per task, in the order returned by the connector
     * @throws ConnectException if this worker does not own a connector with that name
     */
    public List<Map<String, String>> connectorTaskConfigs(String connName, ConnectorConfig connConfig) {
        List<Map<String, String>> result = new ArrayList<>();
        try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
            log.trace("Reconfiguring connector tasks for {}", connName);

            WorkerConnector workerConnector = this.connectors.get(connName);
            if (workerConnector == null) {
                throw new ConnectException("Connector " + connName + " not found in this worker.");
            }

            int maxTasks = connConfig.getInt("tasks.max");
            Map<String, String> connOriginals = connConfig.originalsStrings();
            Connector connector = workerConnector.connector();

            ClassLoader savedLoader = this.plugins.currentThreadLoader();
            try {
                savedLoader = Plugins.compareAndSwapLoaders(workerConnector.loader());
                String taskClassName = connector.taskClass().getName();
                for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
                    // Copy so the connector's own map is never modified.
                    Map<String, String> taskConfig = new HashMap<>(taskProps);
                    taskConfig.put("task.class", taskClassName);
                    if (connOriginals.containsKey("topics")) {
                        taskConfig.put("topics", connOriginals.get("topics"));
                    }
                    if (connOriginals.containsKey("topics.regex")) {
                        taskConfig.put("topics.regex", connOriginals.get("topics.regex"));
                    }
                    result.add(taskConfig);
                }
            } finally {
                Plugins.compareAndSwapLoaders(savedLoader);
            }
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Requests shutdown of a single connector without waiting for it to finish.
     *
     * <p>A request for a connector this worker does not own is logged and ignored. The shutdown call
     * runs with the connector's class loader swapped in.
     *
     * @param connName name of the connector to stop
     */
    private void stopConnector(String connName) {
        try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
            final WorkerConnector owned = connectors.get(connName);
            log.info("Stopping connector {}", connName);

            if (owned != null) {
                ClassLoader previousLoader = plugins.currentThreadLoader();
                try {
                    previousLoader = Plugins.compareAndSwapLoaders(owned.loader());
                    owned.shutdown();
                } finally {
                    Plugins.compareAndSwapLoaders(previousLoader);
                }
            } else {
                log.warn("Ignoring stop request for unowned connector {}", connName);
            }
        }
    }

    /**
     * Requests shutdown of every listed connector, one after another, without waiting.
     *
     * @param ids names of the connectors to stop
     */
    private void stopConnectors(Collection<String> ids) {
        ids.forEach(this::stopConnector);
    }

    /**
     * Removes a connector from this worker and waits for it to shut down.
     *
     * <p>If the connector does not shut down within the timeout it is cancelled and an error is
     * logged. A name that is not registered is logged and ignored.
     *
     * @param connName name of the connector to await
     * @param timeout  value passed to {@code WorkerConnector.awaitShutdown}
     */
    private void awaitStopConnector(String connName, long timeout) {
        try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
            final WorkerConnector removed = connectors.remove(connName);
            if (removed == null) {
                log.warn("Ignoring await stop request for non-present connector {}", connName);
                return;
            }

            final boolean stoppedInTime = removed.awaitShutdown(timeout);
            if (stoppedInTime) {
                log.debug("Graceful stop of connector {} succeeded.", connName);
            } else {
                log.error("Connector \u2018{}\u2019 failed to properly shut down, has become unresponsive, and may be consuming external resources. Correct the configuration for this connector or remove the connector. After fixing the connector, it may be necessary to restart this worker to release any consumed resources.", connName);
                removed.cancel();
            }
        }
    }

    /**
     * Waits for the listed connectors to shut down, sharing one deadline across all of them.
     *
     * <p>The deadline is {@link #CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS} from the moment this method
     * is entered; each connector gets whatever time is left, never less than zero.
     *
     * @param ids names of the connectors to await
     */
    private void awaitStopConnectors(Collection<String> ids) {
        final long deadlineMs = time.milliseconds() + CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS;
        for (String connName : ids) {
            final long leftMs = deadlineMs - time.milliseconds();
            awaitStopConnector(connName, Math.max(0L, leftMs));
        }
    }

    /**
     * Stops every connector currently registered with this worker and waits for each to finish.
     * The connector names are copied first, so the registry can change while they are processed.
     */
    public void stopAndAwaitConnectors() {
        final List<String> registeredNames = new ArrayList<String>(connectors.keySet());
        stopAndAwaitConnectors(registeredNames);
    }

    /**
     * Stops the listed connectors and waits for them: shutdown is requested for all of them first,
     * and only then is each one awaited.
     *
     * @param ids names of the connectors to stop
     */
    public void stopAndAwaitConnectors(Collection<String> ids) {
        // Phase 1: ask every connector to shut down.
        stopConnectors(ids);
        // Phase 2: wait for them under a shared deadline.
        awaitStopConnectors(ids);
    }

    /**
     * Stops a single connector and waits for it to finish shutting down.
     *
     * @param connName name of the connector to stop
     */
    public void stopAndAwaitConnector(String connName) {
        stopConnector(connName);
        final List<String> justThisOne = Collections.singletonList(connName);
        awaitStopConnectors(justThisOne);
    }

    /**
     * @return the key set of this worker's connector registry (the map's own view, not a copy)
     */
    public Set<String> connectorNames() {
        return connectors.keySet();
    }

    /**
     * Reports whether the named connector is owned by this worker and currently running.
     *
     * @param connName name of the connector
     * @return {@code false} if the connector is not registered here, otherwise its running state
     */
    public boolean isRunning(String connName) {
        final WorkerConnector candidate = connectors.get(connName);
        if (candidate == null) {
            return false;
        }
        return candidate.isRunning();
    }

    /**
     * Creates a task, registers it with this worker and submits it to the executor.
     *
     * <p>Converters are taken from the connector configuration when it defines them and from the
     * worker configuration otherwise. Every closeable resource created along the way is tracked, so
     * that a failure during construction closes all of them.
     *
     * @param id             id of the task to start
     * @param configState    cluster configuration snapshot handed to the task
     * @param connProps      properties of the owning connector; must contain {@code connector.class}
     * @param taskProps      properties of the task; must contain {@code task.class}
     * @param statusListener listener for task status changes; wrapped by the worker metrics group
     * @param initialState   initial target state of the task
     * @return {@code true} if the task was created and submitted, {@code false} if construction
     *         failed (the failure is reported to the status listener)
     * @throws ConnectException if a task with this id is already registered with this worker
     */
    public boolean startTask(ConnectorTaskId id, ClusterConfigState configState, Map<String, String> connProps, Map<String, String> taskProps, TaskStatus.Listener statusListener, TargetState initialState) {
        final TaskStatus.Listener taskStatusListener = this.workerMetricsGroup.wrapStatusListener(statusListener);
        // try-with-resources closes the logging context on every exit path, including the failure return.
        try (LoggingContext loggingContext = LoggingContext.forTask(id)) {
            log.info("Creating task {}", id);
            if (this.tasks.containsKey(id)) {
                throw new ConnectException("Task already exists in this worker: " + id);
            }
            this.connectorStatusMetricsGroup.recordTaskAdded(id);

            final WorkerTask workerTask;
            ClassLoader savedLoader = this.plugins.currentThreadLoader();
            // Resources to close if construction fails, mapped to a label used when closing them.
            final Map<AutoCloseable, String> closeableResources = new HashMap<>();
            try {
                final String connType = connProps.get("connector.class");
                final ClassLoader connectorLoader = this.plugins.delegatingLoader().connectorLoader(connType);
                savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);

                final ConnectorConfig connConfig = new ConnectorConfig(this.plugins, connProps);
                final TaskConfig taskConfig = new TaskConfig(taskProps);
                final Class<? extends Task> taskClass = taskConfig.getClass("task.class").asSubclass(Task.class);
                final Task task = this.plugins.newTask(taskClass);
                log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());

                // Connector-level converters first; each may be null when the connector does not define one.
                Converter keyConverter = this.plugins.newConverter(connConfig, "key.converter", Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
                Converter valueConverter = this.plugins.newConverter(connConfig, "value.converter", Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
                HeaderConverter headerConverter = this.plugins.newHeaderConverter(connConfig, "header.converter", Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
                if (headerConverter != null) {
                    // Track it immediately so a failure in the fallbacks below still closes it.
                    closeableResources.put(headerConverter, "header converter for task " + id);
                }

                if (keyConverter == null) {
                    keyConverter = this.plugins.newConverter(this.config, "key.converter", Plugins.ClassLoaderUsage.PLUGINS);
                    log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id);
                } else {
                    log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id);
                }
                if (valueConverter == null) {
                    valueConverter = this.plugins.newConverter(this.config, "value.converter", Plugins.ClassLoaderUsage.PLUGINS);
                    log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), id);
                } else {
                    log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), id);
                }
                if (headerConverter == null) {
                    headerConverter = this.plugins.newHeaderConverter(this.config, "header.converter", Plugins.ClassLoaderUsage.PLUGINS);
                    closeableResources.put(headerConverter, "header converter for task " + id);
                    log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), id);
                } else {
                    log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), id);
                }

                workerTask = this.buildWorkerTask(configState, connConfig, id, task, taskStatusListener, initialState, keyConverter, valueConverter, headerConverter, connectorLoader, closeableResources::put);
                workerTask.initialize(taskConfig);
                Plugins.compareAndSwapLoaders(savedLoader);
            } catch (Throwable t) {
                log.error("Failed to start task {}", id, t);
                // Not in a finally block: the loader is restored before the listener is notified.
                Plugins.compareAndSwapLoaders(savedLoader);
                this.connectorStatusMetricsGroup.recordTaskRemoved(id);
                taskStatusListener.onFailure(id, t);
                closeableResources.forEach(Utils::closeQuietly);
                return false;
            }

            // Re-check under putIfAbsent in case another thread registered the same id meanwhile.
            final WorkerTask existing = this.tasks.putIfAbsent(id, workerTask);
            if (existing != null) {
                Utils.closeQuietly(workerTask::doClose, "duplicate instance of task " + id);
                throw new ConnectException("Task already exists in this worker: " + id);
            }

            this.executor.submit(workerTask);
            if (workerTask instanceof WorkerSourceTask) {
                this.sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
            }
            return true;
        }
    }

    /**
     * Builds the {@link WorkerTask} wrapper for a source or sink task.
     *
     * <p>Every closeable resource created here is reported to {@code closeableResources} together
     * with a descriptive label, as soon as it exists.
     *
     * @param configState        cluster configuration snapshot handed to the worker task
     * @param connConfig         configuration of the owning connector
     * @param id                 id of the task
     * @param task               task instance; must be a {@link SourceTask} or a {@link SinkTask}
     * @param statusListener     listener for task status changes
     * @param initialState       initial target state of the task
     * @param keyConverter       converter for record keys
     * @param valueConverter     converter for record values
     * @param headerConverter    converter for record headers
     * @param loader             class loader passed to the worker task
     * @param closeableResources sink that receives each created resource and its label
     * @return a {@link WorkerSourceTask} or {@link WorkerSinkTask} wrapping {@code task}
     * @throws ConnectException if {@code task} is neither a source task nor a sink task
     */
    WorkerTask buildWorkerTask(ClusterConfigState configState, ConnectorConfig connConfig, ConnectorTaskId id, Task task, TaskStatus.Listener statusListener, TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter, ClassLoader loader, BiConsumer<AutoCloseable, String> closeableResources) {
        WorkerTask workerTask;
        Optional<Tracer> connectorTracer;

        ErrorHandlingMetrics errorHandlingMetrics = this.errorHandlingMetrics(id);
        closeableResources.accept(errorHandlingMetrics, "error handling metrics for task " + id);
        Class<? extends Connector> connectorClass = this.plugins.connectorClass(connConfig.getString("connector.class"));

        // The raw config value is an Object; go through its string form so both "true" and Boolean.TRUE are accepted.
        boolean failOnNonToleratedException = Boolean.parseBoolean(
                String.valueOf(this.config.originals().getOrDefault(FAIL_NON_TOLERATED_EXCEPTION, "false")));
        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connConfig.errorRetryTimeout(), connConfig.errorMaxDelayInMillis(), connConfig.errorToleranceType(), Time.SYSTEM, failOnNonToleratedException);
        closeableResources.accept(retryWithToleranceOperator, "retry operator for task " + id);
        retryWithToleranceOperator.metrics(errorHandlingMetrics);

        if (task instanceof SourceTask) {
            TransformationChain<SourceRecord> transformationChain;
            Map<String, TopicCreationGroup> topicCreationGroups;
            TopicAdmin admin;

            SourceConnectorConfig sourceConfig = new SourceConnectorConfig(this.plugins, connConfig.originalsStrings(), this.config.topicCreationEnable());
            OffsetStorageReaderImpl offsetReader = new OffsetStorageReaderImpl(this.offsetBackingStore, id.connector(), this.internalKeyConverter, this.internalValueConverter);
            closeableResources.accept(offsetReader, "offset reader for task " + id);
            OffsetStorageWriter offsetWriter = new OffsetStorageWriter(this.offsetBackingStore, id.connector(), this.internalKeyConverter, this.internalValueConverter);

            Map<String, Object> producerProps = Worker.producerConfigs(id, "connector-producer-" + id, this.config, sourceConfig, connectorClass, this.connectorClientConfigOverridePolicy, this.kafkaClusterId);
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
            closeableResources.accept(producer, "producer for task " + id);

            // A topic admin is only needed when topic creation is enabled for both the worker and the connector.
            if (this.config.topicCreationEnable() && sourceConfig.usesTopicCreation()) {
                Map<String, Object> adminProps = Worker.adminConfigs(id, "connector-adminclient-" + id, this.config, sourceConfig, connectorClass, this.connectorClientConfigOverridePolicy, this.kafkaClusterId);
                admin = new TopicAdmin(adminProps);
                closeableResources.accept(admin, "topic admin for task " + id);
                topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig);
            } else {
                admin = null;
                topicCreationGroups = null;
            }

            connectorTracer = this.connectorTracer(connConfig, id, null, admin);
            connectorTracer.ifPresent(tracer -> closeableResources.accept((AutoCloseable) tracer, "tracer for task " + id));
            List<ErrorReporter> sourceTaskReporters = this.sourceTaskReporters(id, sourceConfig, errorHandlingMetrics);
            sourceTaskReporters.forEach(reporter -> closeableResources.accept((AutoCloseable) reporter, "reporter " + reporter + " for task " + id));

            if (connectorTracer.isPresent()) {
                // With tracing on, the tracing chain replaces the configured transformations and a trace reporter is added.
                transformationChain = new TracingTransformationChain(connectorTracer.get(), retryWithToleranceOperator);
                closeableResources.accept(transformationChain, "transformations for task " + id);
                TraceReporter traceReporter = new TraceReporter(connectorTracer.get());
                closeableResources.accept(traceReporter, "trace reporter for task " + id);
                sourceTaskReporters.add(traceReporter);
            } else {
                transformationChain = new TransformationChain<SourceRecord>(sourceConfig.transformations(), retryWithToleranceOperator);
                closeableResources.accept(transformationChain, "transformations for task " + id);
            }
            log.info("Initializing: {}", transformationChain);
            retryWithToleranceOperator.reporters(sourceTaskReporters);

            workerTask = new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain, producer, admin, topicCreationGroups, offsetReader, offsetWriter, this.config, configState, this.metrics, loader, this.time, retryWithToleranceOperator, this.herder.statusBackingStore(), this.executor);
        } else if (task instanceof SinkTask) {
            TransformationChain<SinkRecord> transformationChain;

            SinkConnectorConfig sinkConfig = new SinkConnectorConfig(this.plugins, connConfig.originalsStrings());
            List<ErrorReporter> sinkTaskReporters = this.sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass);
            sinkTaskReporters.forEach(reporter -> closeableResources.accept((AutoCloseable) reporter, "reporter " + reporter + " for task " + id));

            connectorTracer = this.connectorTracer(connConfig, id, null, null);
            connectorTracer.ifPresent(tracer -> closeableResources.accept((AutoCloseable) tracer, "tracer for task " + id));

            if (connectorTracer.isPresent()) {
                // With tracing on, the tracing chain replaces the configured transformations and a trace reporter is added.
                transformationChain = new TracingTransformationChain(connectorTracer.get(), retryWithToleranceOperator);
                closeableResources.accept(transformationChain, "transformations for task " + id);
                TraceReporter traceReporter = new TraceReporter(connectorTracer.get());
                closeableResources.accept(traceReporter, "trace reporter for task " + id);
                sinkTaskReporters.add(traceReporter);
            } else {
                transformationChain = new TransformationChain<SinkRecord>(connConfig.transformations(), retryWithToleranceOperator);
                closeableResources.accept(transformationChain, "transformations for task " + id);
            }
            log.info("Initializing: {}", transformationChain);
            retryWithToleranceOperator.reporters(sinkTaskReporters);

            WorkerErrantRecordReporter workerErrantRecordReporter = this.createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator, keyConverter, valueConverter, headerConverter, connectorTracer);
            Map<String, Object> consumerProps = Worker.consumerConfigs(id, this.config, connConfig, connectorClass, this.connectorClientConfigOverridePolicy, this.kafkaClusterId);
            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
            closeableResources.accept(consumer, "consumer for task " + id);

            workerTask = new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, this.config, configState, this.metrics, keyConverter, valueConverter, headerConverter, transformationChain, consumer, loader, this.time, retryWithToleranceOperator, workerErrantRecordReporter, this.herder.statusBackingStore());
        } else {
            log.error("Tasks must be a subclass of either SourceTask or SinkTask and current is {}", task);
            throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
        }

        // Hand the tracer to the finished task, if tracing is configured for this connector.
        connectorTracer.ifPresent(t -> {
            log.info("Using tracing id {} for connector '{}' and task {}", t.tracingContext().traceID(), id.connector(), id.task());
            workerTask.useTracer(t);
        });
        return workerTask;
    }

    /**
     * Assembles the Kafka producer properties used on behalf of a connector task.
     *
     * <p>Settings are layered so that later sources win over earlier ones: the fixed
     * defaults below, the worker's {@code producer.}-prefixed settings, the telemetry
     * reporter settings, the metrics-context properties, and finally the connector's
     * {@code producer.override.} settings.
     *
     * @param id the task the producer is created for
     * @param defaultClientId the {@code client.id} to use unless a later layer replaces it
     * @param config the worker configuration
     * @param connConfig the connector configuration supplying the overrides
     * @param connectorClass the connector class, handed to the override policy
     * @param connectorClientConfigOverridePolicy the policy that validates the overrides
     * @param clusterId the Kafka cluster id, forwarded to the metrics-context helper
     * @return a new mutable map of producer properties
     * @throws ConnectException if the override policy rejects any connector override
     */
    static Map<String, Object> producerConfigs(ConnectorTaskId id, String defaultClientId, WorkerConfig config, ConnectorConfig connConfig, Class<? extends Connector> connectorClass, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, String clusterId) {
        HashMap<String, Object> props = new HashMap<>();

        // Fixed defaults; anything below may replace them.
        props.put("bootstrap.servers", Utils.join(config.getList("bootstrap.servers"), ","));
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("max.block.ms", Long.toString(Long.MAX_VALUE));
        props.put("enable.idempotence", "false");
        props.put("acks", "all");
        props.put("max.in.flight.requests.per.connection", "1");
        props.put("delivery.timeout.ms", Integer.toString(Integer.MAX_VALUE));
        props.put("client.id", defaultClientId);

        // Worker-level layers.
        props.putAll(config.originalsWithPrefix("producer."));
        props.putAll(Worker.telemetryReporterConfig(config));
        ConnectUtils.addMetricsContextProperties(props, config, clusterId, id);
        ConnectUtils.addConfluentMetricsContextProperties(props);

        // Connector-level overrides are validated and applied last.
        props.putAll(Worker.connectorClientConfigOverrides(id, connConfig, connectorClass, "producer.override.", ConnectorType.SOURCE, ConnectorClientConfigRequest.ClientType.PRODUCER, connectorClientConfigOverridePolicy));
        return props;
    }

    /**
     * Assembles the Kafka consumer properties used by a sink task.
     *
     * <p>Settings are layered so that later sources win over earlier ones: the fixed
     * defaults below, the worker's {@code consumer.}-prefixed settings, the telemetry
     * reporter settings, the metrics-context properties, and finally the connector's
     * {@code consumer.override.} settings.
     *
     * @param id the task the consumer is created for
     * @param config the worker configuration
     * @param connConfig the connector configuration supplying the overrides
     * @param connectorClass the connector class, handed to the override policy
     * @param connectorClientConfigOverridePolicy the policy that validates the overrides
     * @param clusterId the Kafka cluster id, forwarded to the metrics-context helper
     * @return a new mutable map of consumer properties
     * @throws ConnectException if the override policy rejects any connector override
     */
    static Map<String, Object> consumerConfigs(ConnectorTaskId id, WorkerConfig config, ConnectorConfig connConfig, Class<? extends Connector> connectorClass, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, String clusterId) {
        HashMap<String, Object> props = new HashMap<>();

        // Fixed defaults; anything below may replace them.
        props.put("group.id", SinkUtils.consumerGroupId(id.connector()));
        props.put("client.id", "connector-consumer-" + id);
        props.put("bootstrap.servers", Utils.join(config.getList("bootstrap.servers"), ","));
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // Worker-level layers.
        props.putAll(config.originalsWithPrefix("consumer."));
        props.putAll(Worker.telemetryReporterConfig(config));
        ConnectUtils.addMetricsContextProperties(props, config, clusterId, id);
        ConnectUtils.addConfluentMetricsContextProperties(props);

        // Connector-level overrides are validated and applied last.
        props.putAll(Worker.connectorClientConfigOverrides(id, connConfig, connectorClass, "consumer.override.", ConnectorType.SINK, ConnectorClientConfigRequest.ClientType.CONSUMER, connectorClientConfigOverridePolicy));
        return props;
    }

    /**
     * Assembles the Kafka admin-client properties used on behalf of a connector task.
     *
     * <p>Layers, later ones winning: bootstrap servers and {@code defaultClientId}; every
     * worker setting whose key does not start with {@code admin.}, {@code producer.} or
     * {@code consumer.}; the worker's {@code admin.}-prefixed settings; the telemetry
     * reporter settings; the connector's {@code admin.override.} settings; and finally the
     * metrics-context properties.
     *
     * @param id the task the admin client is created for
     * @param defaultClientId the {@code client.id} to use unless a later layer replaces it
     * @param config the worker configuration
     * @param connConfig the connector configuration supplying the overrides
     * @param connectorClass the connector class, handed to the override policy
     * @param connectorClientConfigOverridePolicy the policy that validates the overrides
     * @param clusterId the Kafka cluster id, forwarded to the metrics-context helper
     * @return a new mutable map of admin-client properties
     * @throws ConnectException if the override policy rejects any connector override
     */
    static Map<String, Object> adminConfigs(ConnectorTaskId id, String defaultClientId, WorkerConfig config, ConnectorConfig connConfig, Class<? extends Connector> connectorClass, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, String clusterId) {
        HashMap<String, Object> props = new HashMap<>();

        // Worker settings that are not scoped to a specific client kind.
        Map<String, Object> unscopedWorkerConfigs = config.originals().entrySet().stream()
                .filter(entry -> {
                    String key = entry.getKey();
                    return !(key.startsWith("admin.") || key.startsWith("producer.") || key.startsWith("consumer."));
                })
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        props.put("bootstrap.servers", Utils.join(config.getList("bootstrap.servers"), ","));
        props.put("client.id", defaultClientId);
        props.putAll(unscopedWorkerConfigs);
        props.putAll(config.originalsWithPrefix("admin."));
        props.putAll(Worker.telemetryReporterConfig(config));

        // NOTE(review): the override request is always tagged ConnectorType.SINK here,
        // whichever kind of connector the caller is serving - confirm this is intended.
        props.putAll(Worker.connectorClientConfigOverrides(id, connConfig, connectorClass, "admin.override.", ConnectorType.SINK, ConnectorClientConfigRequest.ClientType.ADMIN, connectorClientConfigOverridePolicy));

        ConnectUtils.addMetricsContextProperties(props, config, clusterId, id);
        ConnectUtils.addConfluentMetricsContextProperties(props);
        return props;
    }

    /**
     * Extracts the metric-reporter settings that are propagated from the worker
     * configuration to the clients it creates.
     *
     * @param config the worker configuration
     * @return a new map holding {@code metric.reporters} and the result of
     *         {@code originalsWithPrefix("confluent.telemetry", false)}, or an empty map when
     *         the worker does not define {@code metric.reporters}
     */
    private static Map<String, Object> telemetryReporterConfig(WorkerConfig config) {
        HashMap<String, Object> telemetryProps = new HashMap<>();
        Map<String, Object> workerOriginals = config.originals();
        if (!workerOriginals.containsKey("metric.reporters")) {
            return telemetryProps;
        }
        telemetryProps.put("metric.reporters", workerOriginals.get("metric.reporters"));
        telemetryProps.putAll(config.originalsWithPrefix("confluent.telemetry", false));
        return telemetryProps;
    }

    /**
     * Reads the connector's client overrides under {@code clientConfigPrefix} and checks them
     * against the override policy.
     *
     * @param id the task the client is created for; only its connector name is used
     * @param connConfig the connector configuration holding the prefixed overrides
     * @param connectorClass the connector class, passed to the policy request
     * @param clientConfigPrefix the prefix that selects the overrides, e.g. {@code producer.override.}
     * @param connectorType the connector type reported to the policy
     * @param clientType the kind of client the overrides are meant for
     * @param connectorClientConfigOverridePolicy the policy that validates the overrides
     * @return the overrides as returned by {@code originalsWithPrefix(clientConfigPrefix)}
     * @throws ConnectException if the policy reports an error message for any override
     */
    private static Map<String, Object> connectorClientConfigOverrides(ConnectorTaskId id, ConnectorConfig connConfig, Class<? extends Connector> connectorClass, String clientConfigPrefix, ConnectorType connectorType, ConnectorClientConfigRequest.ClientType clientType, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
        Map<String, Object> overrides = connConfig.originalsWithPrefix(clientConfigPrefix);
        ConnectorClientConfigRequest request = new ConnectorClientConfigRequest(id.connector(), connectorType, connectorClass, overrides, clientType);

        // Keep only the validation results that carry at least one error message.
        List<?> rejected = connectorClientConfigOverridePolicy.validate(request).stream()
                .filter(result -> !result.errorMessages().isEmpty())
                .collect(Collectors.toList());
        if (!rejected.isEmpty()) {
            throw new ConnectException("Client Config Overrides not allowed " + rejected);
        }
        return overrides;
    }

    /**
     * Creates a new {@link ErrorHandlingMetrics} for the given task, backed by this worker's
     * {@link ConnectMetrics}.
     *
     * @param id the task the metrics belong to
     * @return a freshly constructed metrics object; a new instance on every call
     */
    ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
        return new ErrorHandlingMetrics(id, metrics);
    }

    /**
     * Creates the error reporters for a sink task: always a {@link LogReporter}, plus a
     * {@link DeadLetterQueueReporter} when the connector configures a non-empty dead letter
     * queue topic.
     *
     * <p>If anything fails part-way, the reporters created so far are closed quietly and the
     * original failure is rethrown.
     *
     * @param id the sink task
     * @param connConfig the sink connector configuration
     * @param errorHandlingMetrics the metrics handed to each reporter
     * @param connectorClass the connector class, used when building client configurations
     * @return a mutable list of reporters
     */
    private List<ErrorReporter> sinkTaskReporters(ConnectorTaskId id, SinkConnectorConfig connConfig, ErrorHandlingMetrics errorHandlingMetrics, Class<? extends Connector> connectorClass) {
        ArrayList<ErrorReporter> created = new ArrayList<>();
        try {
            created.add(new LogReporter(id, connConfig, errorHandlingMetrics));

            String dlqTopic = connConfig.dlqTopicName();
            boolean dlqConfigured = dlqTopic != null && !dlqTopic.isEmpty();
            if (dlqConfigured) {
                Map<String, Object> producerProps = Worker.producerConfigs(id, "connector-dlq-producer-" + id, this.config, connConfig, connectorClass, this.connectorClientConfigOverridePolicy, this.kafkaClusterId);
                // NOTE(review): unlike the producer id above, this admin client id carries no
                // task suffix, so every DLQ admin client gets the same default id - confirm
                // whether that is intended.
                Map<String, Object> adminProps = Worker.adminConfigs(id, "connector-dlq-adminclient-", this.config, connConfig, connectorClass, this.connectorClientConfigOverridePolicy, this.kafkaClusterId);
                created.add(DeadLetterQueueReporter.createAndSetup(adminProps, id, connConfig, producerProps, errorHandlingMetrics));
            }
            return created;
        }
        catch (Throwable t) {
            // Do not leak the reporters that were already built.
            for (ErrorReporter reporter : created) {
                Utils.closeQuietly((AutoCloseable)reporter, "reporter " + reporter + " for task " + id);
            }
            throw t;
        }
    }

    /**
     * Creates the error reporters for a source task, which is a single {@link LogReporter}.
     *
     * <p>If construction fails, any reporter already in the list is closed quietly and the
     * original failure is rethrown.
     *
     * @param id the source task
     * @param connConfig the connector configuration
     * @param errorHandlingMetrics the metrics handed to the reporter
     * @return a mutable list containing the log reporter
     */
    private List<ErrorReporter> sourceTaskReporters(ConnectorTaskId id, ConnectorConfig connConfig, ErrorHandlingMetrics errorHandlingMetrics) {
        ArrayList<ErrorReporter> created = new ArrayList<>();
        try {
            created.add(new LogReporter(id, connConfig, errorHandlingMetrics));
            return created;
        }
        catch (Throwable t) {
            for (ErrorReporter reporter : created) {
                Utils.closeQuietly((AutoCloseable)reporter, "reporter " + reporter + " for task " + id);
            }
            throw t;
        }
    }

    /**
     * Creates the errant record reporter for a sink task, if the connector enables it.
     *
     * @param connConfig the sink connector configuration
     * @param retryWithToleranceOperator the operator passed to the reporter
     * @param keyConverter the key converter passed to the reporter
     * @param valueConverter the value converter passed to the reporter
     * @param headerConverter the header converter passed to the reporter
     * @param tracer when present, a {@link TraceWorkerErrantRecordReporter} bound to this
     *               tracer is created instead of the plain reporter
     * @return the reporter, or {@code null} when
     *         {@code connConfig.enableErrantRecordReporter()} is false
     */
    private WorkerErrantRecordReporter createWorkerErrantRecordReporter(SinkConnectorConfig connConfig, RetryWithToleranceOperator retryWithToleranceOperator, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter, Optional<Tracer> tracer) {
        if (!connConfig.enableErrantRecordReporter()) {
            return null;
        }
        if (!tracer.isPresent()) {
            return new WorkerErrantRecordReporter(retryWithToleranceOperator, keyConverter, valueConverter, headerConverter);
        }
        return new TraceWorkerErrantRecordReporter(retryWithToleranceOperator, keyConverter, valueConverter, headerConverter, tracer.get());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Asks a task owned by this worker to stop, without waiting for it to finish.
     *
     * <p>The task stays in the task map. A source task is first removed from the offset
     * committer. {@code stop()} is invoked with the task's class loader swapped in, and the
     * previous loader is restored afterwards. Unknown task ids are logged and ignored.
     *
     * @param taskId the task to stop
     */
    private void stopTask(ConnectorTaskId taskId) {
        try (LoggingContext ignored = LoggingContext.forTask(taskId)) {
            WorkerTask workerTask = this.tasks.get(taskId);
            if (workerTask != null) {
                log.info("Stopping task {}", workerTask.id());
                if (workerTask instanceof WorkerSourceTask) {
                    this.sourceTaskOffsetCommitter.remove(workerTask.id());
                }
                ClassLoader previousLoader = this.plugins.currentThreadLoader();
                try {
                    previousLoader = Plugins.compareAndSwapLoaders(workerTask.loader());
                    workerTask.stop();
                }
                finally {
                    Plugins.compareAndSwapLoaders(previousLoader);
                }
            } else {
                log.warn("Ignoring stop request for unowned task {}", taskId);
            }
        }
    }

    /**
     * Requests a stop for each of the given tasks, in iteration order, without waiting.
     *
     * @param ids the tasks to stop
     */
    private void stopTasks(Collection<ConnectorTaskId> ids) {
        ids.forEach(this::stopTask);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes a task from the task map and waits for it to stop.
     *
     * <p>If the task has not stopped within {@code timeout} it is cancelled. In both cases
     * the task's metrics are removed, and the removal is recorded with the connector status
     * metrics group even if removing the metrics throws. Unknown task ids are logged and
     * ignored.
     *
     * @param taskId the task to wait for
     * @param timeout the wait budget passed to {@code WorkerTask.awaitStop}
     */
    private void awaitStopTask(ConnectorTaskId taskId, long timeout) {
        try (LoggingContext ignored = LoggingContext.forTask(taskId)) {
            WorkerTask workerTask = this.tasks.remove(taskId);
            if (workerTask == null) {
                log.warn("Ignoring await stop request for non-present task {}", taskId);
                return;
            }

            boolean stoppedGracefully = workerTask.awaitStop(timeout);
            if (stoppedGracefully) {
                log.debug("Graceful stop of task {} succeeded.", workerTask.id());
            } else {
                log.error("Graceful stop of task {} failed.", workerTask.id());
                workerTask.cancel();
            }

            try {
                workerTask.removeMetrics();
            }
            finally {
                this.connectorStatusMetricsGroup.recordTaskRemoved(taskId);
            }
        }
    }

    /**
     * Waits for the given tasks to stop, sharing one overall deadline.
     *
     * <p>The deadline is the current time plus {@code task.shutdown.graceful.timeout.ms}.
     * Each task is given whatever remains of that budget, never less than zero.
     *
     * @param ids the tasks to wait for
     */
    private void awaitStopTasks(Collection<ConnectorTaskId> ids) {
        long deadlineMs = this.time.milliseconds() + this.config.getLong("task.shutdown.graceful.timeout.ms");
        for (ConnectorTaskId taskId : ids) {
            long remainingMs = Math.max(0L, deadlineMs - this.time.milliseconds());
            this.awaitStopTask(taskId, remainingMs);
        }
    }

    /**
     * Stops every task currently in the task map and waits for them to finish.
     *
     * <p>The task ids are copied into a new list first, so the stop and await phases work
     * on the same fixed set of ids.
     */
    public void stopAndAwaitTasks() {
        List<ConnectorTaskId> ownedTaskIds = new ArrayList<>(this.tasks.keySet());
        this.stopAndAwaitTasks(ownedTaskIds);
    }

    /**
     * Stops the given tasks and waits for them to finish.
     *
     * <p>All stop requests are issued before any waiting begins.
     *
     * @param ids the tasks to stop
     */
    public void stopAndAwaitTasks(Collection<ConnectorTaskId> ids) {
        // Phase 1: signal every task.
        stopTasks(ids);
        // Phase 2: wait for each of them.
        awaitStopTasks(ids);
    }

    /**
     * Stops a single task and waits for it to finish.
     *
     * @param taskId the task to stop
     */
    public void stopAndAwaitTask(ConnectorTaskId taskId) {
        stopTask(taskId);
        List<ConnectorTaskId> onlyThisTask = Collections.singletonList(taskId);
        awaitStopTasks(onlyThisTask);
    }

    /**
     * Returns the ids of the tasks in this worker's task map.
     *
     * @return the key set of the task map itself, not a copy
     */
    public Set<ConnectorTaskId> taskIds() {
        return tasks.keySet();
    }

    /**
     * @return the internal key converter held by this worker
     */
    public Converter getInternalKeyConverter() {
        return internalKeyConverter;
    }

    /**
     * @return the internal value converter held by this worker
     */
    public Converter getInternalValueConverter() {
        return internalValueConverter;
    }

    /**
     * @return the {@link Plugins} instance held by this worker
     */
    public Plugins getPlugins() {
        return plugins;
    }

    /**
     * @return the id of this worker
     */
    public String workerId() {
        return workerId;
    }

    /**
     * @return the value of {@code topicCreationEnable()} on the worker configuration
     */
    public boolean isTopicCreationEnabled() {
        return config.topicCreationEnable();
    }

    /**
     * @return the {@link ConnectMetrics} instance held by this worker
     */
    public ConnectMetrics metrics() {
        return metrics;
    }

    /**
     * Moves a connector, and every task in the task map that belongs to it, to a new
     * target state.
     *
     * <p>Each transition runs with the class loader of the connector or task swapped in.
     * If the worker does not hold the named connector, only its tasks are transitioned.
     *
     * @param connName the connector name
     * @param state the target state
     * @param stateChangeCallback passed to the connector's {@code transitionTo}; not used
     *                            for the task transitions
     */
    public void setTargetState(String connName, TargetState state, Callback<TargetState> stateChangeCallback) {
        log.info("Setting connector {} state to {}", connName, state);

        WorkerConnector connector = this.connectors.get(connName);
        if (connector != null) {
            ClassLoader connectorLoader = this.plugins.delegatingLoader().connectorLoader(connector.connector());
            this.executeStateTransition(() -> connector.transitionTo(state, stateChangeCallback), connectorLoader);
        }

        for (Map.Entry<ConnectorTaskId, WorkerTask> entry : this.tasks.entrySet()) {
            if (entry.getKey().connector().equals(connName)) {
                WorkerTask task = entry.getValue();
                this.executeStateTransition(() -> task.transitionTo(state), task.loader);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs a state transition with the given class loader swapped in through
     * {@code Plugins.compareAndSwapLoaders}, then restores the previous loader.
     *
     * <p>The previous loader is restored even if the transition throws.
     *
     * @param stateTransition the action to run
     * @param loader the class loader to swap in while the action runs
     */
    private void executeStateTransition(Runnable stateTransition, ClassLoader loader) {
        ClassLoader previousLoader = this.plugins.currentThreadLoader();
        try {
            previousLoader = Plugins.compareAndSwapLoaders(loader);
            stateTransition.run();
        }
        finally {
            Plugins.compareAndSwapLoaders(previousLoader);
        }
    }

    /**
     * @return the connector status metrics group held by this worker
     */
    ConnectorStatusMetricsGroup connectorStatusMetricsGroup() {
        return connectorStatusMetricsGroup;
    }

    /**
     * @return the worker metrics group held by this worker
     */
    WorkerMetricsGroup workerMetricsGroup() {
        return workerMetricsGroup;
    }

    /**
     * Creates the tracer for a connector task when tracing is enabled in its configuration.
     *
     * <p>A producer and/or admin client supplied by the caller is used as-is and is never
     * closed here. When either is {@code null}, a dedicated client is created. If a later
     * step then fails, the clients created by this method are closed before the failure is
     * rethrown, so a failed setup does not leak them.
     *
     * @param connectorConfig the connector configuration
     * @param connectorTaskId the task the tracer is created for
     * @param producer the producer to send trace records with, or {@code null} to create one
     * @param admin the admin client for creating trace topics, or {@code null} to create one
     * @return the tracer, or an empty {@link Optional} when tracing is disabled
     * @throws ConnectException if the override policy rejects a client override
     */
    @Confluent
    private Optional<Tracer> connectorTracer(ConnectorConfig connectorConfig, ConnectorTaskId connectorTaskId, KafkaProducer<byte[], byte[]> producer, TopicAdmin admin) {
        TracerConfig tracerConfig = new TracerConfig(this.plugins, connectorConfig);
        if (!tracerConfig.isTracingEnabled()) {
            return Optional.empty();
        }
        log.info("Setting up tracing for connector '{}' and task {}", (Object)connectorTaskId.connector(), (Object)connectorTaskId.task());
        Class<? extends Connector> connectorClass = this.plugins.connectorClass(connectorConfig.getString("connector.class"));

        // Only clients created in this method are tracked here; caller-supplied ones stay null.
        KafkaProducer<byte[], byte[]> ownedProducer = null;
        TopicAdmin ownedAdmin = null;
        try {
            if (producer == null) {
                Map<String, Object> producerConfigs = Worker.producerConfigs(connectorTaskId, "Connector-Tracer-Producer-" + connectorTaskId, this.config, connectorConfig, connectorClass, this.connectorClientConfigOverridePolicy, this.kafkaClusterId);
                ProducerConfig tempConfig = new ProducerConfig(producerConfigs);
                // Replace the effectively unbounded default from producerConfigs with twice
                // (linger.ms + request.timeout.ms). Computed in long and clamped so large
                // configured values cannot overflow into a negative timeout.
                long lingerMs = Math.min(tempConfig.getLong("linger.ms"), (long)Integer.MAX_VALUE);
                long minDeliveryTimeoutMs = lingerMs + (long)tempConfig.getInt("request.timeout.ms");
                int deliveryTimeoutMs = (int)Math.min((long)Integer.MAX_VALUE, 2L * minDeliveryTimeoutMs);
                producerConfigs.put("delivery.timeout.ms", deliveryTimeoutMs);
                log.info("Creating new producer for sending trace records");
                ownedProducer = new KafkaProducer<byte[], byte[]>(producerConfigs);
                producer = ownedProducer;
            }
            if (admin == null) {
                log.info("Creating new admin client for creating trace topic(s)");
                ownedAdmin = new TopicAdmin(Worker.adminConfigs(connectorTaskId, "Connector-Tracer-Admin-" + connectorTaskId, this.config, connectorConfig, connectorClass, this.connectorClientConfigOverridePolicy, this.kafkaClusterId));
                admin = ownedAdmin;
            }
            return Optional.of(new ConnectTracer(connectorTaskId, tracerConfig, producer, admin));
        }
        catch (Throwable t) {
            // Setup failed: release what was created here, then surface the original failure.
            if (ownedAdmin != null) {
                Utils.closeQuietly((AutoCloseable)ownedAdmin, "tracer admin client for task " + connectorTaskId);
            }
            if (ownedProducer != null) {
                Utils.closeQuietly((AutoCloseable)ownedProducer, "tracer producer for task " + connectorTaskId);
            }
            throw t;
        }
    }

    /**
     * Maintains one metric group per connector that has tasks on this worker, exposing the
     * connector's total task count and a task count for each status in the registry.
     *
     * <p>Registration, removal and {@link #close()} are synchronized on this instance.
     * {@link #metricGroup(String)} reads the concurrent map without taking that lock.
     */
    static class ConnectorStatusMetricsGroup {
        private final ConnectMetrics connectMetrics;
        private final ConnectMetricsRegistry registry;
        // Keyed by connector name.
        private final ConcurrentMap<String, ConnectMetrics.MetricGroup> connectorStatusMetrics = new ConcurrentHashMap<String, ConnectMetrics.MetricGroup>();
        private final Herder herder;
        // The task map handed in by the creator; it is read here, never modified.
        private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks;

        /**
         * @param connectMetrics the metrics instance the groups are created on
         * @param tasks the map of tasks that the counters are computed from
         * @param herder used to look up the current status of each task
         */
        protected ConnectorStatusMetricsGroup(ConnectMetrics connectMetrics, ConcurrentMap<ConnectorTaskId, WorkerTask> tasks, Herder herder) {
            this.connectMetrics = connectMetrics;
            this.registry = connectMetrics.registry();
            this.tasks = tasks;
            this.herder = herder;
        }

        /**
         * @param connName the connector name
         * @return a supplier that counts the tasks in the task map belonging to {@code connName}
         */
        protected ConnectMetrics.LiteralSupplier<Long> taskCounter(String connName) {
            return now -> this.tasks.keySet().stream().filter(taskId -> taskId.connector().equals(connName)).count();
        }

        /**
         * @param connName the connector name
         * @param state the status to count
         * @return a supplier that counts the tasks of {@code connName} whose herder-reported
         *         state equals {@code state}, ignoring case
         */
        protected ConnectMetrics.LiteralSupplier<Long> taskStatusCounter(String connName, AbstractStatus.State state) {
            return now -> this.tasks.values().stream().filter(task -> task.id().connector().equals(connName) && this.herder.taskStatus(task.id()).state().equalsIgnoreCase(state.toString())).count();
        }

        /**
         * Registers the metric group for the task's connector, unless one already exists.
         *
         * @param connectorTaskId the task that was added
         */
        protected synchronized void recordTaskAdded(ConnectorTaskId connectorTaskId) {
            String connName = connectorTaskId.connector();
            if (this.connectorStatusMetrics.containsKey(connName)) {
                return;
            }
            ConnectMetrics.MetricGroup metricGroup = this.connectMetrics.group(this.registry.workerGroupName(), this.registry.connectorTagName(), connName);
            metricGroup.addValueMetric(this.registry.connectorTotalTaskCount, this.taskCounter(connName));
            for (Map.Entry<MetricNameTemplate, AbstractStatus.State> statusMetric : this.registry.connectorStatusMetrics.entrySet()) {
                metricGroup.addValueMetric(statusMetric.getKey(), this.taskStatusCounter(connName, statusMetric.getValue()));
            }
            this.connectorStatusMetrics.put(connName, metricGroup);
        }

        /**
         * Closes and removes the connector's metric group once the task map holds no more
         * tasks for that connector.
         *
         * <p>Does nothing if no group is registered for the connector, for example when the
         * removal is recorded for a task whose addition was never recorded.
         *
         * @param connectorTaskId the task that was removed
         */
        protected synchronized void recordTaskRemoved(ConnectorTaskId connectorTaskId) {
            String connName = connectorTaskId.connector();
            boolean connectorHasTasks = this.tasks.keySet().stream().anyMatch(id -> id.connector().equals(connName));
            if (connectorHasTasks) {
                return;
            }
            // Remove first, then close only if a group was actually registered.
            ConnectMetrics.MetricGroup metricGroup = this.connectorStatusMetrics.remove(connName);
            if (metricGroup != null) {
                metricGroup.close();
            }
        }

        /**
         * Closes every registered metric group. The groups stay in the map afterwards.
         */
        protected synchronized void close() {
            for (ConnectMetrics.MetricGroup metricGroup : this.connectorStatusMetrics.values()) {
                metricGroup.close();
            }
        }

        /**
         * @param connectorId the connector name
         * @return the metric group registered for the connector, or {@code null} if none
         */
        protected ConnectMetrics.MetricGroup metricGroup(String connectorId) {
            return this.connectorStatusMetrics.get(connectorId);
        }
    }
}

