package com.spredfast.kafka.connect.s3.source;

import com.spredfast.kafka.connect.s3.AlreadyBytesConverter;
import com.spredfast.kafka.connect.s3.Configure;
import com.spredfast.kafka.connect.s3.Constants;
import com.spredfast.kafka.connect.s3.S3;
import com.spredfast.kafka.connect.s3.S3RecordFormat;
import com.spredfast.kafka.connect.s3.source.S3FilesReader;
import com.spredfast.shade.amazonaws.services.s3.AmazonS3;
import com.spredfast.shade.amazonaws.services.s3.model.AmazonS3Exception;
import com.spredfast.shade.amazonaws.util.StringUtils;
import com.spredfast.shade.fasterxml.jackson.annotation.JsonProperty;
import com.spredfast.shade.joda.time.DateTimeConstants;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spredfast/kafka/connect/s3/source/S3SourceTask.class */
public class S3SourceTask extends SourceTask {
    private static final Logger log = LoggerFactory.getLogger(S3SourceTask.class);
    public static final String CONFIG_TARGET_TOPIC = "targetTopic";
    private Map<String, String> taskConfig;
    private Iterator<S3SourceRecord> reader;
    private int maxPoll;
    private S3RecordFormat format;
    private Optional<Converter> keyConverter;
    private Converter valueConverter;
    private Map<S3Partition, S3Offset> offsets;
    private final AtomicBoolean stopped = new AtomicBoolean();
    private final Map<String, String> topicMapping = new HashMap();
    private long s3PollInterval = 10000;
    private long errorBackoff = 1000;

    public String version() {
        return Constants.VERSION;
    }

    public void start(Map<String, String> map) {
        this.taskConfig = map;
        this.format = Configure.createFormat(map);
        this.keyConverter = Optional.ofNullable(Configure.buildConverter(map, "key.converter", true, null));
        this.valueConverter = Configure.buildConverter(map, "value.converter", false, AlreadyBytesConverter.class);
        readFromStoredOffsets();
    }

    private void readFromStoredOffsets() {
        try {
            tryReadFromStoredOffsets();
        } catch (Exception e) {
            throw new ConnectException("Couldn't start task " + this.taskConfig, e);
        }
    }

    private void tryReadFromStoredOffsets() throws UnsupportedEncodingException {
        String orElseThrow = configGet("s3.bucket").orElseThrow(() -> {
            return new ConnectException("No bucket configured!");
        });
        String orElse = configGet("s3.prefix").orElse(JsonProperty.USE_DEFAULT_NAME);
        Set set = (Set) Arrays.stream(configGet("partitions").orElseThrow(() -> {
            return new IllegalStateException("no assigned parititions!?");
        }).split(StringUtils.COMMA_SEPARATOR)).map(Integer::parseInt).collect(Collectors.toSet());
        Set set2 = (Set) configGet("topics").map((v0) -> {
            return v0.toString();
        }).map(str -> {
            return (Set) Arrays.stream(str.split(StringUtils.COMMA_SEPARATOR)).collect(Collectors.toSet());
        }).orElseGet(HashSet::new);
        List list = (List) set.stream().flatMap(num -> {
            return set2.stream().map(str2 -> {
                return S3Partition.from(orElseThrow, orElse, str2, num.intValue());
            });
        }).collect(Collectors.toList());
        if (this.offsets == null) {
            this.offsets = (Map) this.context.offsetStorageReader().offsets((Collection) list.stream().map((v0) -> {
                return v0.asMap();
            }).collect(Collectors.toList())).entrySet().stream().filter(entry -> {
                return entry.getValue() != null;
            }).collect(Collectors.toMap(entry2 -> {
                return S3Partition.from((Map) entry2.getKey());
            }, entry3 -> {
                return S3Offset.from((Map) entry3.getValue());
            }));
        }
        this.maxPoll = ((Integer) configGet("max.poll.records").map(Integer::parseInt).orElse(Integer.valueOf(DateTimeConstants.MILLIS_PER_SECOND))).intValue();
        this.s3PollInterval = ((Long) configGet("s3.new.record.poll.interval").map(Long::parseLong).orElse(10000L)).longValue();
        this.errorBackoff = ((Long) configGet("s3.error.backoff").map(Long::parseLong).orElse(1000L)).longValue();
        AmazonS3 s3client = S3.s3client(this.taskConfig);
        S3SourceConfig s3SourceConfig = new S3SourceConfig(orElseThrow, orElse, ((Integer) configGet("s3.page.size").map(Integer::parseInt).orElse(100)).intValue(), configGet("s3.start.marker").orElse(null), S3FilesReader.DEFAULT_PATTERN, S3FilesReader.InputFilter.GUNZIP, S3FilesReader.PartitionFilter.from((str2, num2) -> {
            return (set2.isEmpty() || set2.contains(str2)) && set.contains(num2);
        }));
        log.debug("{} reading from S3 with offsets {}", name(), this.offsets);
        Map<S3Partition, S3Offset> map = this.offsets;
        S3RecordFormat s3RecordFormat = this.format;
        s3RecordFormat.getClass();
        this.reader = new S3FilesReader(s3SourceConfig, s3client, map, s3RecordFormat::newReader).readAll();
    }

    private Optional<String> configGet(String str) {
        return Optional.ofNullable(this.taskConfig.get(str));
    }

    public List<SourceRecord> poll() throws InterruptedException {
        ArrayList arrayList = new ArrayList(this.maxPoll);
        if (this.stopped.get()) {
            return arrayList;
        }
        while (!this.stopped.get()) {
            try {
                return getSourceRecords(arrayList);
            } catch (AmazonS3Exception e) {
                if (!e.isRetryable()) {
                    throw e;
                }
                log.warn("Retryable error while polling. Will sleep and try again.", e);
                Thread.sleep(this.errorBackoff);
                readFromStoredOffsets();
            }
        }
        return arrayList;
    }

    private List<SourceRecord> getSourceRecords(List<SourceRecord> list) throws InterruptedException {
        while (!this.reader.hasNext() && !this.stopped.get()) {
            log.debug("Blocking until new S3 files are available.");
            Thread.sleep(this.s3PollInterval);
            readFromStoredOffsets();
        }
        if (this.stopped.get()) {
            return list;
        }
        for (int i = 0; this.reader.hasNext() && i < this.maxPoll && !this.stopped.get(); i++) {
            S3SourceRecord next = this.reader.next();
            updateOffsets(next.file(), next.offset());
            String computeIfAbsent = this.topicMapping.computeIfAbsent(next.topic(), this::remapTopic);
            Optional<U> map = this.keyConverter.map(converter -> {
                return converter.toConnectData(computeIfAbsent, next.key());
            });
            SchemaAndValue connectData = this.valueConverter.toConnectData(computeIfAbsent, next.value());
            list.add(new SourceRecord(next.file().asMap(), next.offset().asMap(), computeIfAbsent, Integer.valueOf(next.partition()), (Schema) map.map((v0) -> {
                return v0.schema();
            }).orElse(null), map.map((v0) -> {
                return v0.value();
            }).orElse(null), connectData.schema(), connectData.value()));
        }
        log.debug("{} returning {} records.", name(), Integer.valueOf(list.size()));
        return list;
    }

    private void updateOffsets(S3Partition s3Partition, S3Offset s3Offset) {
        S3Offset orDefault = this.offsets.getOrDefault(s3Partition, s3Offset);
        if (orDefault.compareTo(s3Offset) >= 0) {
            this.offsets.put(s3Partition, orDefault);
        } else {
            log.debug("{} updated offset for {} to {}", new Object[]{name(), s3Partition, s3Offset});
            this.offsets.put(s3Partition, s3Offset);
        }
    }

    public void commit() throws InterruptedException {
        log.debug("{} Commit offsets {}", name(), this.offsets);
    }

    public void commitRecord(SourceRecord sourceRecord) throws InterruptedException {
        log.debug("{} Commit record w/ offset {}", name(), sourceRecord.sourceOffset());
    }

    private String name() {
        return configGet("name").orElse("???");
    }

    private String remapTopic(String str) {
        return this.taskConfig.getOrDefault("targetTopic." + str, str);
    }

    public void stop() {
        this.stopped.set(true);
    }
}
