package io.confluent.connect.replicator;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.connect.replicator.DeadlineManager;
import io.confluent.connect.replicator.KafkaConfigs;
import io.confluent.connect.replicator.metrics.ConfluentReplicatorMetrics;
import io.confluent.connect.replicator.metrics.ConfluentReplicatorTaskMetricsGroup;
import io.confluent.connect.replicator.offsets.ConsumerOffsetsTopicCommitter;
import io.confluent.connect.replicator.offsets.ConsumerOffsetsTranslator;
import io.confluent.connect.replicator.offsets.ConsumerTimestampsCommitter;
import io.confluent.connect.replicator.offsets.OffsetManager;
import io.confluent.connect.replicator.schemas.SchemaTranslator;
import io.confluent.connect.replicator.util.NewReplicatorAdminClient;
import io.confluent.connect.replicator.util.ReplicatorAdminClient;
import io.confluent.connect.replicator.util.TopicMetadata;
import io.confluent.connect.replicator.util.TranslatorMonitor;
import io.confluent.connect.replicator.util.Version;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/replicator/ReplicatorSourceTask.class */
public class ReplicatorSourceTask extends SourceTask {
    public static final String REPLICATOR_ID_HEADER = "__replicator_id";
    private String taskId;
    private ReplicatorSourceTaskConfig config;
    private Converter sourceKeyConverter;
    private Converter sourceValueConverter;
    private ReplicatorAdminClient sourceAdminClient;
    private ReplicatorAdminClient destAdminClient;
    private final TranslatorMonitor translatorMonitor;
    private String sourceClusterId;
    private String destClusterId;
    DeadlineManager deadlineManager;
    private final Set<String> sourceTopicsNeedingExpansion;
    private final Set<String> managedSourceTopics;
    private volatile Consumer<byte[], byte[]> consumer;
    private Map<String, Translator> translators;
    private ConsumerTimestampsCommitter timestampsCommitter;
    private ConsumerOffsetsTopicCommitter offsetTopicCommitter;
    private volatile boolean isStarting;
    private HeaderConverter converter;
    private List<FilterOverride> filterOverrides;
    private ConfluentReplicatorTaskMetricsGroup replicatorTaskMetricsGroup;
    private OffsetManager offsetManager;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ReplicatorSourceTask.class);
    public static final Pattern PROVENANCE_HEADER_PATTERN = Pattern.compile("([^,]+),([^,]+),([^,]+)");
    public static final Pattern FILTER_OVERRIDE_PATTERN = Pattern.compile(ReplicatorSourceConnectorConfig.FILTER_OVERRIDE_PATTERN);

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/confluent/connect/replicator/ReplicatorSourceTask$FilterOverride.class */
    public static class FilterOverride {
        private Pattern clusterId;
        private Pattern topic;
        private long startTsInclusive;
        private long endTsExclusive;

        public FilterOverride(Matcher matcher) {
            this.clusterId = Pattern.compile(matcher.group(1));
            this.topic = Pattern.compile(matcher.group(2));
            try {
                this.startTsInclusive = Long.parseLong(matcher.group(3));
            } catch (NumberFormatException e) {
                this.startTsInclusive = 0L;
            }
            try {
                this.endTsExclusive = Long.parseLong(matcher.group(4));
            } catch (NumberFormatException e2) {
                this.endTsExclusive = Long.MAX_VALUE;
            }
        }

        public Pattern clusterId() {
            return this.clusterId;
        }

        public Pattern topic() {
            return this.topic;
        }

        public long startTsInclusive() {
            return this.startTsInclusive;
        }

        public long endTsExclusive() {
            return this.endTsExclusive;
        }

        public boolean matches(ProvenanceHeader provenanceHeader) {
            Long ts = provenanceHeader.ts();
            return (ts == null || (ts.longValue() > (-1L) ? 1 : (ts.longValue() == (-1L) ? 0 : -1)) == 0 || ((ts.longValue() > this.startTsInclusive ? 1 : (ts.longValue() == this.startTsInclusive ? 0 : -1)) >= 0 && (ts.longValue() > this.endTsExclusive ? 1 : (ts.longValue() == this.endTsExclusive ? 0 : -1)) < 0)) && (provenanceHeader.topic() == null || this.topic.matcher(provenanceHeader.topic()).matches()) && this.clusterId.matcher(provenanceHeader.clusterId()).matches();
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            FilterOverride filterOverride = (FilterOverride) obj;
            return Objects.equals(this.clusterId, filterOverride.clusterId) && Objects.equals(this.topic, filterOverride.topic) && Objects.equals(Long.valueOf(this.startTsInclusive), Long.valueOf(filterOverride.startTsInclusive)) && Objects.equals(Long.valueOf(this.endTsExclusive), Long.valueOf(filterOverride.endTsExclusive));
        }

        public int hashCode() {
            return Objects.hash(this.clusterId, this.topic, Long.valueOf(this.startTsInclusive), Long.valueOf(this.endTsExclusive));
        }

        public String toString() {
            return this.clusterId + "," + this.topic + "," + this.startTsInclusive + "-" + this.endTsExclusive;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/confluent/connect/replicator/ReplicatorSourceTask$ProvenanceHeader.class */
    public static class ProvenanceHeader {
        private String clusterId;
        private String topic;
        private Long ts;
        private boolean valid;

        public ProvenanceHeader(String str, String str2, Long l, boolean z) {
            this.clusterId = str;
            this.topic = str2;
            this.ts = l;
            this.valid = z;
        }

        public String clusterId() {
            return this.clusterId;
        }

        public String topic() {
            return this.topic;
        }

        public Long ts() {
            return this.ts;
        }

        public boolean isValid() {
            return this.valid;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            ProvenanceHeader provenanceHeader = (ProvenanceHeader) obj;
            return this.valid == provenanceHeader.valid && Objects.equals(this.clusterId, provenanceHeader.clusterId) && Objects.equals(this.topic, provenanceHeader.topic) && Objects.equals(this.ts, provenanceHeader.ts);
        }

        public int hashCode() {
            return Objects.hash(this.clusterId, this.topic, this.ts, Boolean.valueOf(this.valid));
        }

        public String toString() {
            return this.clusterId + "," + this.topic + "," + this.ts;
        }
    }

    public ReplicatorSourceTask() {
        this(null, null, null, Time.SYSTEM, null, null, null, null, new TranslatorMonitor(10000L), null, null, null);
    }

    ReplicatorSourceTask(ReplicatorSourceTaskConfig replicatorSourceTaskConfig, SourceTaskContext sourceTaskContext, String str, Time time, Consumer<byte[], byte[]> consumer, ConsumerOffsetsTranslator consumerOffsetsTranslator, ReplicatorAdminClient replicatorAdminClient, ReplicatorAdminClient replicatorAdminClient2, TranslatorMonitor translatorMonitor, Converter converter, Converter converter2, ConfluentReplicatorTaskMetricsGroup confluentReplicatorTaskMetricsGroup) {
        this(replicatorSourceTaskConfig, sourceTaskContext, str, time, consumer, consumerOffsetsTranslator, replicatorAdminClient, replicatorAdminClient2, translatorMonitor, converter, converter2, null, confluentReplicatorTaskMetricsGroup);
    }

    ReplicatorSourceTask(ReplicatorSourceTaskConfig replicatorSourceTaskConfig, SourceTaskContext sourceTaskContext, String str, Time time, Consumer<byte[], byte[]> consumer, ConsumerOffsetsTranslator consumerOffsetsTranslator, ReplicatorAdminClient replicatorAdminClient, ReplicatorAdminClient replicatorAdminClient2, TranslatorMonitor translatorMonitor, Converter converter, Converter converter2, ConsumerTimestampsCommitter consumerTimestampsCommitter, ConfluentReplicatorTaskMetricsGroup confluentReplicatorTaskMetricsGroup) {
        this.deadlineManager = new DeadlineManager();
        this.sourceTopicsNeedingExpansion = new HashSet();
        this.managedSourceTopics = new HashSet();
        this.translators = new HashMap();
        this.isStarting = false;
        this.filterOverrides = new ArrayList();
        this.offsetManager = null;
        this.config = replicatorSourceTaskConfig;
        this.context = sourceTaskContext;
        this.taskId = str;
        this.deadlineManager.setTime(time);
        this.consumer = consumer;
        if (consumerOffsetsTranslator != null) {
            this.translators.put(consumerOffsetsTranslator.topic(), consumerOffsetsTranslator);
        }
        this.sourceAdminClient = replicatorAdminClient;
        this.destAdminClient = replicatorAdminClient2;
        this.translatorMonitor = translatorMonitor;
        this.sourceKeyConverter = converter;
        this.sourceValueConverter = converter2;
        this.timestampsCommitter = consumerTimestampsCommitter;
        this.replicatorTaskMetricsGroup = confluentReplicatorTaskMetricsGroup;
        this.offsetManager = new OffsetManager(replicatorSourceTaskConfig, consumer, sourceTaskContext, this.deadlineManager);
    }

    ReplicatorSourceTask(ReplicatorSourceTaskConfig replicatorSourceTaskConfig, SourceTaskContext sourceTaskContext, String str, Time time, Consumer<byte[], byte[]> consumer, ConsumerOffsetsTranslator consumerOffsetsTranslator, ReplicatorAdminClient replicatorAdminClient, ReplicatorAdminClient replicatorAdminClient2, TranslatorMonitor translatorMonitor, Converter converter, Converter converter2, ConsumerTimestampsCommitter consumerTimestampsCommitter, ConsumerOffsetsTopicCommitter consumerOffsetsTopicCommitter, ConfluentReplicatorTaskMetricsGroup confluentReplicatorTaskMetricsGroup) {
        this.deadlineManager = new DeadlineManager();
        this.sourceTopicsNeedingExpansion = new HashSet();
        this.managedSourceTopics = new HashSet();
        this.translators = new HashMap();
        this.isStarting = false;
        this.filterOverrides = new ArrayList();
        this.offsetManager = null;
        this.config = replicatorSourceTaskConfig;
        this.context = sourceTaskContext;
        this.taskId = str;
        this.deadlineManager.setTime(time);
        this.consumer = consumer;
        if (consumerOffsetsTranslator != null) {
            this.translators.put(consumerOffsetsTranslator.topic(), consumerOffsetsTranslator);
        }
        this.sourceAdminClient = replicatorAdminClient;
        this.destAdminClient = replicatorAdminClient2;
        this.translatorMonitor = translatorMonitor;
        this.sourceKeyConverter = converter;
        this.sourceValueConverter = converter2;
        this.timestampsCommitter = consumerTimestampsCommitter;
        this.offsetTopicCommitter = consumerOffsetsTopicCommitter;
        this.replicatorTaskMetricsGroup = confluentReplicatorTaskMetricsGroup;
        this.offsetManager = new OffsetManager(replicatorSourceTaskConfig, consumer, sourceTaskContext, this.deadlineManager);
    }

    @Override // org.apache.kafka.connect.source.SourceTask
    public void initialize(SourceTaskContext sourceTaskContext) {
        log.debug("Initializing SourceTaskContext for ReplicatorSourceTask");
        super.initialize(sourceTaskContext);
        this.offsetManager.setContext(sourceTaskContext);
    }

    @Override // org.apache.kafka.connect.connector.Task
    public String version() {
        return Version.getVersion();
    }

    @Override // org.apache.kafka.connect.source.SourceTask, org.apache.kafka.connect.connector.Task
    public synchronized void start(Map<String, String> map) {
        try {
            log.debug("Gathering configs for Replicator source task");
            this.config = taskConfig(map);
            this.offsetManager.setConfig(this.config);
            this.taskId = this.config.getTaskId();
            log.info("Starting Replicator source task {}", this.taskId);
            this.sourceKeyConverter = this.config.getSourceKeyConverter();
            this.sourceValueConverter = this.config.getSourceValueConverter();
            log.debug("Creating destination admin client...");
            ReplicatorAdminClient replicatorAdminClient = this.destAdminClient;
            ReplicatorSourceTaskConfig replicatorSourceTaskConfig = this.config;
            replicatorSourceTaskConfig.getClass();
            this.destAdminClient = createAdminClient(replicatorAdminClient, replicatorSourceTaskConfig::dstAdminClientConfig);
            log.debug("Creating source admin client...");
            ReplicatorAdminClient replicatorAdminClient2 = this.sourceAdminClient;
            ReplicatorSourceTaskConfig replicatorSourceTaskConfig2 = this.config;
            replicatorSourceTaskConfig2.getClass();
            this.sourceAdminClient = createAdminClient(replicatorAdminClient2, replicatorSourceTaskConfig2::srcAdminClientConfig);
            setClusterIds();
            if (this.config.isProvenanceHeaderEnabled()) {
                log.debug("provenance.header.enable is set to true. Setting up filter overrides");
                parseFilterOverrides(this.config.getString(ReplicatorSourceConnectorConfig.PROVENANCE_HEADER_FILTER_OVERRIDES_CONFIG), this.filterOverrides);
            }
            log.debug("Building source consumer for Replicator source task {}", this.taskId);
            this.consumer = buildSourceConsumer(this.config);
            log.debug("Building Consumer Offsets Translator for Replicator source task {}", this.taskId);
            ConsumerOffsetsTranslator consumerOffsetsTranslator = new ConsumerOffsetsTranslator(this.config.originalsStrings(), this.taskId, this.deadlineManager.getTime(), this.config.getOffsetTranslatorBatchPeriodMs(), this.config.getOffsetTranslatorBatchSize());
            this.translators.put(consumerOffsetsTranslator.topic(), consumerOffsetsTranslator);
            this.offsetManager.setConsumer(this.consumer);
            if (this.config.isOffsetTimestampsCommitEnabled()) {
                log.debug("offset.timestamps.commit is set to true. Building the Consumer Timestamps Committer");
                String str = map.get("src.consumer.group.id");
                if (str == null) {
                    str = this.config.getName();
                }
                this.timestampsCommitter = new ConsumerTimestampsCommitter(str, this.config.originalsWithPrefix(KafkaConfigs.KafkaCluster.SOURCE.prefix()), this.sourceAdminClient);
            }
            if (this.config.isOffsetTopicCommitEnabled()) {
                log.debug("offset.topic.commit is set to true. Building the Consumer Offsets Topic Committer");
                int offsetTopicCommitBatchPeriodMs = this.config.getOffsetTopicCommitBatchPeriodMs();
                this.offsetTopicCommitter = new ConsumerOffsetsTopicCommitter(this.consumer, offsetTopicCommitBatchPeriodMs > -1 && this.config.isProvenanceHeaderEnabled() && this.config.getTopicPreservePartitions(), this.deadlineManager.getTime(), offsetTopicCommitBatchPeriodMs);
            }
            if (this.config.getSchemaRegistryTopic() != null) {
                log.info("Registering schema translator for topic {}", this.config.getSchemaRegistryTopic());
                this.translators.put(this.config.getSchemaRegistryTopic(), new SchemaTranslator(this.config, this.deadlineManager.getTime()));
            }
            this.converter = this.config.getSourceHeaderConverter();
            log.debug("Fetching source assignments for Replicator source task {}", this.taskId);
            List<TopicPartition> partitions = this.config.getPartitionAssignment().partitions();
            doStart(partitions);
            log.info("Started kafka replicator task {} replicating topic partitions {}", this.taskId, partitions);
            log.info("Setting up metrics recording for task {}...", this.taskId);
            if (this.replicatorTaskMetricsGroup == null) {
                this.replicatorTaskMetricsGroup = new ConfluentReplicatorTaskMetricsGroup(this.config, this.taskId, partitions, new ConfluentReplicatorMetrics(this.taskId, this.deadlineManager.getTime()), this.sourceClusterId, this.destClusterId, this.config.getName());
                this.replicatorTaskMetricsGroup.setupMetrics();
            }
            log.info("Successfully set up metrics recording for task {}", this.taskId);
            log.info("Successfully started up Replicator source task {}", this.taskId);
        } catch (ConfigException e) {
            throw new ConnectException("Failed to start Kafka replicator task due to configuration error", e);
        }
    }

    @VisibleForTesting
    protected void setClusterIds() {
        try {
            this.sourceClusterId = this.sourceAdminClient.clusterId();
        } catch (Exception e) {
            log.error("Failed to obtain source cluster ID", (Throwable) e);
        }
        if (this.sourceClusterId == null) {
            throw new ConnectException("Failed to obtain source cluster ID, please restart the source Kafka cluster");
        }
        try {
            this.destClusterId = this.destAdminClient.clusterId();
        } catch (Exception e2) {
            log.error("Failed to obtain destination cluster ID", (Throwable) e2);
        }
        if (this.destClusterId == null) {
            throw new ConnectException("Failed to obtain destination cluster ID, please restart the destination Kafka cluster");
        }
        if (this.sourceClusterId.equals(this.destClusterId)) {
            log.warn("The source and destination cluster IDs match. This is normal when replicating to different topics in the same cluster. Otherwise, check your source and destination cluster properties.");
        }
        log.info("Source cluster ID: {}", this.sourceClusterId);
        log.info("Destination cluster ID: {}", this.destClusterId);
    }

    protected static void parseFilterOverrides(String str, List<FilterOverride> list) {
        if (str == null || str.isEmpty()) {
            return;
        }
        Matcher matcher = FILTER_OVERRIDE_PATTERN.matcher(str);
        while (matcher.find()) {
            list.add(new FilterOverride(matcher));
        }
    }

    synchronized void doStart(Collection<TopicPartition> collection) {
        this.isStarting = true;
        try {
            log.debug("Assigning the consumer of Replicator source task {} to the following source assignment {}", this.taskId, collection);
            this.consumer.assign(collection);
            watchTopicsForAssignedPartitions(collection);
            initializeAssignedPartitions(collection);
        } finally {
            this.isStarting = false;
        }
    }

    @Override // org.apache.kafka.connect.source.SourceTask
    public synchronized List<SourceRecord> poll() {
        log.debug("Beginning poll for Replicator source task {}", this.taskId);
        try {
            try {
                if (this.offsetTopicCommitter != null) {
                    this.offsetTopicCommitter.checkCommit();
                }
                log.debug("Retrying topic expansion if needed...");
                retryTopicExpansionIfNeeded();
                log.debug("Updating topic configs if needed...");
                updateTopicConfigsIfNeeded();
                log.debug("Attempting to resume any paused partitions...");
                maybeResumePartitions();
                List list = (List) this.translators.values().stream().map((v0) -> {
                    return v0.nextDeadline();
                }).collect(Collectors.toList());
                list.add(this.deadlineManager.get(DeadlineManager.DeadlineType.RETRY_TOPIC_EXPANSION));
                list.add(this.deadlineManager.get(DeadlineManager.DeadlineType.TOPIC_CONFIG_CHECK));
                list.add(this.deadlineManager.get(DeadlineManager.DeadlineType.TRY_RESUME));
                long milliSeconds = this.deadlineManager.getMilliSeconds();
                long max = Math.max(0L, io.confluent.connect.replicator.util.Utils.nextDeadline((Long[]) list.toArray(new Long[list.size()])) - milliSeconds);
                log.debug("Polling for records, waiting at most {} ms", Long.valueOf(max));
                ConsumerRecords<byte[], byte[]> empty = ConsumerRecords.empty();
                if (!this.consumer.assignment().isEmpty()) {
                    empty = this.consumer.poll(max);
                }
                if (log.isDebugEnabled()) {
                    Set<String> set = topicNamesFor(empty);
                    log.debug("Read {} records from {} topics: {}", Integer.valueOf(empty.count()), Integer.valueOf(set.size()), set);
                }
                boolean topicPreservePartitions = this.config.getTopicPreservePartitions();
                ArrayList arrayList = new ArrayList(empty.count());
                int i = 0;
                for (TopicPartition topicPartition : empty.partitions()) {
                    if (skipRecordReplication(topicPartition, empty)) {
                        log.trace("Skipping record replication for topic partition {} because it's part of the __consumer_timestamps topic or part of a schema topic", topicPartition);
                    } else {
                        String str = topicPartition.topic();
                        String destTopic = toDestTopic(str, this.config);
                        int partition = topicPartition.partition();
                        Map<String, ?> connectPartition = io.confluent.connect.replicator.util.Utils.toConnectPartition(str, partition);
                        long j = 0;
                        for (ConsumerRecord<byte[], byte[]> consumerRecord : empty.records(topicPartition)) {
                            if (this.config.isProvenanceHeaderEnabled() && shouldFilterRecord(consumerRecord, this.destClusterId, destTopic, this.filterOverrides)) {
                                log.trace("Filtering record from topic partition {} due to provenance headers", topicPartition);
                                i++;
                                if (this.offsetTopicCommitter != null) {
                                    this.offsetTopicCommitter.maybeCommitFilteredRecordByReplicator(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
                                }
                            } else {
                                j++;
                                Map<String, Object> connectOffset = io.confluent.connect.replicator.util.Utils.toConnectOffset(consumerRecord.offset());
                                Pair<SchemaAndValue, SchemaAndValue> convertKeyValue = convertKeyValue(consumerRecord, str);
                                arrayList.add(new SourceRecord(connectPartition, connectOffset, destTopic, topicPreservePartitions ? Integer.valueOf(partition) : null, convertKeyValue.getKey().schema(), convertKeyValue.getKey().value(), convertKeyValue.getValue().schema(), convertKeyValue.getValue().value(), timestampFromRecord(consumerRecord), toConnectHeaders(this.sourceClusterId, str, consumerRecord, this.converter, this.config.isProvenanceHeaderEnabled(), milliSeconds)));
                            }
                        }
                        if (this.offsetTopicCommitter != null) {
                            this.offsetTopicCommitter.updateNumUncommittedRecords(topicPartition, j);
                        }
                    }
                }
                if (log.isDebugEnabled()) {
                    Logger logger = log;
                    Object[] objArr = new Object[3];
                    objArr[0] = Integer.valueOf(arrayList.size());
                    objArr[1] = this.config.isProvenanceHeaderEnabled() ? " and adding provenance headers" : "";
                    objArr[2] = Integer.valueOf(i);
                    logger.debug("Replicating {} records{}, filtering {} records due to provenance headers", objArr);
                }
                translateCollectedRecords();
                return arrayList;
            } catch (OffsetOutOfRangeException e) {
                Map<TopicPartition, Long> offsetOutOfRangePartitions = e.offsetOutOfRangePartitions();
                log.warn("Consumer from source cluster detected out of range partitions: {}", offsetOutOfRangePartitions);
                this.offsetManager.seekToBeginning(offsetOutOfRangePartitions.keySet());
                List<SourceRecord> emptyList = Collections.emptyList();
                translateCollectedRecords();
                return emptyList;
            } catch (WakeupException e2) {
                log.debug("Kafka replicator task {} woken up", this.taskId);
                List<SourceRecord> emptyList2 = Collections.emptyList();
                translateCollectedRecords();
                return emptyList2;
            }
        } catch (Throwable th) {
            translateCollectedRecords();
            throw th;
        }
    }

    private Pair<SchemaAndValue, SchemaAndValue> convertKeyValue(ConsumerRecord<byte[], byte[]> consumerRecord, String str) {
        try {
            try {
                try {
                    return Pair.of(this.sourceKeyConverter.toConnectData(str, consumerRecord.key()), this.sourceValueConverter.toConnectData(str, consumerRecord.value()));
                } catch (Exception e) {
                    log.error("Failed to convert source record value under topic {}", str, e);
                    throw e;
                }
            } catch (Exception e2) {
                log.error("Failed to convert source record key under topic {}", str, e2);
                throw e2;
            }
        } catch (Throwable th) {
            try {
                this.sourceValueConverter.toConnectData(str, consumerRecord.value());
                throw th;
            } catch (Exception e3) {
                log.error("Failed to convert source record value under topic {}", str, e3);
                throw e3;
            }
        }
    }

    private void translateCollectedRecords() {
        for (Translator translator : this.translators.values()) {
            if (translator.isDestinationReady()) {
                List<ConsumerRecord<byte[], byte[]>> translateCollectedRecords = translator.translateCollectedRecords();
                if (translateCollectedRecords.size() > 0) {
                    log.debug("Translated " + translateCollectedRecords.size() + " collected records with translator: " + translator.getClass().getName());
                }
                if (this.offsetTopicCommitter != null) {
                    this.offsetTopicCommitter.commitRecords(translateCollectedRecords);
                }
            } else {
                log.debug("Pausing partitions for __consumer_timestamps and/or schema topics since destination is not ready");
                List<OffsetManager.TopicPartitionInfo> list = (List) this.consumer.assignment().stream().filter(topicPartition -> {
                    return translator.topic().equals(topicPartition.topic());
                }).map(topicPartition2 -> {
                    return new OffsetManager.TopicPartitionInfo(topicPartition2, "partition " + toDestPartition(topicPartition2, this.config), true);
                }).collect(Collectors.toList());
                if (translator.seekToBeginningOnPause()) {
                    list.forEach(topicPartitionInfo -> {
                        log.trace("Seeking to the beginning and pausing source partition {} since destination {} is not ready yet", topicPartitionInfo.getSourcePartition(), topicPartitionInfo.getDestinationId());
                    });
                    this.offsetManager.seekToBeginning((Collection) list.stream().map(topicPartitionInfo2 -> {
                        return topicPartitionInfo2.getSourcePartition();
                    }).collect(Collectors.toList()));
                } else {
                    list.forEach(topicPartitionInfo3 -> {
                        log.trace("Pausing source partition {} since destination {} is not ready yet", topicPartitionInfo3.getSourcePartition(), topicPartitionInfo3.getDestinationId());
                    });
                }
                this.offsetManager.pauseSourcePartitions(list);
            }
        }
    }

    private boolean skipRecordReplication(TopicPartition topicPartition, ConsumerRecords<byte[], byte[]> consumerRecords) {
        Translator translator = this.translators.get(topicPartition.topic());
        if (translator == null) {
            return false;
        }
        translator.collect(consumerRecords.records(topicPartition));
        return true;
    }

    protected static Set<String> topicNamesFor(ConsumerRecords<byte[], byte[]> consumerRecords) {
        return consumerRecords.isEmpty() ? Collections.emptySet() : (Set) consumerRecords.partitions().stream().map((v0) -> {
            return v0.topic();
        }).collect(Collectors.toSet());
    }

    protected static Long timestampFromRecord(ConsumerRecord<byte[], byte[]> consumerRecord) {
        Long l;
        if (consumerRecord.timestamp() >= 0) {
            l = Long.valueOf(consumerRecord.timestamp());
        } else {
            if (consumerRecord.timestamp() != -1) {
                throw new CorruptRecordException(String.format("Invalid Record timestamp: %d", Long.valueOf(consumerRecord.timestamp())));
            }
            l = null;
        }
        return l;
    }

    protected static boolean shouldFilterRecord(ConsumerRecord<byte[], byte[]> consumerRecord, String str, String str2, List<FilterOverride> list) {
        boolean z = false;
        Iterator<Header> it = consumerRecord.headers().headers(REPLICATOR_ID_HEADER).iterator();
        while (it.hasNext()) {
            ProvenanceHeader parseProvenanceHeader = parseProvenanceHeader(it.next().value(), consumerRecord);
            if (parseProvenanceHeader.isValid()) {
                if (str.equals(parseProvenanceHeader.clusterId()) && str2.equals(parseProvenanceHeader.topic())) {
                    if (matchesFilterOverride(parseProvenanceHeader, list)) {
                        log.trace("No candidate filtered headers matched an override");
                        return false;
                    }
                    log.trace("Found candidate filtered header {}", parseProvenanceHeader);
                    z = true;
                }
            } else {
                if (matchesFilterOverride(parseProvenanceHeader, list)) {
                    log.trace("No candidate filtered headers matched an override");
                    return false;
                }
                log.trace("Found candidate filtered header {}", parseProvenanceHeader);
                z = true;
            }
        }
        return z;
    }

    protected static boolean matchesFilterOverride(ProvenanceHeader provenanceHeader, List<FilterOverride> list) {
        for (FilterOverride filterOverride : list) {
            if (filterOverride.matches(provenanceHeader)) {
                log.trace("Candidate filtered header {} matches override {}", provenanceHeader, filterOverride);
                return true;
            }
        }
        return false;
    }

    protected static ProvenanceHeader parseProvenanceHeader(byte[] bArr, ConsumerRecord<byte[], byte[]> consumerRecord) {
        long timestamp;
        String str = new String(bArr, StandardCharsets.UTF_8);
        Matcher matcher = PROVENANCE_HEADER_PATTERN.matcher(str);
        if (!matcher.matches()) {
            return new ProvenanceHeader(str, consumerRecord.topic(), Long.valueOf(consumerRecord.timestamp()), false);
        }
        String group = matcher.group(1);
        String group2 = matcher.group(2);
        try {
            timestamp = Long.parseLong(matcher.group(3));
        } catch (NumberFormatException e) {
            timestamp = consumerRecord.timestamp();
        }
        return new ProvenanceHeader(group, group2, Long.valueOf(timestamp), true);
    }

    protected static byte[] formatProvenanceHeader(String str, String str2, Long l) {
        StringBuilder sb = new StringBuilder();
        sb.append(str).append(",").append(str2).append(",").append(l);
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    protected static ConnectHeaders toConnectHeaders(String str, String str2, ConsumerRecord<byte[], byte[]> consumerRecord, HeaderConverter headerConverter, boolean z, long j) {
        log.trace("Creating a Connect header for new Source Record...");
        Headers headers = consumerRecord.headers();
        ConnectHeaders connectHeaders = new ConnectHeaders();
        if (headers != null) {
            for (Header header : headers) {
                if (z && REPLICATOR_ID_HEADER.equals(header.key())) {
                    ProvenanceHeader parseProvenanceHeader = parseProvenanceHeader(header.value(), consumerRecord);
                    if (parseProvenanceHeader.isValid() && str.equals(parseProvenanceHeader.clusterId()) && str2.equals(parseProvenanceHeader.topic())) {
                    }
                }
                connectHeaders.add(header.key(), headerConverter.toConnectHeader(str2, header.key(), header.value()));
            }
        }
        if (z) {
            connectHeaders.add(REPLICATOR_ID_HEADER, formatProvenanceHeader(str, str2, Long.valueOf(j)), Schema.BYTES_SCHEMA);
        }
        return connectHeaders;
    }

    private void maybeResumePartitions() {
        long milliSeconds = this.deadlineManager.getMilliSeconds();
        Long l = this.deadlineManager.get(DeadlineManager.DeadlineType.TRY_RESUME);
        if (l == null) {
            log.debug("No partitions to resume");
            return;
        }
        if (l.longValue() > milliSeconds) {
            log.debug("Resuming at {} (in {} ms)", l, Long.valueOf(l.longValue() - milliSeconds));
            return;
        }
        for (TopicPartition topicPartition : this.consumer.paused()) {
            Translator translator = this.translators.get(topicPartition.topic());
            if (translator == null || !translator.isDestinationReady()) {
                TopicPartition destPartition = toDestPartition(topicPartition, this.config);
                if (this.destAdminClient.partitionExists(destPartition)) {
                    log.debug("Resuming paused partition {} since partition {} now exists in the destination cluster", topicPartition, destPartition);
                    this.consumer.resume(Collections.singleton(topicPartition));
                }
            } else {
                log.debug("Resuming paused partition {} since destination is now ready", topicPartition);
                this.consumer.resume(Collections.singleton(topicPartition));
            }
        }
        if (this.consumer.paused().isEmpty()) {
            this.deadlineManager.set(DeadlineManager.DeadlineType.TRY_RESUME, null);
            return;
        }
        Long valueOf = Long.valueOf(this.deadlineManager.getMilliSeconds() + 5000);
        log.debug("Setting resume deadline to {} (in {} ms)", (Object) valueOf, (Object) 5000L);
        this.deadlineManager.set(DeadlineManager.DeadlineType.TRY_RESUME, valueOf);
    }

    private void watchTopicsForAssignedPartitions(Collection<TopicPartition> collection) {
        HashSet hashSet = new HashSet();
        ArrayList arrayList = new ArrayList();
        for (TopicPartition topicPartition : collection) {
            Translator translator = this.translators.get(topicPartition.topic());
            if (translator != null) {
                arrayList.add(translator);
            } else {
                hashSet.add(toDestTopic(topicPartition.topic(), this.config));
            }
        }
        this.translatorMonitor.setInterestedTranslators(arrayList, () -> {
            if (this.deadlineManager.get(DeadlineManager.DeadlineType.TRY_RESUME) == null) {
                wakeupConsumer();
            }
        });
        this.destAdminClient.setInterestedTopics(hashSet, this::wakeupConsumer);
    }

    private void wakeupConsumer() {
        log.debug("Waking up consumer because there has been a change in metadata on the source topics");
        this.deadlineManager.set(DeadlineManager.DeadlineType.TRY_RESUME, Long.valueOf(this.deadlineManager.getMilliSeconds()));
        if (this.isStarting) {
            return;
        }
        this.consumer.wakeup();
    }

    private void initializeAssignedPartitions(Collection<TopicPartition> collection) {
        log.debug("Initializing source partitions for Replicator source task {}", this.taskId);
        this.offsetManager.initializePartitionOffset(collection, this.translators, this.destAdminClient);
        for (TopicPartition topicPartition : collection) {
            if (this.translators.get(topicPartition.topic()) == null && topicPartition.partition() == 0) {
                String str = topicPartition.topic();
                if (isDestTopicExpansionNeeded(str)) {
                    this.sourceTopicsNeedingExpansion.add(str);
                }
                if (this.config.getTopicConfigSync()) {
                    this.managedSourceTopics.add(str);
                }
            }
        }
    }

    public static TopicPartition toDestPartition(TopicPartition topicPartition, ReplicatorSourceTaskConfig replicatorSourceTaskConfig) {
        return new TopicPartition(toDestTopic(topicPartition.topic(), replicatorSourceTaskConfig), topicPartition.partition());
    }

    private static String toDestTopic(String str, ReplicatorSourceTaskConfig replicatorSourceTaskConfig) {
        return io.confluent.connect.replicator.util.Utils.renameTopic(replicatorSourceTaskConfig.getTopicRenameFormat(), str);
    }

    private void retryTopicExpansionIfNeeded() {
        if (this.sourceTopicsNeedingExpansion.isEmpty()) {
            this.deadlineManager.set(DeadlineManager.DeadlineType.RETRY_TOPIC_EXPANSION, null);
            log.debug("No topic expansion was needed!");
            return;
        }
        Long l = this.deadlineManager.get(DeadlineManager.DeadlineType.RETRY_TOPIC_EXPANSION);
        if (l == null || l.longValue() < this.deadlineManager.getMilliSeconds()) {
            HashSet hashSet = new HashSet();
            for (String str : this.sourceTopicsNeedingExpansion) {
                if (maybeCreateOrExpandDestTopic(str)) {
                    hashSet.add(str);
                }
            }
            log.debug("The following topics were expanded: {}", hashSet);
            this.sourceTopicsNeedingExpansion.removeAll(hashSet);
            this.deadlineManager.set(DeadlineManager.DeadlineType.RETRY_TOPIC_EXPANSION, Long.valueOf(this.deadlineManager.getMilliSeconds() + this.config.getTopicCreateBackoffMs()));
        }
    }

    private void overrideTimestampType(Properties properties) {
        properties.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, this.config.getTopicTimestampType());
    }

    private void updateTopicConfigsIfNeeded() {
        if (this.managedSourceTopics.isEmpty()) {
            this.deadlineManager.set(DeadlineManager.DeadlineType.TOPIC_CONFIG_CHECK, null);
            log.debug("No topic config update needed!");
            return;
        }
        Long l = this.deadlineManager.get(DeadlineManager.DeadlineType.TOPIC_CONFIG_CHECK);
        long milliSeconds = this.deadlineManager.getMilliSeconds();
        if (l != null && milliSeconds < l.longValue()) {
            log.debug("Checking for config update at {} (in {} ms)", Long.valueOf(milliSeconds), Long.valueOf(l.longValue() - milliSeconds));
            return;
        }
        log.debug("Verifying topic configuration for topics {} for replicator task {}", this.managedSourceTopics, this.taskId);
        for (String str : this.managedSourceTopics) {
            String destTopic = toDestTopic(str, this.config);
            if (this.destAdminClient.topicExists(destTopic)) {
                String str2 = null;
                Object obj = null;
                try {
                    Properties properties = this.sourceAdminClient.topicConfig(str);
                    overrideTimestampType(properties);
                    str2 = destTopic;
                    obj = "destination";
                    if (!properties.equals(this.destAdminClient.topicConfig(destTopic))) {
                        log.info("Updating configuration of destination topic {} with properties {}", destTopic, properties);
                        this.destAdminClient.changeTopicConfig(destTopic, properties);
                    }
                } catch (InterruptedException | RuntimeException | ExecutionException e) {
                    log.warn("Failed topic configuration check for {} topic {} on task {} exception {}. Will retry later.", obj, str2, this.taskId, e.toString());
                }
            }
        }
        this.deadlineManager.set(DeadlineManager.DeadlineType.TOPIC_CONFIG_CHECK, Long.valueOf(this.deadlineManager.getMilliSeconds() + this.config.getTopicConfigSyncIntervalMs()));
    }

    private boolean isDestTopicExpansionNeeded(String str) {
        List<PartitionInfo> partitionsFor;
        boolean topicAutoCreate = this.config.getTopicAutoCreate();
        boolean topicPreservePartitions = this.config.getTopicPreservePartitions();
        if ((!topicAutoCreate && !topicPreservePartitions) || (partitionsFor = this.consumer.partitionsFor(str)) == null) {
            return false;
        }
        int size = partitionsFor.size();
        TopicMetadata topicMetadata = this.destAdminClient.topicMetadata(toDestTopic(str, this.config));
        return topicMetadata == null ? topicAutoCreate : topicPreservePartitions && size > topicMetadata.numPartitions();
    }

    private boolean maybeCreateOrExpandDestTopic(String str) {
        int destTopicReplicationFactor;
        String destTopic = toDestTopic(str, this.config);
        List<PartitionInfo> partitionsFor = this.consumer.partitionsFor(str);
        int size = partitionsFor.size();
        log.info("Fetching destination topic metadata for destination topic " + destTopic);
        TopicMetadata topicMetadata = this.destAdminClient.topicMetadata(destTopic);
        if (!this.config.getTopicAutoCreate() || topicMetadata != null) {
            if (!this.config.getTopicPreservePartitions() || topicMetadata == null || size <= topicMetadata.numPartitions()) {
                return false;
            }
            try {
                log.info("Increasing number of partitions of topic {} from {} to {} in the destination cluster", destTopic, Integer.valueOf(topicMetadata.numPartitions()), Integer.valueOf(size));
                this.destAdminClient.addPartitions(destTopic, size);
                return true;
            } catch (InterruptedException | RuntimeException | ExecutionException e) {
                log.warn("Encountered exception when trying to increase number of partitions for destination topic {} ", destTopic, e);
                return false;
            }
        }
        try {
            log.info("Fetching source topic configs for source topic " + str);
            Properties properties = this.sourceAdminClient.topicConfig(str);
            overrideTimestampType(properties);
            boolean z = this.config.getDestTopicReplicationFactor() == 0;
            if (z) {
                destTopicReplicationFactor = partitionsFor.get(0).replicas().length;
            } else {
                destTopicReplicationFactor = this.config.getDestTopicReplicationFactor();
                log.info("Using user-specified replication factor of {} for creating destination topic {}", Integer.valueOf(destTopicReplicationFactor), destTopic);
            }
            try {
                log.debug("Determining number of alive brokers in destination cluster.");
                int aliveBrokers = this.destAdminClient.aliveBrokers();
                if (destTopicReplicationFactor > aliveBrokers) {
                    log.warn("Unable to create topic {} in the destination cluster because " + (z ? "the source replication factor " : " the specified dest.topic.replication.factor=") + "{} is greater than the number of alive brokers {}. This could be a transient issue because some brokers may go down or get disconnected at the destination cluster. If the number of brokers deployed at the destination cluster is greater than the replication factor, the system will fix by itself. Otherwise, the issue can be fixed by adding additional brokers to the destination cluster or setting '{}' flag in replicator's config to meet the number of brokers at the destination cluster.", destTopic, Integer.valueOf(destTopicReplicationFactor), Integer.valueOf(aliveBrokers), ReplicatorSourceConnectorConfig.DST_TOPIC_REPLICATION_FACTOR);
                    return false;
                }
                try {
                    log.info("Creating topic {} in destination cluster with {} partitions and replication factor {}", destTopic, Integer.valueOf(size), Integer.valueOf(destTopicReplicationFactor));
                    return this.destAdminClient.createTopic(destTopic, size, (short) destTopicReplicationFactor, properties);
                } catch (InterruptedException | RuntimeException | ExecutionException e2) {
                    log.warn("Encountered exception when trying to create destination topic {}: ", destTopic, e2);
                    return false;
                }
            } catch (InterruptedException | RuntimeException | ExecutionException e3) {
                log.warn("Encountered exception when trying to determine number of alive brokers in destination cluster. Unable to create destination topic {} at this time: ", destTopic, e3);
                return false;
            }
        } catch (InterruptedException | RuntimeException | ExecutionException e4) {
            log.warn("Encountered exception when trying to fetch source topic configs. Unable to create destination topic {} at this time: ", destTopic, e4);
            return false;
        }
    }

    @Override // org.apache.kafka.connect.source.SourceTask
    public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata) throws InterruptedException {
        if (recordMetadata != null) {
            log.trace("Committing source record with record metadata {}", recordMetadata.toString());
        }
        if (this.timestampsCommitter != null) {
            if (recordMetadata != null) {
                log.trace("Committing consumer timestamps with record metadata {}", recordMetadata.toString());
            }
            this.timestampsCommitter.commitRecord(sourceRecord);
        }
        if (this.offsetTopicCommitter != null) {
            if (recordMetadata != null) {
                log.trace("Committing offsets with record metadata {}", recordMetadata.toString());
            }
            this.offsetTopicCommitter.commitRecord(sourceRecord, recordMetadata);
        }
        if (sourceRecord != null) {
            this.replicatorTaskMetricsGroup.recordTaskTopicPartitionMetrics(sourceRecord, recordMetadata);
        }
    }

    @Override // org.apache.kafka.connect.source.SourceTask
    public void commit() throws InterruptedException {
        if (this.timestampsCommitter != null) {
            this.timestampsCommitter.commit();
        }
        if (this.offsetTopicCommitter != null) {
            this.offsetTopicCommitter.commit();
        }
    }

    @Override // org.apache.kafka.connect.source.SourceTask, org.apache.kafka.connect.connector.Task
    public void stop() {
        log.info("Closing kafka replicator task {}", this.taskId);
        if (this.consumer != null) {
            this.consumer.wakeup();
            synchronized (this) {
                AtomicReference atomicReference = new AtomicReference();
                log.debug("Closing source admin client");
                io.confluent.connect.replicator.util.Utils.closeQuietly(this.sourceAdminClient, "source admin client", atomicReference);
                log.debug("Closing destination admin client");
                io.confluent.connect.replicator.util.Utils.closeQuietly(this.destAdminClient, "destination admin client", atomicReference);
                log.debug("Closing consumer");
                io.confluent.connect.replicator.util.Utils.closeQuietly(this.consumer, ConsumerProtocol.PROTOCOL_TYPE, atomicReference);
                if (this.timestampsCommitter != null) {
                    log.debug("Stopping timestamps committer");
                    this.timestampsCommitter.stop();
                }
                if (this.translatorMonitor != null) {
                    log.debug("Closing translator monitor");
                    this.translatorMonitor.close();
                }
            }
        }
        log.info("Shutting down metrics recording for task {}", this.taskId);
        if (this.replicatorTaskMetricsGroup != null) {
            this.replicatorTaskMetricsGroup.stopMetrics();
        }
    }

    private Consumer<byte[], byte[]> buildSourceConsumer(ReplicatorSourceTaskConfig replicatorSourceTaskConfig) {
        return this.consumer != null ? this.consumer : createConsumerHelper(replicatorSourceTaskConfig.getSourceConsumerConfigs(), replicatorSourceTaskConfig.getName(), this.taskId);
    }

    @NotNull
    public static Consumer<byte[], byte[]> createConsumerHelper(Map<String, ?> map, String str, String str2) {
        HashMap hashMap = new HashMap();
        hashMap.putAll(map);
        if (!hashMap.containsKey("group.id")) {
            hashMap.put("group.id", str);
        }
        if (!hashMap.containsKey("client.id")) {
            hashMap.put("client.id", str2);
        }
        hashMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        hashMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
        hashMap.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
        log.debug("Initializing consumer with client id {} and group id {}", hashMap.get("client.id"), hashMap.get("group.id"));
        log.debug("Initializing Replicator Task Connector in Group");
        return new KafkaConsumer(hashMap, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    }

    private ReplicatorSourceTaskConfig taskConfig(Map<String, String> map) {
        return this.config != null ? this.config : new ReplicatorSourceTaskConfig(map);
    }

    private ReplicatorAdminClient createAdminClient(ReplicatorAdminClient replicatorAdminClient, Supplier<Map<String, Object>> supplier) {
        if (replicatorAdminClient != null) {
            return replicatorAdminClient;
        }
        return new NewReplicatorAdminClient(supplier.get(), this.deadlineManager.getTime(), 30000L, this.taskId);
    }

    @VisibleForTesting
    protected ReplicatorAdminClient getSourceAdminClient() {
        return this.sourceAdminClient;
    }

    @VisibleForTesting
    protected ReplicatorAdminClient getDestinationAdminClient() {
        return this.destAdminClient;
    }
}
