package org.apache.kafka.connect.runtime;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerSinkTask.class */
public class WorkerSinkTask extends WorkerTask {
    // Class logger; no initializer is visible here, so it is presumably assigned in a static initializer elsewhere in the file.
    private static final Logger log;
    // Worker configuration; read for the offset commit interval, commit timeout and topic-tracking flag.
    private final WorkerConfig workerConfig;
    // The user-provided sink task that is initialized, started, fed record batches, and stopped by this class.
    private final SinkTask task;
    // Passed to the WorkerSinkTaskContext created in initialize().
    private final ClusterConfigState configState;
    // Task configuration as strings; set in initialize() and handed to task.start().
    private Map<String, String> taskConfig;
    // Converters applied to the key, value and headers of each consumed record.
    private final Converter keyConverter;
    private final Converter valueConverter;
    private final HeaderConverter headerConverter;
    // Transformations applied to each converted record before it is added to the batch.
    private final TransformationChain<SinkRecord> transformationChain;
    private final SinkTaskMetricsGroup sinkTaskMetricsGroup;
    // When true, each converted record's topic is reported via recordActiveTopic().
    private final boolean isTopicTrackingEnabled;
    private KafkaConsumer<byte[], byte[]> consumer;
    // Created in initialize(); carries commit requests, poll timeout overrides and task-paused partitions.
    private WorkerSinkTaskContext context;
    // Converted records waiting to be handed to task.put().
    private final List<SinkRecord> messageBatch;
    // Offsets recorded by the last successful commit (or the consumer position at assignment time).
    private Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets;
    // Offsets of records already delivered to the task; candidates for the next commit.
    private Map<TopicPartition, OffsetAndMetadata> currentOffsets;
    // Offsets (last offset + 1) of the records in the most recently converted poll result.
    private final Map<TopicPartition, OffsetAndMetadata> origOffsets;
    // Exception captured inside a rebalance callback; rethrown by pollConsumer() after poll() returns.
    private RuntimeException rebalanceException;
    // Wall-clock time (ms) at which the next periodic offset commit is due.
    private long nextCommit;
    // Incremented for every commit attempt; completion callbacks carrying another number are ignored.
    private int commitSeqno;
    // Time (ms) at which the in-flight commit was started; -1 before the first commit.
    private long commitStarted;
    // Count of commit failures/timeouts; reset to 0 by a successful commit.
    private int commitFailures;
    // Reset to false on partition assignment; while true, iteration() does not resume partitions after a wakeup.
    private boolean pausedForRedelivery;
    // True while a commit attempt is outstanding.
    private boolean committing;
    // Set in close() after task.stop(); makes the revocation callback a no-op.
    private boolean taskStopped;
    // Optional (may be null); commitOffsets() waits for its outstanding futures before committing.
    private final WorkerErrantRecordReporter workerErrantRecordReporter;
    // Compiler-generated assertion switch surfaced by the decompiler; checked explicitly in poll().
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerSinkTask$HandleRebalance.class */
    private class HandleRebalance implements ConsumerRebalanceListener {
        private HandleRebalance() {
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            WorkerSinkTask.log.debug("{} Partitions assigned {}", WorkerSinkTask.this, collection);
            WorkerSinkTask.this.lastCommittedOffsets = new HashMap();
            WorkerSinkTask.this.currentOffsets = new HashMap();
            for (TopicPartition topicPartition : collection) {
                long position = WorkerSinkTask.this.consumer.position(topicPartition);
                WorkerSinkTask.this.lastCommittedOffsets.put(topicPartition, new OffsetAndMetadata(position));
                WorkerSinkTask.this.currentOffsets.put(topicPartition, new OffsetAndMetadata(position));
                WorkerSinkTask.log.debug("{} Assigned topic partition {} with offset {}", new Object[]{WorkerSinkTask.this, topicPartition, Long.valueOf(position)});
            }
            WorkerSinkTask.this.sinkTaskMetricsGroup.assignedOffsets(WorkerSinkTask.this.currentOffsets);
            WorkerSinkTask.this.pausedForRedelivery = false;
            WorkerSinkTask.this.context.pausedPartitions().retainAll(collection);
            if (WorkerSinkTask.this.shouldPause()) {
                WorkerSinkTask.this.pauseAll();
            } else if (!WorkerSinkTask.this.context.pausedPartitions().isEmpty()) {
                WorkerSinkTask.this.consumer.pause(WorkerSinkTask.this.context.pausedPartitions());
            }
            if (WorkerSinkTask.this.rebalanceException == null || (WorkerSinkTask.this.rebalanceException instanceof WakeupException)) {
                try {
                    WorkerSinkTask.this.openPartitions(collection);
                    WorkerSinkTask.this.rewind();
                } catch (RuntimeException e) {
                    WorkerSinkTask.this.rebalanceException = e;
                }
            }
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            if (WorkerSinkTask.this.taskStopped) {
                WorkerSinkTask.log.trace("Skipping partition revocation callback as task has already been stopped");
                return;
            }
            WorkerSinkTask.log.debug("{} Partitions revoked", WorkerSinkTask.this);
            try {
                WorkerSinkTask.this.closePartitions();
                WorkerSinkTask.this.sinkTaskMetricsGroup.clearOffsets();
            } catch (RuntimeException e) {
                WorkerSinkTask.this.rebalanceException = e;
            }
            WorkerSinkTask.this.messageBatch.clear();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerSinkTask$SinkTaskMetricsGroup.class */
    /**
     * Sensors and bookkeeping for the sink-task metric group of a single task.
     * Besides plain counters it tracks consumed and committed offsets per partition
     * in order to report the number of "active" (consumed but not yet committed) records.
     */
    public static class SinkTaskMetricsGroup {
        private final ConnectorTaskId id;
        private final ConnectMetrics metrics;
        private final ConnectMetrics.MetricGroup metricGroup;
        private final Sensor sinkRecordRead;
        private final Sensor sinkRecordSend;
        private final Sensor partitionCount;
        private final Sensor offsetSeqNum;
        private final Sensor offsetCompletion;
        private final Sensor offsetCompletionSkip;
        private final Sensor putBatchTime;
        private final Sensor sinkRecordActiveCount;
        private long activeRecords;
        private Map<TopicPartition, OffsetAndMetadata> consumedOffsets = new HashMap<>();
        private Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();

        /**
         * Looks up the metric group for the given task and registers all sensors on it.
         *
         * @param connectorTaskId identifies the connector and task used as metric tags
         * @param connectMetrics  metrics facility that supplies the registry and the group
         */
        public SinkTaskMetricsGroup(ConnectorTaskId connectorTaskId, ConnectMetrics connectMetrics) {
            metrics = connectMetrics;
            id = connectorTaskId;

            final ConnectMetricsRegistry registry = connectMetrics.registry();
            final ConnectMetrics.MetricGroup group = connectMetrics.group(
                    registry.sinkTaskGroupName(),
                    registry.connectorTagName(), connectorTaskId.connector(),
                    registry.taskTagName(), Integer.toString(connectorTaskId.task()));
            metricGroup = group;
            // The group is closed before any sensor is registered on it
            // (presumably to discard metrics left over from an earlier instance — confirm).
            group.close();

            sinkRecordRead = group.sensor("sink-record-read");
            sinkRecordRead.add(group.metricName(registry.sinkRecordReadRate), new Rate());
            sinkRecordRead.add(group.metricName(registry.sinkRecordReadTotal), new CumulativeSum());

            sinkRecordSend = group.sensor("sink-record-send");
            sinkRecordSend.add(group.metricName(registry.sinkRecordSendRate), new Rate());
            sinkRecordSend.add(group.metricName(registry.sinkRecordSendTotal), new CumulativeSum());

            sinkRecordActiveCount = group.sensor("sink-record-active-count");
            sinkRecordActiveCount.add(group.metricName(registry.sinkRecordActiveCount), new Value());
            sinkRecordActiveCount.add(group.metricName(registry.sinkRecordActiveCountMax), new Max());
            sinkRecordActiveCount.add(group.metricName(registry.sinkRecordActiveCountAvg), new Avg());

            partitionCount = group.sensor("partition-count");
            partitionCount.add(group.metricName(registry.sinkRecordPartitionCount), new Value());

            offsetSeqNum = group.sensor("offset-seq-number");
            offsetSeqNum.add(group.metricName(registry.sinkRecordOffsetCommitSeqNum), new Value());

            offsetCompletion = group.sensor("offset-commit-completion");
            offsetCompletion.add(group.metricName(registry.sinkRecordOffsetCommitCompletionRate), new Rate());
            offsetCompletion.add(group.metricName(registry.sinkRecordOffsetCommitCompletionTotal), new CumulativeSum());

            offsetCompletionSkip = group.sensor("offset-commit-completion-skip");
            offsetCompletionSkip.add(group.metricName(registry.sinkRecordOffsetCommitSkipRate), new Rate());
            offsetCompletionSkip.add(group.metricName(registry.sinkRecordOffsetCommitSkipTotal), new CumulativeSum());

            putBatchTime = group.sensor("put-batch-time");
            putBatchTime.add(group.metricName(registry.sinkRecordPutBatchTimeMax), new Max());
            putBatchTime.add(group.metricName(registry.sinkRecordPutBatchTimeAvg), new Avg());
        }

        /**
         * Recomputes the active-record count as the sum, over every committed partition that
         * also has a consumed offset, of (consumed - committed), never counting a negative
         * difference, and records the total on the active-count sensor.
         */
        void computeSinkRecordLag() {
            final Map<TopicPartition, OffsetAndMetadata> consumed = consumedOffsets;
            final Map<TopicPartition, OffsetAndMetadata> committed = committedOffsets;
            activeRecords = 0L;
            for (Map.Entry<TopicPartition, OffsetAndMetadata> committedEntry : committed.entrySet()) {
                final OffsetAndMetadata consumedOffset = consumed.get(committedEntry.getKey());
                if (consumedOffset == null) {
                    continue;
                }
                final long lag = consumedOffset.offset() - committedEntry.getValue().offset();
                // A committed offset ahead of the consumed one contributes nothing.
                activeRecords += Math.max(lag, 0L);
            }
            sinkRecordActiveCount.record(activeRecords);
        }

        /** Closes the underlying metric group. */
        void close() {
            metricGroup.close();
        }

        /** Records {@code i} on the sink-record-read sensor. */
        void recordRead(int i) {
            sinkRecordRead.record(i);
        }

        /** Records {@code i} on the sink-record-send sensor. */
        void recordSend(int i) {
            sinkRecordSend.record(i);
        }

        /** Records {@code j} on the put-batch-time sensor. */
        void recordPut(long j) {
            putBatchTime.record(j);
        }

        /** Records {@code i} on the partition-count sensor. */
        void recordPartitionCount(int i) {
            partitionCount.record(i);
        }

        /** Records {@code i} on the offset-seq-number sensor. */
        void recordOffsetSequenceNumber(int i) {
            offsetSeqNum.record(i);
        }

        /** Merges the given offsets into the consumed offsets and refreshes the active count. */
        void recordConsumedOffsets(Map<TopicPartition, OffsetAndMetadata> map) {
            consumedOffsets.putAll(map);
            computeSinkRecordLag();
        }

        /**
         * Replaces the committed offsets with the given map and refreshes the active count.
         * The map is stored by reference, not copied.
         */
        void recordCommittedOffsets(Map<TopicPartition, OffsetAndMetadata> map) {
            committedOffsets = map;
            computeSinkRecordLag();
        }

        /**
         * Resets tracking after an assignment: consumed offsets become a copy of the given map,
         * committed offsets become the given map itself (by reference), and the active count is 0.
         */
        void assignedOffsets(Map<TopicPartition, OffsetAndMetadata> map) {
            consumedOffsets = new HashMap<>(map);
            committedOffsets = map;
            sinkRecordActiveCount.record(0.0d);
        }

        /** Empties both offset maps in place and records an active count of 0. */
        void clearOffsets() {
            consumedOffsets.clear();
            committedOffsets.clear();
            sinkRecordActiveCount.record(0.0d);
        }

        /** Records one completed offset commit. */
        void recordOffsetCommitSuccess() {
            offsetCompletion.record(1.0d);
        }

        /** Records one skipped offset commit. */
        void recordOffsetCommitSkip() {
            offsetCompletionSkip.record(1.0d);
        }

        /** @return the metric group the sensors are registered on */
        protected ConnectMetrics.MetricGroup metricGroup() {
            return metricGroup;
        }
    }

    public WorkerSinkTask(ConnectorTaskId connectorTaskId, SinkTask sinkTask, TaskStatus.Listener listener, TargetState targetState, WorkerConfig workerConfig, ClusterConfigState clusterConfigState, ConnectMetrics connectMetrics, Converter converter, Converter converter2, HeaderConverter headerConverter, TransformationChain<SinkRecord> transformationChain, KafkaConsumer<byte[], byte[]> kafkaConsumer, ClassLoader classLoader, Time time, RetryWithToleranceOperator retryWithToleranceOperator, WorkerErrantRecordReporter workerErrantRecordReporter, StatusBackingStore statusBackingStore) {
        super(connectorTaskId, listener, targetState, classLoader, connectMetrics, retryWithToleranceOperator, time, statusBackingStore);
        this.workerConfig = workerConfig;
        this.task = sinkTask;
        this.configState = clusterConfigState;
        this.keyConverter = converter;
        this.valueConverter = converter2;
        this.headerConverter = headerConverter;
        this.transformationChain = transformationChain;
        this.messageBatch = new ArrayList();
        this.currentOffsets = new HashMap();
        this.origOffsets = new HashMap();
        this.pausedForRedelivery = false;
        this.rebalanceException = null;
        this.nextCommit = time.milliseconds() + workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG).longValue();
        this.committing = false;
        this.commitSeqno = 0;
        this.commitStarted = -1L;
        this.commitFailures = 0;
        this.sinkTaskMetricsGroup = new SinkTaskMetricsGroup(connectorTaskId, connectMetrics);
        this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(this.commitSeqno);
        this.consumer = kafkaConsumer;
        this.isTopicTrackingEnabled = workerConfig.getBoolean(WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG).booleanValue();
        this.taskStopped = false;
        this.workerErrantRecordReporter = workerErrantRecordReporter;
    }

    /**
     * Captures the task configuration and creates the sink task context.
     * Any failure is logged and reported through {@code onFailure} instead of being thrown.
     *
     * @param taskConfig the configuration for this task
     */
    @Override
    public void initialize(TaskConfig taskConfig) {
        try {
            Map<String, String> originals = taskConfig.originalsStrings();
            this.taskConfig = originals;
            this.context = new WorkerSinkTaskContext(consumer, this, configState);
        } catch (Throwable failure) {
            log.error("{} Task failed initialization and will not be started.", this, failure);
            onFailure(failure);
        }
    }

    /**
     * Requests that the task stop, then wakes the consumer so that a poll in progress
     * returns promptly.
     */
    @Override
    public void stop() {
        super.stop();
        consumer.wakeup();
    }

    /**
     * Stops the sink task (a failure to stop is logged, not propagated), marks the task as
     * stopped, and then quietly closes the consumer, the transformation chain and the
     * retry operator, in that order.
     */
    @Override
    protected void close() {
        try {
            task.stop();
        } catch (Throwable stopFailure) {
            log.warn("Could not stop task", stopFailure);
        }
        // From here on the revocation callback becomes a no-op.
        taskStopped = true;
        Utils.closeQuietly(consumer, "consumer");
        Utils.closeQuietly(transformationChain, "transformation chain");
        Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
    }

    /**
     * Closes the sink-task metric group and then delegates to the superclass;
     * the superclass cleanup runs even if closing the group throws.
     */
    @Override
    public void removeMetrics() {
        try {
            sinkTaskMetricsGroup.close();
        } finally {
            super.removeMetrics();
        }
    }

    /**
     * Applies the new target state via the superclass and wakes the consumer so the
     * work loop notices the change (see the wakeup handling in {@link #iteration()}).
     *
     * @param targetState the state to transition to
     */
    @Override
    public void transitionTo(TargetState targetState) {
        super.transitionTo(targetState);
        consumer.wakeup();
    }

    /**
     * Main work loop: runs {@link #iteration()} until the task is stopping.
     *
     * <p>{@code closePartitions()} is registered as a try-with-resources resource so that it
     * runs when the loop ends for any reason, including an exception thrown by
     * {@code iteration()}. If both the loop and the close fail, the close failure is attached
     * to the primary exception as a suppressed exception.
     *
     * <p>A {@link WakeupException} that escapes (from the loop or from the close) is only
     * logged at trace level and not propagated.
     */
    @Override
    public void execute() {
        log.info("{} Executing sink task", this);
        // The resource variable is never read; it exists solely so closePartitions() is always invoked.
        try (Utils.UncheckedCloseable suppressible = this::closePartitions) {
            while (!isStopping()) {
                iteration();
            }
        } catch (WakeupException e) {
            log.trace("Consumer woken up during initial offset commit attempt, but succeeded during a later attempt");
        }
    }

    /**
     * One pass of the work loop: commit offsets if a commit is requested or due, expire a
     * commit that has been outstanding for longer than the commit timeout, then poll and
     * deliver records.
     *
     * <p>A {@link WakeupException} is treated as a signal to re-evaluate state: nothing is
     * done when stopping; partitions are paused (and a commit requested) when the task
     * should pause; otherwise partitions are resumed unless they are paused for redelivery.
     */
    protected void iteration() {
        final long commitIntervalMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
        try {
            final long now = time.milliseconds();

            // Start a new commit only when none is in flight.
            if (!committing && (context.isCommitRequested() || now >= nextCommit)) {
                commitOffsets(now, false);
                nextCommit = now + commitIntervalMs;
                context.clearCommitRequest();
            }

            // Give up on an in-flight commit that has exceeded the configured timeout.
            final long commitDeadline = commitStarted + workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
            if (committing && now >= commitDeadline) {
                log.warn("{} Commit of offsets timed out", this);
                commitFailures++;
                committing = false;
            }

            // Never block in poll past the next scheduled commit.
            final long pollTimeoutMs = Math.max(nextCommit - now, 0L);
            poll(pollTimeoutMs);
        } catch (WakeupException wakeup) {
            log.trace("{} Consumer woken up", this);
            if (isStopping()) {
                return;
            }
            if (shouldPause()) {
                pauseAll();
                onPause();
                context.requestCommit();
            } else if (!pausedForRedelivery) {
                resumeAll();
                onResume();
            }
        }
    }

    /**
     * Handles the outcome of a commit attempt.
     *
     * <p>Callbacks whose sequence number is not the most recent one are counted as skipped
     * and otherwise ignored. For the current attempt, a failure increments the failure count
     * and a success resets it; a success with non-null offsets also updates
     * {@code lastCommittedOffsets}. In both cases the in-flight flag is cleared.
     *
     * @param th  the commit error, or {@code null} on success
     * @param j   the sequence number of the commit attempt that completed
     * @param map the committed offsets, or {@code null} when nothing was actually committed
     */
    private void onCommitCompleted(Throwable th, long j, Map<TopicPartition, OffsetAndMetadata> map) {
        if (commitSeqno == j) {
            final long elapsedMs = time.milliseconds() - commitStarted;
            if (th == null) {
                log.debug("{} Finished offset commit successfully in {} ms for sequence number {}: {}", this, elapsedMs, j, map);
                if (map != null) {
                    log.debug("{} Setting last committed offsets to {}", this, map);
                    lastCommittedOffsets = map;
                    sinkTaskMetricsGroup.recordCommittedOffsets(map);
                }
                commitFailures = 0;
                recordCommitSuccess(elapsedMs);
            } else {
                log.error("{} Commit of offsets threw an unexpected exception for sequence number {}: {}", this, j, map, th);
                commitFailures++;
                recordCommitFailure(elapsedMs, th);
            }
            committing = false;
        } else {
            // A stale callback must not touch the state of the current attempt.
            log.debug("{} Received out of order commit callback for sequence number {}, but most recent sequence number is {}", this, j, commitSeqno);
            sinkTaskMetricsGroup.recordOffsetCommitSkip();
        }
    }

    /**
     * @return the current commit failure count; it is reset to 0 by a successful commit
     */
    public int commitFailures() {
        return commitFailures;
    }

    /**
     * Validates the task configuration, subscribes the consumer (by explicit topic list when
     * one is configured, otherwise by topic regex) with a fresh {@code HandleRebalance}
     * listener, and then initializes and starts the sink task.
     */
    @Override
    protected void initializeAndStart() {
        SinkConnectorConfig.validate(taskConfig);

        if (!SinkConnectorConfig.hasTopicsConfig(taskConfig)) {
            String topicsRegex = taskConfig.get(SinkConnectorConfig.TOPICS_REGEX_CONFIG);
            Pattern topicsPattern = Pattern.compile(topicsRegex);
            consumer.subscribe(topicsPattern, new HandleRebalance());
            log.debug("{} Initializing and starting task for topics regex {}", this, topicsRegex);
        } else {
            List<String> topics = SinkConnectorConfig.parseTopicsList(taskConfig);
            consumer.subscribe(topics, new HandleRebalance());
            log.debug("{} Initializing and starting task for topics {}", this, Utils.join(topics, ", "));
        }

        task.initialize(context);
        task.start(taskConfig);
        log.info("{} Sink task finished initialization and start", this);
    }

    /**
     * Polls the consumer once, converts whatever was returned, and delivers the batch.
     *
     * <p>If the task context carries a positive timeout it caps the poll timeout and is then
     * reset to -1.
     *
     * @param j the maximum time to block in the consumer poll, in milliseconds
     */
    protected void poll(long j) {
        rewind();

        final long contextTimeout = context.timeout();
        if (contextTimeout > 0) {
            j = Math.min(j, contextTimeout);
            context.timeout(-1L);
        }

        log.trace("{} Polling consumer with timeout {} ms", this, j);
        final ConsumerRecords<byte[], byte[]> polled = pollConsumer(j);
        // With assertions enabled: new records must not arrive while an undelivered batch exists.
        if (!$assertionsDisabled && !(messageBatch.isEmpty() || polled.isEmpty())) {
            throw new AssertionError();
        }
        log.trace("{} Polling returned {} messages", this, polled.count());

        convertMessages(polled);
        deliverMessages();
    }

    /**
     * @return {@code true} while a commit attempt is outstanding
     */
    boolean isCommitting() {
        return committing;
    }

    /**
     * Commits the given offsets synchronously and reports the outcome to
     * {@code onCommitCompleted}.
     *
     * <p>If the commit is interrupted by a wakeup, the commit is retried (recursively) and
     * the original {@link WakeupException} is rethrown afterwards. Any other
     * {@link KafkaException} is reported as a commit failure and not propagated.
     *
     * @param map the offsets to commit
     * @param i   the sequence number of this commit attempt
     */
    private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> map, int i) {
        log.debug("{} Committing offsets synchronously using sequence number {}: {}", this, i, map);
        try {
            consumer.commitSync(map);
            onCommitCompleted(null, i, map);
        } catch (WakeupException wakeup) {
            // Retry first so the offsets still get committed, then let the wakeup propagate.
            doCommitSync(map, i);
            throw wakeup;
        } catch (KafkaException commitFailure) {
            onCommitCompleted(commitFailure, i, map);
        }
    }

    /**
     * Commits the given offsets asynchronously; the completion callback forwards the result
     * to {@code onCommitCompleted} together with this attempt's sequence number.
     *
     * @param map the offsets to commit
     * @param i   the sequence number of this commit attempt
     */
    private void doCommitAsync(Map<TopicPartition, OffsetAndMetadata> map, int i) {
        log.debug("{} Committing offsets asynchronously using sequence number {}: {}", this, i, map);
        consumer.commitAsync(map, (committed, error) -> onCommitCompleted(error, i, committed));
    }

    /**
     * Dispatches a commit to the synchronous or asynchronous path.
     *
     * @param map the offsets to commit
     * @param z   {@code true} to commit synchronously, {@code false} to commit asynchronously
     * @param i   the sequence number of this commit attempt
     */
    private void doCommit(Map<TopicPartition, OffsetAndMetadata> map, boolean z, int i) {
        if (!z) {
            doCommitAsync(map, i);
            return;
        }
        doCommitSync(map, i);
    }

    /**
     * Starts an offset commit for the records delivered so far.
     *
     * <p>Steps:
     * <ol>
     *   <li>Wait for outstanding errant-record reports, if a reporter is configured.</li>
     *   <li>Return immediately when there are no current offsets.</li>
     *   <li>Mark a commit as in flight under a new sequence number and ask the task, via
     *       {@code preCommit}, which offsets it wants committed. When closing, the task's
     *       {@code close} is invoked exactly once right after {@code preCommit}, whether or
     *       not {@code preCommit} succeeded.</li>
     *   <li>Keep only task-provided offsets for partitions present in the last committed
     *       offsets and not beyond what has been consumed; the rest are logged and ignored.</li>
     *   <li>Commit (synchronously when closing, asynchronously otherwise) unless nothing
     *       changed since the last commit.</li>
     * </ol>
     *
     * <p>Only a failure of {@code preCommit} is handled here: when closing it is reported as
     * a failed commit; otherwise the consumer is additionally rewound to the last committed
     * offsets. Exceptions from the commit itself (notably the {@code WakeupException}
     * rethrown by {@code doCommitSync}) and from {@code task.close} propagate to the caller.
     *
     * @param j the current time in milliseconds, recorded as the commit start time
     * @param z {@code true} when committing as part of closing the partitions
     */
    private void commitOffsets(long j, boolean z) {
        if (workerErrantRecordReporter != null) {
            log.trace("Awaiting all reported errors to be completed");
            workerErrantRecordReporter.awaitAllFutures();
            log.trace("Completed all reported errors");
        }
        if (currentOffsets.isEmpty()) {
            return;
        }

        committing = true;
        commitSeqno++;
        commitStarted = j;
        sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);

        final Map<TopicPartition, OffsetAndMetadata> taskProvidedOffsets;
        try {
            log.trace("{} Calling task.preCommit with current offsets: {}", this, currentOffsets);
            // The task receives a copy so it cannot mutate our tracking map.
            taskProvidedOffsets = task.preCommit(new HashMap<>(currentOffsets));
        } catch (Throwable t) {
            if (z) {
                log.warn("{} Offset commit failed during close", this);
                onCommitCompleted(t, commitSeqno, null);
            } else {
                log.error("{} Offset commit failed, rewinding to last committed offsets", this, t);
                for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) {
                    log.debug("{} Rewinding topic partition {} to offset {}", this, entry.getKey(), entry.getValue().offset());
                    consumer.seek(entry.getKey(), entry.getValue().offset());
                }
                currentOffsets = new HashMap<>(lastCommittedOffsets);
                onCommitCompleted(t, commitSeqno, null);
            }
            return;
        } finally {
            // Runs on success and on failure, so the task is closed exactly once when closing.
            if (z) {
                log.trace("{} Closing the task before committing the offsets: {}", this, currentOffsets);
                task.close(currentOffsets.keySet());
            }
        }

        if (taskProvidedOffsets.isEmpty()) {
            log.debug("{} Skipping offset commit, task opted-out by returning no offsets from preCommit", this);
            onCommitCompleted(null, commitSeqno, null);
            return;
        }

        // Start from what was last committed and overlay the valid task-provided offsets.
        final Map<TopicPartition, OffsetAndMetadata> committableOffsets = new HashMap<>(lastCommittedOffsets);
        for (Map.Entry<TopicPartition, OffsetAndMetadata> taskProvidedEntry : taskProvidedOffsets.entrySet()) {
            final TopicPartition partition = taskProvidedEntry.getKey();
            final OffsetAndMetadata taskProvidedOffset = taskProvidedEntry.getValue();
            if (committableOffsets.containsKey(partition)) {
                final long taskOffset = taskProvidedOffset.offset();
                final long currentOffset = currentOffsets.get(partition).offset();
                if (taskOffset <= currentOffset) {
                    committableOffsets.put(partition, taskProvidedOffset);
                } else {
                    log.warn("{} Ignoring invalid task provided offset {}/{} -- not yet consumed, taskOffset={} currentOffset={}", this, partition, taskProvidedOffset, taskOffset, currentOffset);
                }
            } else {
                log.warn("{} Ignoring invalid task provided offset {}/{} -- partition not assigned, assignment={}", this, partition, taskProvidedOffset, consumer.assignment());
            }
        }

        if (committableOffsets.equals(lastCommittedOffsets)) {
            log.debug("{} Skipping offset commit, no change since last commit", this);
            onCommitCompleted(null, commitSeqno, null);
            return;
        }

        doCommit(committableOffsets, z, commitSeqno);
    }

    /**
     * @return a short description of the form {@code WorkerSinkTask{id=<task id>}}
     */
    @Override
    public String toString() {
        return new StringBuilder("WorkerSinkTask{id=").append(id).append('}').toString();
    }

    /**
     * Polls the consumer and records the number of records read.
     *
     * <p>If a rebalance callback stored an exception during the poll, that exception is
     * cleared and rethrown instead of returning the records.
     *
     * @param j the maximum time to block, in milliseconds
     * @return the records returned by the consumer
     */
    private ConsumerRecords<byte[], byte[]> pollConsumer(long j) {
        final ConsumerRecords<byte[], byte[]> polled = consumer.poll(Duration.ofMillis(j));

        if (rebalanceException != null) {
            final RuntimeException pending = rebalanceException;
            rebalanceException = null;
            throw pending;
        }

        sinkTaskMetricsGroup.recordRead(polled.count());
        return polled;
    }

    /**
     * Converts and transforms every polled record, appending the surviving ones to the
     * message batch.
     *
     * <p>{@code origOffsets} is rebuilt from scratch: for each record, including dropped
     * ones, the partition's entry is set to the record's offset + 1. The result is then
     * reported to the metrics group as consumed offsets.
     *
     * @param consumerRecords the records returned by the latest poll
     */
    private void convertMessages(ConsumerRecords<byte[], byte[]> consumerRecords) {
        origOffsets.clear();
        for (ConsumerRecord<byte[], byte[]> raw : consumerRecords) {
            log.trace("{} Consuming and converting message in topic '{}' partition {} at offset {} and timestamp {}", this, raw.topic(), raw.partition(), raw.offset(), raw.timestamp());
            retryWithToleranceOperator.consumerRecord(raw);
            final SinkRecord converted = convertAndTransformRecord(raw);

            // Offsets advance even for dropped records.
            final TopicPartition partition = new TopicPartition(raw.topic(), raw.partition());
            origOffsets.put(partition, new OffsetAndMetadata(raw.offset() + 1));

            if (converted == null) {
                log.trace("{} Converters and transformations returned null, possibly because of too many retries, so dropping record in topic '{}' partition {} at offset {}", this, raw.topic(), raw.partition(), raw.offset());
            } else {
                messageBatch.add(converted);
            }
        }
        sinkTaskMetricsGroup.recordConsumedOffsets(origOffsets);
    }

    /**
     * Converts the key, value and headers of a consumed record through the retry operator,
     * builds a {@link SinkRecord} from them, and applies the transformation chain.
     *
     * @param consumerRecord the raw record from the consumer
     * @return the transformed record wrapped together with its original consumer record, or
     *         {@code null} if the retry operator reports a failure or the transformation
     *         chain returns {@code null}
     */
    private SinkRecord convertAndTransformRecord(ConsumerRecord<byte[], byte[]> consumerRecord) {
        final SchemaAndValue key = retryWithToleranceOperator.execute(
                () -> convertKey(consumerRecord), Stage.KEY_CONVERTER, keyConverter.getClass());
        final SchemaAndValue value = retryWithToleranceOperator.execute(
                () -> convertValue(consumerRecord), Stage.VALUE_CONVERTER, valueConverter.getClass());
        final Headers headers = retryWithToleranceOperator.execute(
                () -> convertHeadersFor(consumerRecord), Stage.HEADER_CONVERTER, headerConverter.getClass());

        // The failure flag is checked once, after all three conversions have been attempted.
        if (retryWithToleranceOperator.failed()) {
            return null;
        }

        final Long timestamp = ConnectUtils.checkAndConvertTimestamp(consumerRecord.timestamp());
        final SinkRecord original = new SinkRecord(
                consumerRecord.topic(), consumerRecord.partition(),
                key.schema(), key.value(),
                value.schema(), value.value(),
                consumerRecord.offset(),
                timestamp,
                consumerRecord.timestampType(),
                headers);
        log.trace("{} Applying transformations to record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}", this, consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), timestamp, key.value(), value.value());

        if (isTopicTrackingEnabled) {
            recordActiveTopic(original.topic());
        }

        final SinkRecord transformed = transformationChain.apply(original);
        return transformed == null ? null : new InternalSinkRecord(consumerRecord, transformed);
    }

    /**
     * Converts the record key with the key converter. A conversion failure is logged with
     * the record's coordinates and then rethrown unchanged.
     *
     * @param consumerRecord the raw record whose key is converted
     * @return the converted key and its schema
     */
    private SchemaAndValue convertKey(ConsumerRecord<byte[], byte[]> consumerRecord) {
        try {
            final byte[] rawKey = consumerRecord.key();
            return keyConverter.toConnectData(consumerRecord.topic(), consumerRecord.headers(), rawKey);
        } catch (Exception e) {
            log.error("{} Error converting message key in topic '{}' partition {} at offset {} and timestamp {}: {}", this, consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp(), e.getMessage(), e);
            throw e;
        }
    }

    /**
     * Converts the record value with the value converter. A conversion failure is logged
     * with the record's coordinates and then rethrown unchanged.
     *
     * @param consumerRecord the raw record whose value is converted
     * @return the converted value and its schema
     */
    private SchemaAndValue convertValue(ConsumerRecord<byte[], byte[]> consumerRecord) {
        try {
            final byte[] rawValue = consumerRecord.value();
            return valueConverter.toConnectData(consumerRecord.topic(), consumerRecord.headers(), rawValue);
        } catch (Exception e) {
            log.error("{} Error converting message value in topic '{}' partition {} at offset {} and timestamp {}: {}", this, consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp(), e.getMessage(), e);
            throw e;
        }
    }

    /**
     * Builds the Connect headers for a consumed record by running every Kafka header
     * through the configured header converter.
     *
     * <p>A record whose {@code headers()} is {@code null} yields an empty header set.
     *
     * @param consumerRecord the consumed record whose headers are converted
     * @return a new {@link ConnectHeaders} holding one entry per Kafka header, added in iteration order
     */
    private Headers convertHeadersFor(ConsumerRecord<byte[], byte[]> consumerRecord) {
        ConnectHeaders converted = new ConnectHeaders();
        // The Kafka Headers interface is not generic; it is already an Iterable of Header.
        // It is fully qualified because the simple name Headers refers to the Connect type here.
        org.apache.kafka.common.header.Headers kafkaHeaders = consumerRecord.headers();
        if (kafkaHeaders != null) {
            String topic = consumerRecord.topic();
            for (Header kafkaHeader : kafkaHeaders) {
                converted.add(
                        kafkaHeader.key(),
                        this.headerConverter.toConnectHeader(topic, kafkaHeader.key(), kafkaHeader.value()));
            }
        }
        return converted;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the errant record reporter held by this task.
     *
     * @return the value of the {@code workerErrantRecordReporter} field
     */
    public WorkerErrantRecordReporter workerErrantRecordReporter() {
        return workerErrantRecordReporter;
    }

    /**
     * Resumes every partition assigned to the consumer, except those that the task
     * context currently lists in {@code pausedPartitions()}.
     */
    private void resumeAll() {
        for (TopicPartition assigned : this.consumer.assignment()) {
            // The context's paused set is re-read on every iteration.
            final boolean keepPaused = this.context.pausedPartitions().contains(assigned);
            if (keepPaused) {
                continue;
            }
            this.consumer.resume(Collections.singleton(assigned));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Pauses every partition currently assigned to the consumer.
     */
    public void pauseAll() {
        final Collection<TopicPartition> assigned = this.consumer.assignment();
        this.consumer.pause(assigned);
    }

    /**
     * Delivers the buffered message batch to the sink task and updates the bookkeeping
     * that depends on a successful delivery.
     *
     * <p>On success the batch size and put latency are recorded, {@code origOffsets} is
     * merged into {@code currentOffsets}, and the batch buffer is cleared. If partitions
     * had been paused for redelivery, they are resumed unless {@code shouldPause()} is true.
     *
     * <p>On a {@link RetriableException} the batch is left in the buffer, the task is
     * flagged as paused for redelivery, and all assigned partitions are paused; the method
     * then returns normally (presumably so a later call can redeliver the same batch).
     *
     * @throws ConnectException if the task throws anything other than a
     *         {@link RetriableException}, or if the error handler reports a failure
     *         outside its tolerance limits
     */
    private void deliverMessages() {
        try {
            log.trace("{} Delivering batch of {} messages to task", this, this.messageBatch.size());
            long putStartMs = this.time.milliseconds();
            // The task gets its own copy because the buffer is cleared and reused below.
            this.task.put(new ArrayList<>(this.messageBatch));
            // A failure recorded by the error handler must still stop the task, even though
            // put() itself returned normally.
            if (this.retryWithToleranceOperator.failed() && !this.retryWithToleranceOperator.withinToleranceLimits()) {
                throw new ConnectException("Tolerance exceeded in error handler", this.retryWithToleranceOperator.error());
            }
            recordBatch(this.messageBatch.size());
            this.sinkTaskMetricsGroup.recordPut(this.time.milliseconds() - putStartMs);
            this.currentOffsets.putAll(this.origOffsets);
            this.messageBatch.clear();
            if (this.pausedForRedelivery) {
                // Only resume when nothing else requires the partitions to stay paused.
                if (!shouldPause()) {
                    resumeAll();
                }
                this.pausedForRedelivery = false;
            }
        } catch (RetriableException e) {
            log.error("{} RetriableException from SinkTask:", this, e);
            // The batch buffer is intentionally not cleared on this path.
            this.pausedForRedelivery = true;
            pauseAll();
        } catch (Throwable th) {
            log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: {}", this, th.getMessage(), th);
            throw new ConnectException("Exiting WorkerSinkTask due to unrecoverable exception.", th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Applies the offsets currently held by the task context to the consumer.
     *
     * <p>For every entry with a non-null offset the consumer is seeked to that offset,
     * and both {@code lastCommittedOffsets} and {@code currentOffsets} are overwritten
     * with it. Entries with a null offset are skipped with a warning. The context's
     * offsets are cleared afterwards; nothing happens when the context has no offsets.
     */
    public void rewind() {
        final Map<TopicPartition, Long> requested = this.context.offsets();
        if (!requested.isEmpty()) {
            for (Map.Entry<TopicPartition, Long> request : requested.entrySet()) {
                final TopicPartition partition = request.getKey();
                final Long target = request.getValue();
                if (target == null) {
                    log.warn("{} Cannot rewind {} to null offset", this, partition);
                    continue;
                }
                log.trace("{} Rewind {} to offset {}", this, partition, target);
                final long position = target.longValue();
                this.consumer.seek(partition, position);
                this.lastCommittedOffsets.put(partition, new OffsetAndMetadata(position));
                this.currentOffsets.put(partition, new OffsetAndMetadata(position));
            }
            this.context.clearOffsets();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records the number of given partitions in the sink task metrics and then opens
     * them on the task.
     *
     * @param collection the partitions passed to the task's {@code open}
     */
    public void openPartitions(Collection<TopicPartition> collection) {
        final int partitionCount = collection.size();
        this.sinkTaskMetricsGroup.recordPartitionCount(partitionCount);
        this.task.open(collection);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Commits offsets using the current time and then records a partition count of
     * zero in the sink task metrics.
     */
    public void closePartitions() {
        final long nowMs = this.time.milliseconds();
        // NOTE(review): the 'true' flag presumably marks this commit as part of closing — confirm against commitOffsets.
        commitOffsets(nowMs, true);
        this.sinkTaskMetricsGroup.recordPartitionCount(0);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Records a batch of the given size through the superclass and additionally
     * reports it to the sink task metrics as a send.
     *
     * @param i the batch size to record
     */
    @Override // org.apache.kafka.connect.runtime.WorkerTask
    public void recordBatch(int i) {
        super.recordBatch(i);
        sinkTaskMetricsGroup.recordSend(i);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Records a failed offset commit by delegating to the superclass; no additional
     * sink-specific metric is recorded here.
     *
     * @param j  the first argument forwarded to the superclass
     * @param th the failure forwarded to the superclass
     */
    @Override // org.apache.kafka.connect.runtime.WorkerTask
    public void recordCommitFailure(long j, Throwable th) {
        // Pure delegation to the superclass.
        super.recordCommitFailure(j, th);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Records a successful offset commit through the superclass and additionally
     * notifies the sink task metrics of the success.
     *
     * @param j the value forwarded to the superclass
     */
    @Override // org.apache.kafka.connect.runtime.WorkerTask
    public void recordCommitSuccess(long j) {
        super.recordCommitSuccess(j);
        sinkTaskMetricsGroup.recordOffsetCommitSuccess();
    }

    /**
     * Returns the metrics group used by this sink task.
     *
     * @return the value of the {@code sinkTaskMetricsGroup} field
     */
    SinkTaskMetricsGroup sinkTaskMetricsGroup() {
        return sinkTaskMetricsGroup;
    }

    /**
     * Returns the current value of the {@code nextCommit} field.
     *
     * @return the stored next-commit value
     */
    long getNextCommit() {
        return nextCommit;
    }

    // Static initializer as emitted by the decompiler; the two fields assigned here
    // are declared elsewhere in the class.
    static {
        // Stores the negation of this class's desired assertion status.
        // NOTE(review): this looks like the compiler-synthesized flag behind `assert` statements — confirm.
        $assertionsDisabled = !WorkerSinkTask.class.desiredAssertionStatus();
        // Class-wide logger used by the log.trace/warn/error calls in this class.
        log = LoggerFactory.getLogger(WorkerSinkTask.class);
    }
}
