/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
import org.apache.kafka.connect.runtime.SubmittedRecords;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.TaskPluginsMetadata;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.TransformationChain;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerSourceTaskContext;
import org.apache.kafka.connect.runtime.WorkerTask;
import org.apache.kafka.connect.runtime.WorkerTransactionContext;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.ProcessingContext;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.runtime.tracing.Tracer;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreation;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for worker tasks that drive a {@link SourceTask}: it polls the task for
 * {@link SourceRecord}s, runs them through the transformation chain and converters, and
 * dispatches the resulting records with a Kafka {@link Producer}.
 *
 * <p>Subclasses plug into the send loop through the abstract hook methods declared below
 * (for example to track submitted records and to commit offsets).
 */
public abstract class AbstractWorkerSourceTask
extends WorkerTask<SourceRecord, SourceRecord> {
    private static final Logger log = LoggerFactory.getLogger(AbstractWorkerSourceTask.class);
    // Backoff between send attempts, in milliseconds. Not referenced by name in this
    // (decompiled) file; execute() waits on a literal 100 ms after a failed send.
    private static final long SEND_FAILED_BACKOFF_MS = 100L;
    // Maximum age, in milliseconds, of the last verification/successful send for a topic
    // before maybeCreateTopic() checks again that the topic exists.
    protected static final long TOPIC_VERIFICATION_THRESHOLD = Duration.ofHours(1L).toMillis();
    protected final WorkerConfig workerConfig;
    protected final ConnectorOffsetBackingStore offsetStore;
    protected final OffsetStorageWriter offsetWriter;
    protected final Producer<byte[], byte[]> producer;
    private final SourceTask task;
    private final Plugin<Converter> keyConverterPlugin;
    private final Plugin<Converter> valueConverterPlugin;
    private final Plugin<HeaderConverter> headerConverterPlugin;
    // May be null; close() only closes it when it is non-null.
    private final TopicAdmin admin;
    private final CloseableOffsetStorageReader offsetReader;
    private final SourceTaskMetricsGroup sourceTaskMetricsGroup;
    // Counted down by stop(); execute() waits on it while backing off after a failed send.
    private final CountDownLatch stopRequestedLatch;
    private final boolean topicTrackingEnabled;
    private final TopicCreation topicCreation;
    // Used by cancel() to close the producer without blocking the caller.
    private final Executor closeExecutor;
    private final String version;
    // Batch currently waiting to be sent; null when there is nothing pending and the task
    // should be polled again.
    List<SourceRecord> toSend;
    protected Map<String, String> taskConfig;
    protected WorkerSourceTaskContext sourceTaskContext;
    // Set in initializeAndStart() just before the task is initialized; close() only stops
    // the task when this is true.
    protected boolean started = false;
    // Set by closeProducer(); read in the send callback to log expected failures at trace level.
    private volatile boolean producerClosed = false;
    // Topic name -> timestamp (ms) at which maybeCreateTopic() last checked the topic.
    final Map<String, Long> topicLastVerifiedAt;
    // Topic name -> timestamp (ms) of the last successful send, written from the producer
    // send callback in sendRecords().
    final Map<String, Long> topicLastRecordSentAt;

    /**
     * Hook invoked at the very beginning of {@link #initializeAndStart()}, before the offset
     * store is started and before the task is initialized.
     */
    protected abstract void prepareToInitializeTask();

    /**
     * Hook invoked by {@link #execute()} once before the send loop is entered, and again each
     * time the task resumes after having been paused.
     */
    protected abstract void prepareToEnterSendLoop();

    /**
     * Hook invoked at the top of every iteration of the send loop in {@link #execute()}.
     */
    protected abstract void beginSendIteration();

    /**
     * Hook invoked right before the task is polled, when there is no pending batch to send.
     */
    protected abstract void prepareToPollTask();

    /**
     * Hook invoked by {@link #sendRecords()} when a record is not sent because
     * transformation/conversion produced no producer record or the processing context failed.
     *
     * @param var1 the pre-transformation record that was dropped
     */
    protected abstract void recordDropped(SourceRecord var1);

    /**
     * Hook invoked by {@link #sendRecords()} immediately before a record is handed to the
     * producer.
     *
     * @param var1 the pre-transformation source record
     * @param var2 the converted record about to be sent
     * @return an optional handle for the record; when present it is acked from the send
     *         callback or dropped if the send has to be retried
     */
    protected abstract Optional<SubmittedRecords.SubmittedRecord> prepareToSendRecord(SourceRecord var1, ProducerRecord<byte[], byte[]> var2);

    /**
     * Hook invoked by {@link #sendRecords()} after a record has been handed to the producer
     * (or after a synchronous {@link KafkaException} was passed to
     * {@link #producerSendFailed}).
     *
     * @param var1 the pre-transformation source record
     */
    protected abstract void recordDispatched(SourceRecord var1);

    /**
     * Hook invoked when a whole batch has been dispatched by {@link #sendRecords()}, and by
     * {@link #execute()} when polling produced no batch.
     */
    protected abstract void batchDispatched();

    /**
     * Hook invoked from the producer send callback when a record was written successfully.
     *
     * @param var1 the pre-transformation source record
     * @param var2 the producer record that was sent
     * @param var3 the metadata reported by the producer for the written record
     */
    protected abstract void recordSent(SourceRecord var1, ProducerRecord<byte[], byte[]> var2, RecordMetadata var3);

    /**
     * Hook invoked when sending a record failed.
     *
     * @param var1 the processing context of the record
     * @param var2 {@code true} when the failure was thrown synchronously from
     *             {@code producer.send()}, {@code false} when it was reported to the send callback
     * @param var3 the producer record that failed
     * @param var4 the pre-transformation source record
     * @param var5 the failure
     */
    protected abstract void producerSendFailed(ProcessingContext<SourceRecord> var1, boolean var2, ProducerRecord<byte[], byte[]> var3, SourceRecord var4, Exception var5);

    /**
     * Hook invoked at the end of {@link #execute()} to commit offsets one last time.
     *
     * @param var1 {@code true} when the task is exiting because of a {@link RuntimeException},
     *             {@code false} on a regular exit from the send loop
     */
    protected abstract void finalOffsetCommit(boolean var1);

    /**
     * Creates the task wrapper. Shared worker-task state is passed on to {@link WorkerTask};
     * everything specific to source tasks (producer, converters, offset handling, topic
     * creation, metrics) is stored here.
     *
     * @param offsetStore must not be {@code null}
     * @param admin may be {@code null}; it is only closed in {@link #close()} when present
     * @throws NullPointerException if {@code offsetStore} is {@code null}
     */
    protected AbstractWorkerSourceTask(
            ConnectorTaskId id,
            SourceTask task,
            TaskStatus.Listener statusListener,
            TargetState initialState,
            ClusterConfigState configState,
            Plugin<Converter> keyConverterPlugin,
            Plugin<Converter> valueConverterPlugin,
            Plugin<HeaderConverter> headerConverterPlugin,
            TransformationChain<SourceRecord, SourceRecord> transformationChain,
            WorkerTransactionContext workerTransactionContext,
            Producer<byte[], byte[]> producer,
            TopicAdmin admin,
            Map<String, TopicCreationGroup> topicGroups,
            CloseableOffsetStorageReader offsetReader,
            OffsetStorageWriter offsetWriter,
            ConnectorOffsetBackingStore offsetStore,
            WorkerConfig workerConfig,
            ConnectMetrics connectMetrics,
            ErrorHandlingMetrics errorMetrics,
            ClassLoader loader,
            Time time,
            RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator,
            StatusBackingStore statusBackingStore,
            Executor closeExecutor,
            Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier,
            TaskPluginsMetadata pluginsMetadata,
            Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) {
        super(id, statusListener, initialState, loader, connectMetrics, errorMetrics, retryWithToleranceOperator, transformationChain, errorReportersSupplier, time, statusBackingStore, pluginsMetadata, pluginLoaderSwapper);
        this.workerConfig = workerConfig;
        this.task = task;
        this.keyConverterPlugin = keyConverterPlugin;
        this.valueConverterPlugin = valueConverterPlugin;
        this.headerConverterPlugin = headerConverterPlugin;
        this.producer = producer;
        this.admin = admin;
        this.offsetReader = offsetReader;
        this.offsetWriter = offsetWriter;
        this.offsetStore = Objects.requireNonNull(offsetStore, "offset store cannot be null for source tasks");
        this.closeExecutor = closeExecutor;
        this.sourceTaskContext = new WorkerSourceTaskContext(offsetReader, id, configState, workerTransactionContext, (PluginMetrics)this.pluginMetrics);
        this.stopRequestedLatch = new CountDownLatch(1);
        this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
        this.topicTrackingEnabled = workerConfig.getBoolean("topic.tracking.enable");
        this.topicCreation = TopicCreation.newTopicCreation(workerConfig, topicGroups);
        this.version = task.version();
        // Only touched while verifying/creating topics before a send.
        this.topicLastVerifiedAt = new HashMap<>();
        // Updated from the producer's send-completion callback while the task's send path
        // reads it, so it must tolerate concurrent access; a plain HashMap does not.
        this.topicLastRecordSentAt = new java.util.concurrent.ConcurrentHashMap<>();
    }

    /**
     * Captures the task configuration as a string map. Any failure is logged and reported
     * through {@code onFailure} instead of being propagated to the caller.
     *
     * @param taskConfig the configuration the task will later be started with
     */
    @Override
    public void initialize(TaskConfig taskConfig) {
        try {
            Map<String, String> originals = taskConfig.originalsStrings();
            this.taskConfig = originals;
        } catch (Throwable failure) {
            log.error("{} Task failed initialization and will not be started.", this, failure);
            onFailure(failure);
        }
    }

    /**
     * Starts the offset store, then initializes and starts the wrapped task on the calling
     * thread. When task/connector metrics are enabled for the worker, a periodic metrics
     * recording job is scheduled as well.
     */
    @Override
    protected void initializeAndStart() {
        prepareToInitializeTask();
        offsetStore.start();

        // Flag the task as started before touching it, so that close() stops it even if
        // initialize() or start() fails part-way through.
        started = true;
        task.initialize(sourceTaskContext);
        task.start(taskConfig);

        taskThreadId = Thread.currentThread().getId();
        log.info("{} Source task finished initialization and start, task thread id is {}", this, taskThreadId);

        if (isTaskConnectorMetricsEnabled(workerConfig)) {
            scheduler.scheduleWithFixedDelay(this::recordTaskMetrics, 0L, 1L, TimeUnit.MINUTES);
        }
    }

    /**
     * Cancels the task: in addition to the superclass handling, the offset reader is closed
     * and an immediate (zero-timeout) producer close is handed to the close executor.
     */
    @Override
    public void cancel() {
        super.cancel();
        offsetReader.close();
        // The producer is closed on the close executor rather than on the calling thread.
        Runnable closeProducerNow = () -> closeProducer(Duration.ZERO);
        closeExecutor.execute(closeProducerNow);
    }

    /**
     * Requests the task to stop and releases the latch that {@link #execute()} waits on
     * while backing off after a failed send.
     */
    @Override
    public void stop() {
        super.stop();
        // Wake up the send loop if it is currently backing off.
        stopRequestedLatch.countDown();
    }

    /**
     * Closes the source-task metrics group (failures while closing are suppressed by
     * {@code Utils.closeQuietly}) and then lets the superclass remove its own metrics.
     */
    @Override
    public void removeMetrics() {
        Utils.closeQuietly(sourceTaskMetricsGroup, "source task metrics tracker");
        super.removeMetrics();
    }

    /**
     * Releases every resource owned by this task, each one via {@code Utils.closeQuietly}:
     * the task itself (only if it was started), the producer, the admin client (if any),
     * the offset reader and store, the converters and the plugin metrics.
     */
    @Override
    protected void close() {
        // The task is only stopped when initializeAndStart() got as far as touching it.
        if (started) {
            Utils.closeQuietly(task::stop, "source task");
        }

        closeProducer(Duration.ofSeconds(30L));

        if (admin != null) {
            Utils.closeQuietly(() -> admin.close(Duration.ofSeconds(30L)), "source task admin");
        }

        Utils.closeQuietly((AutoCloseable) offsetReader, "offset reader");
        Utils.closeQuietly(offsetStore::stop, "offset backing store");
        Utils.closeQuietly(headerConverterPlugin, "header converter");
        Utils.closeQuietly(keyConverterPlugin, "key converter");
        Utils.closeQuietly(valueConverterPlugin, "value converter");
        Utils.closeQuietly((AutoCloseable) pluginMetrics, "pluginMetrics");
    }

    /**
     * Closes the producer, if there is one, waiting at most {@code duration}.
     *
     * @param duration how long the producer may take to close
     */
    private void closeProducer(Duration duration) {
        if (producer == null) {
            return;
        }
        // Raised before closing so the send callback can recognise failures caused by the close.
        producerClosed = true;
        Utils.closeQuietly(() -> producer.close(duration), "source task producer");
    }

    /**
     * Runs the poll/send loop until the task is asked to stop.
     *
     * <p>Each iteration either handles a pause request, or polls the task when no batch is
     * pending and tries to send the pending batch. When a send has to be retried the loop
     * waits 100 ms (or until stop is requested) before the next attempt.
     *
     * <p>On a regular exit, or when the loop is interrupted, offsets are committed one last
     * time. When the loop fails with a {@link RuntimeException}, a final commit is attempted
     * unless the task was cancelled, and the exception is rethrown.
     */
    @Override
    public void execute() {
        try {
            prepareToEnterSendLoop();
            while (!isStopping()) {
                beginSendIteration();

                if (shouldPause()) {
                    onPause();
                    if (awaitUnpause()) {
                        onResume();
                        prepareToEnterSendLoop();
                    }
                    continue;
                }

                if (toSend == null) {
                    prepareToPollTask();
                    log.trace("{} Nothing to send to Kafka. Polling source for additional records", this);
                    long pollStartedAt = time.milliseconds();
                    toSend = poll();
                    if (toSend != null) {
                        recordPollReturned(toSend.size(), time.milliseconds() - pollStartedAt);
                    }
                }

                if (toSend == null) {
                    // The poll produced nothing: there is no batch to send this iteration.
                    batchDispatched();
                    continue;
                }

                log.trace("{} About to send {} records to Kafka", this, toSend.size());
                if (!sendRecords()) {
                    // Back off before retrying, but wake up early if stop is requested.
                    stopRequestedLatch.await(100L, TimeUnit.MILLISECONDS);
                }
            }
        } catch (InterruptedException ignored) {
            // Deliberately ignored: fall through to the final offset commit and exit.
        } catch (RuntimeException failure) {
            if (isCancelled()) {
                log.debug("Skipping final offset commit as task has been cancelled");
            } else {
                try {
                    finalOffsetCommit(true);
                } catch (Exception commitFailure) {
                    log.error("Failed to commit offsets for already-failing task", commitFailure);
                }
            }
            throw failure;
        }
        finalOffsetCommit(false);
    }

    /**
     * @return the version string the wrapped task reported when this object was constructed
     */
    @Override
    public String taskVersion() {
        return version;
    }

    /**
     * Transforms, converts and dispatches every record of the pending batch {@code toSend}.
     *
     * <p>Records for which transformation/conversion yields nothing, or whose processing
     * context failed, are skipped and reported through {@link #recordDropped}. All other
     * records are handed to the producer together with a completion callback that updates
     * the write counter, acks the submitted record and notifies the subclass hooks.
     *
     * @return {@code true} when the whole batch was dispatched ({@code toSend} is then reset
     *         to {@code null}); {@code false} when a retriable failure interrupted the batch,
     *         in which case {@code toSend} is trimmed to the records not yet dispatched
     * @throws ConnectException if sending fails with an unrecoverable Connect exception
     */
    boolean sendRecords() {
        // Number of records of the current batch that were either dropped or dispatched.
        int processed = 0;
        this.recordBatch(this.toSend.size());
        // The counter is only null for an empty batch, for which the loop below never runs.
        SourceRecordWriteCounter counter = this.toSend.isEmpty() ? null : new SourceRecordWriteCounter(this.toSend.size(), this.sourceTaskMetricsGroup);
        for (SourceRecord preTransformRecord : this.toSend) {
            ProcessingContext<SourceRecord> context = new ProcessingContext<SourceRecord>(preTransformRecord);
            SourceRecord record = this.transformationChain.apply(context, preTransformRecord);
            ProducerRecord<byte[], byte[]> producerRecord = this.convertTransformedRecord(context, record);
            List traceRecords = this.tracer.map(Tracer::buildRecords).orElse(Collections.emptyList());
            if (producerRecord == null || context.failed()) {
                // Nothing to send for this record: count it as skipped and move on.
                counter.skipRecord();
                this.tracer.ifPresent(t -> {
                    t.writeTraceRecords(traceRecords, null);
                    t.tracingContext().incrementProcessedRecordCount();
                });
                this.recordDropped(preTransformRecord);
                ++processed;
                continue;
            }
            log.trace("{} Appending record to the topic {} with key {}, value {}", new Object[]{this, record.topic(), record.key(), record.value()});
            Optional<SubmittedRecords.SubmittedRecord> submittedRecord = this.prepareToSendRecord(preTransformRecord, producerRecord);
            try {
                String topic = producerRecord.topic();
                this.maybeCreateTopic(topic);
                // The callback is invoked by the producer once the send has completed or failed.
                this.producer.send(producerRecord, (recordMetadata, e) -> {
                    if (e != null) {
                        if (this.producerClosed) {
                            log.trace("{} failed to send record to {}; this is expected as the producer has already been closed", new Object[]{this, topic, e});
                        } else {
                            log.error("{} failed to send record to {}: ", new Object[]{this, topic, e});
                        }
                        log.trace("{} Failed record: {}", (Object)this, (Object)preTransformRecord);
                        this.producerSendFailed(context, false, producerRecord, preTransformRecord, e);
                        // With errors.tolerance=all the failed record is counted as skipped and acked.
                        if (this.retryWithToleranceOperator.getErrorToleranceType() == ToleranceType.ALL) {
                            counter.skipRecord();
                            submittedRecord.ifPresent(SubmittedRecords.SubmittedRecord::ack);
                        }
                    } else {
                        submittedRecord.ifPresent(SubmittedRecords.SubmittedRecord::ack);
                        counter.completeRecord();
                        log.trace("{} Wrote record successfully: topic {} partition {} offset {}", new Object[]{this, recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()});
                        this.recordSent(preTransformRecord, producerRecord, recordMetadata);
                        if (this.topicTrackingEnabled) {
                            this.recordActiveTopic(producerRecord.topic());
                        }
                        // Remember when a record last reached this topic; topicVerifiedRecently()
                        // uses it to skip re-verifying that the topic exists.
                        if (this.time.milliseconds() > this.topicLastRecordSentAt.getOrDefault(topic, 0L)) {
                            this.topicLastRecordSentAt.put(topic, this.time.milliseconds());
                        }
                        this.tracer.ifPresent(t -> t.writeTraceRecords(traceRecords, null));
                    }
                });
            }
            catch (org.apache.kafka.common.errors.RetriableException | RetriableException e2) {
                log.warn("{} Failed to send record to topic '{}' and partition '{}'. Backing off before retrying: ", new Object[]{this, producerRecord.topic(), producerRecord.partition(), e2});
                // Keep only the records that have not been processed yet, so that the next
                // attempt resumes with the record that just failed.
                this.toSend = this.toSend.subList(processed, this.toSend.size());
                submittedRecord.ifPresent(SubmittedRecords.SubmittedRecord::drop);
                counter.retryRemaining();
                return false;
            }
            catch (ConnectException e3) {
                log.warn("{} Failed to send record to topic '{}' and partition '{}' due to an unrecoverable exception: ", new Object[]{this, producerRecord.topic(), producerRecord.partition(), e3});
                log.trace("{} Failed to send {} with unrecoverable exception: ", new Object[]{this, producerRecord, e3});
                throw e3;
            }
            catch (KafkaException e4) {
                // Synchronous producer failure: reported to the subclass (second argument true);
                // the record is still counted as processed and dispatched below.
                this.producerSendFailed(context, true, producerRecord, preTransformRecord, (Exception)((Object)e4));
            }
            ++processed;
            this.recordDispatched(preTransformRecord);
            this.tracer.ifPresent(t -> t.tracingContext().incrementProcessedRecordCount());
        }
        // Whole batch handled: clear it so that execute() polls the task again.
        this.toSend = null;
        this.batchDispatched();
        return true;
    }

    /**
     * Polls the wrapped task for the next batch of records.
     *
     * @return the records returned by the task, or {@code null} when the task threw a
     *         retriable exception (which is logged as a warning)
     * @throws InterruptedException if the task's poll is interrupted
     */
    protected List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> polled;
        try {
            polled = task.poll();
        } catch (org.apache.kafka.common.errors.RetriableException | RetriableException retriable) {
            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, retriable);
            polled = null;
        }
        return polled;
    }

    /**
     * Converts a transformed source record into the record handed to the producer.
     *
     * <p>Headers, key and value are converted in that order, each through the
     * retry-with-tolerance operator; key and value conversion run with the class loader
     * swapped to the one of the respective converter.
     *
     * @param context the processing context of the record
     * @param record the transformed record; may be {@code null}
     * @return the producer record, or {@code null} when {@code record} is {@code null} or the
     *         context is marked as failed after conversion
     */
    protected ProducerRecord<byte[], byte[]> convertTransformedRecord(ProcessingContext<SourceRecord> context, SourceRecord record) {
        if (record == null) {
            return null;
        }

        RecordHeaders headers = (RecordHeaders) retryWithToleranceOperator.execute(
                context,
                () -> convertHeaderFor(record),
                Stage.HEADER_CONVERTER,
                headerConverterPlugin.get().getClass());

        byte[] key = (byte[]) retryWithToleranceOperator.execute(
                context,
                () -> {
                    try (LoaderSwap swap = (LoaderSwap) pluginLoaderSwapper.apply(keyConverterPlugin.get().getClass().getClassLoader())) {
                        return keyConverterPlugin.get().fromConnectData(record.topic(), headers, record.keySchema(), record.key());
                    }
                },
                Stage.KEY_CONVERTER,
                keyConverterPlugin.get().getClass());

        byte[] value = (byte[]) retryWithToleranceOperator.execute(
                context,
                () -> {
                    try (LoaderSwap swap = (LoaderSwap) pluginLoaderSwapper.apply(valueConverterPlugin.get().getClass().getClassLoader())) {
                        return valueConverterPlugin.get().fromConnectData(record.topic(), headers, record.valueSchema(), record.value());
                    }
                },
                Stage.VALUE_CONVERTER,
                valueConverterPlugin.get().getClass());

        if (context.failed()) {
            return null;
        }

        Long timestamp = ConnectUtils.checkAndConvertTimestamp(record.timestamp());
        return new ProducerRecord<>(record.topic(), record.kafkaPartition(), timestamp, key, value, headers);
    }

    /**
     * Tells whether the topic was checked by {@link #maybeCreateTopic(String)}, or
     * successfully written to, less than {@code TOPIC_VERIFICATION_THRESHOLD} ms ago.
     *
     * @param topic the topic name
     * @return {@code true} if either timestamp is recent enough
     */
    private boolean topicVerifiedRecently(String topic) {
        if (topicLastVerifiedAt.containsKey(topic)
                && time.milliseconds() - topicLastVerifiedAt.get(topic) < TOPIC_VERIFICATION_THRESHOLD) {
            return true;
        }
        return topicLastRecordSentAt.containsKey(topic)
                && time.milliseconds() - topicLastRecordSentAt.get(topic) < TOPIC_VERIFICATION_THRESHOLD;
    }

    /**
     * Makes sure {@code topic} exists before a record is sent to it, creating it from the
     * first matching topic creation group when necessary.
     *
     * <p>Nothing is done when topic creation is disabled, or when the topic is already known
     * to this task and was verified (or written to) recently. The verification timestamp is
     * only recorded once the topic has actually been confirmed to exist, so that a failed
     * check is repeated on the next attempt instead of being treated as a recent success.
     *
     * @param topic the topic the next record will be sent to
     * @throws ConnectException if the topic neither exists nor could be created
     */
    private void maybeCreateTopic(String topic) {
        if (!topicCreation.isTopicCreationEnabled() || topicCreation.topicCreated(topic) && topicVerifiedRecently(topic)) {
            log.trace("Topic creation by the connector is disabled or the topic {} was previously created and it's existence was recently verified.If auto.create.topics.enable is enabled on the broker, the topic will be created with default settings", topic);
            return;
        }

        // Placeholder order: first the topic name, then the threshold in milliseconds.
        log.info("The task will send records to topic '{}' for the first time or we haven't verified or sent a record to the topic for {} ms. Checking whether topic exists to re-create it if it was manually deleted.", topic, TOPIC_VERIFICATION_THRESHOLD);

        Map<String, TopicDescription> existing = admin.describeTopics(topic);
        if (!existing.isEmpty()) {
            log.info("Topic '{}' already exists.", topic);
            topicCreation.addTopic(topic);
            topicLastVerifiedAt.put(topic, time.milliseconds());
            return;
        }

        log.info("Creating topic '{}'", topic);
        TopicCreationGroup topicGroup = topicCreation.findFirstGroup(topic);
        log.debug("Topic '{}' matched topic creation group: {}", topic, topicGroup);
        NewTopic newTopic = topicGroup.newTopic(topic);
        TopicAdmin.TopicCreationResponse response = admin.createOrFindTopics(newTopic);

        if (response.isCreated(newTopic.name())) {
            topicCreation.addTopic(topic);
            topicLastVerifiedAt.put(topic, time.milliseconds());
            log.info("Created topic '{}' using creation group {}", newTopic, topicGroup);
        } else if (response.isExisting(newTopic.name())) {
            topicCreation.addTopic(topic);
            topicLastVerifiedAt.put(topic, time.milliseconds());
            log.info("Found existing topic '{}'", newTopic);
        } else {
            log.warn("Request to create new topic '{}' failed", topic);
            throw new ConnectException("Task failed to create new topic " + newTopic + ". Ensure that the task is authorized to create topics or that the topic exists and restart the task");
        }
    }

    /**
     * Converts the Connect headers of a record into Kafka record headers using the header
     * converter, with the class loader swapped to the converter's loader for each header.
     *
     * @param record the record whose headers are converted
     * @return the converted headers; empty when the record has no headers
     */
    protected RecordHeaders convertHeaderFor(SourceRecord record) {
        RecordHeaders converted = new RecordHeaders();
        org.apache.kafka.connect.header.Headers connectHeaders = record.headers();
        if (connectHeaders == null) {
            return converted;
        }

        String topic = record.topic();
        for (Header connectHeader : connectHeaders) {
            String headerKey = connectHeader.key();
            try (LoaderSwap swap = (LoaderSwap) pluginLoaderSwapper.apply(headerConverterPlugin.get().getClass().getClassLoader())) {
                byte[] rawValue = headerConverterPlugin.get().fromConnectHeader(topic, headerKey, connectHeader.schema(), connectHeader.value());
                converted.add(headerKey, rawValue);
            }
        }
        return converted;
    }

    /**
     * Notifies the wrapped task that a record was written. Anything the task throws is
     * logged and otherwise ignored.
     *
     * @param record the source record
     * @param metadata the producer metadata passed on to the task
     */
    protected void commitTaskRecord(SourceRecord record, RecordMetadata metadata) {
        try {
            task.commitRecord(record, metadata);
        } catch (Throwable thrown) {
            log.error("{} Exception thrown while calling task.commitRecord()", this, thrown);
        }
    }

    /**
     * Invokes {@code commit()} on the wrapped task. Anything the task throws is logged and
     * otherwise ignored.
     */
    protected void commitSourceTask() {
        try {
            task.commit();
        } catch (Throwable thrown) {
            log.error("{} Exception thrown while calling task.commit()", this, thrown);
        }
    }

    /**
     * Forwards the outcome of a poll to the source-task metrics.
     *
     * @param numRecordsInBatch number of records the poll returned
     * @param duration how long the poll took, in the unit measured by the caller
     */
    protected void recordPollReturned(int numRecordsInBatch, long duration) {
        sourceTaskMetricsGroup.recordPoll(numRecordsInBatch, duration);
    }

    /**
     * @return the metrics group this task records its poll/write metrics into
     */
    SourceTaskMetricsGroup sourceTaskMetricsGroup() {
        return sourceTaskMetricsGroup;
    }

    /**
     * Sensors for a single source task: records polled, records written, poll batch time
     * and the number of records that were polled but not yet accounted for as written.
     */
    static class SourceTaskMetricsGroup
    implements AutoCloseable {
        private final ConnectMetrics.MetricGroup metricGroup;
        private final Sensor sourceRecordPoll;
        private final Sensor sourceRecordWrite;
        private final Sensor sourceRecordActiveCount;
        private final Sensor pollTime;
        // Records polled minus records reported as written/skipped; never goes below zero.
        private int activeRecordCount;

        /**
         * Obtains the metric group tagged with the connector name and task number and
         * registers all sensors in it.
         *
         * @param id identifies the connector and task the metrics belong to
         * @param connectMetrics the metrics facade the group is obtained from
         */
        public SourceTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics) {
            ConnectMetricsRegistry registry = connectMetrics.registry();
            String taskNumber = Integer.toString(id.task());
            metricGroup = connectMetrics.group(registry.sourceTaskGroupName(), registry.connectorTagName(), id.connector(), registry.taskTagName(), taskNumber);
            // NOTE(review): the group is closed before any sensor is added, presumably to
            // drop metrics left over from an earlier instance of the same task - confirm
            // against ConnectMetrics.MetricGroup#close().
            metricGroup.close();

            sourceRecordPoll = metricGroup.sensor("source-record-poll");
            sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollRate), new Rate());
            sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollTotal), new CumulativeSum());

            sourceRecordWrite = metricGroup.sensor("source-record-write");
            sourceRecordWrite.add(metricGroup.metricName(registry.sourceRecordWriteRate), new Rate());
            sourceRecordWrite.add(metricGroup.metricName(registry.sourceRecordWriteTotal), new CumulativeSum());

            pollTime = metricGroup.sensor("poll-batch-time");
            pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeMax), new Max());
            pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeAvg), new Avg());

            sourceRecordActiveCount = metricGroup.sensor("source-record-active-count");
            sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCount), new Value());
            sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCountMax), new Max());
            sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCountAvg), new Avg());
        }

        /** Closes the underlying metric group. */
        @Override
        public void close() {
            metricGroup.close();
        }

        /**
         * Records a completed poll.
         *
         * @param batchSize number of records returned by the poll
         * @param duration time the poll took
         */
        void recordPoll(int batchSize, long duration) {
            sourceRecordPoll.record(batchSize);
            pollTime.record(duration);
            activeRecordCount = activeRecordCount + batchSize;
            sourceRecordActiveCount.record(activeRecordCount);
        }

        /**
         * Records the outcome of a batch of writes.
         *
         * @param recordCount number of records accounted for, including skipped ones
         * @param skippedCount how many of those records were skipped rather than written
         */
        void recordWrite(int recordCount, int skippedCount) {
            int written = recordCount - skippedCount;
            sourceRecordWrite.record(written);
            // Clamp at zero so the active count never turns negative.
            activeRecordCount = Math.max(0, activeRecordCount - recordCount);
            sourceRecordActiveCount.record(activeRecordCount);
        }

        /**
         * @return the underlying metric group
         */
        protected ConnectMetrics.MetricGroup metricGroup() {
            return metricGroup;
        }
    }

    /**
     * Tracks how many records of one batch are still outstanding and reports the batch to
     * the metrics group exactly once: either when the last record has been completed or
     * skipped, or when the remainder of the batch is going to be retried.
     */
    static class SourceRecordWriteCounter {
        private final SourceTaskMetricsGroup metricsGroup;
        private final int batchSize;
        // True once the batch has been reported to the metrics group.
        private boolean completed = false;
        // Records of the batch that have been neither completed nor skipped yet.
        private int counter;
        private int skipped;

        /**
         * @param batchSize number of records in the batch; expected to be positive
         * @param metricsGroup the metrics group to report to; expected to be non-null
         */
        public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup metricsGroup) {
            assert (batchSize > 0);
            assert (metricsGroup != null);
            this.metricsGroup = metricsGroup;
            this.batchSize = batchSize;
            this.counter = batchSize;
        }

        /** Marks one record as skipped (counted as processed, but not as written). */
        public void skipRecord() {
            skipped++;
            markOneProcessed();
        }

        /** Marks one record as successfully written. */
        public void completeRecord() {
            markOneProcessed();
        }

        /** Reports what has been processed so far because the rest of the batch is retried. */
        public void retryRemaining() {
            finishedAllWrites();
        }

        /** Decrements the outstanding count and reports the batch when it reaches zero. */
        private void markOneProcessed() {
            if (counter <= 0) {
                return;
            }
            counter--;
            if (counter == 0) {
                finishedAllWrites();
            }
        }

        /** Reports processed and skipped counts to the metrics group, at most once. */
        private void finishedAllWrites() {
            if (completed) {
                return;
            }
            metricsGroup.recordWrite(batchSize - counter, skipped);
            completed = true;
        }
    }
}

