package com.spredfast.kafka.connect.s3.sink;

import com.spredfast.kafka.connect.s3.AlreadyBytesConverter;
import com.spredfast.kafka.connect.s3.Configure;
import com.spredfast.kafka.connect.s3.Constants;
import com.spredfast.kafka.connect.s3.Metrics;
import com.spredfast.kafka.connect.s3.S3;
import com.spredfast.kafka.connect.s3.S3RecordFormat;
import com.spredfast.kafka.connect.s3.S3RecordsWriter;
import com.spredfast.shade.fasterxml.jackson.annotation.JsonProperty;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.IllegalWorkerStateException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spredfast/kafka/connect/s3/sink/S3SinkTask.class */
public class S3SinkTask extends SinkTask {
    private static final Logger log = LoggerFactory.getLogger(S3SinkTask.class);
    private Map<String, String> config;
    private final Map<TopicPartition, PartitionWriter> partitions = new LinkedHashMap();
    private long GZIPChunkThreshold = 67108864;
    private S3Writer s3;
    private Optional<Converter> keyConverter;
    private Converter valueConverter;
    private S3RecordFormat recordFormat;
    private Metrics metrics;
    private Map<String, String> tags;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/spredfast/kafka/connect/s3/sink/S3SinkTask$PartitionWriter.class */
    public class PartitionWriter {
        private final TopicPartition tp;
        private final BlockGZIPFileWriter writer;
        private final S3RecordsWriter format;
        private final Map<String, String> tags;
        private boolean finished;
        private boolean closed;

        private PartitionWriter(TopicPartition topicPartition, long j) throws IOException {
            this.tp = topicPartition;
            this.format = S3SinkTask.this.recordFormat.newWriter();
            String format = String.format("%s-%05d", topicPartition.topic(), Integer.valueOf(topicPartition.partition()));
            String str = (String) S3SinkTask.this.configGet("local.buffer.dir").orElseThrow(() -> {
                return new ConnectException("No local buffer file path configured");
            });
            S3SinkTask.log.debug("New temp file: {}/{} @ {}", new Object[]{str, format, Long.valueOf(j)});
            HashMap hashMap = new HashMap(S3SinkTask.this.tags);
            hashMap.put("kafka_topic", topicPartition.topic());
            hashMap.put("kafka_partition", JsonProperty.USE_DEFAULT_NAME + topicPartition.partition());
            this.tags = hashMap;
            this.writer = new BlockGZIPFileWriter(format, str, j, S3SinkTask.this.GZIPChunkThreshold, this.format.init(topicPartition.topic(), topicPartition.partition(), j));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void writeAll(Collection<SinkRecord> collection) {
            S3SinkTask.this.metrics.hist(collection.size(), "putSize", this.tags);
            try {
                Metrics.StopTimer time = S3SinkTask.this.metrics.time("writeAll", this.tags);
                Throwable th = null;
                try {
                    try {
                        this.writer.write((List) this.format.writeBatch(collection.stream().map(sinkRecord -> {
                            return new ProducerRecord(sinkRecord.topic(), sinkRecord.kafkaPartition(), S3SinkTask.this.keyConverter.map(converter -> {
                                return converter.fromConnectData(sinkRecord.topic(), sinkRecord.keySchema(), sinkRecord.key());
                            }).orElse(null), S3SinkTask.this.valueConverter.fromConnectData(sinkRecord.topic(), sinkRecord.valueSchema(), sinkRecord.value()));
                        })).collect(Collectors.toList()), collection.size());
                        if (time != null) {
                            if (0 != 0) {
                                try {
                                    time.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                time.close();
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            } catch (IOException e) {
                throw new RetriableException("Failed to write to buffer", e);
            }
        }

        public String getDataFilePath() {
            return this.writer.getDataFilePath();
        }

        public void delete() {
            this.writer.delete();
            S3SinkTask.this.partitions.remove(this.tp);
        }

        /* JADX WARN: Type inference failed for: r1v7, types: [byte[], java.lang.Object[]] */
        public void done() {
            Metrics.StopTimer time = S3SinkTask.this.metrics.time("s3Put", this.tags);
            try {
                if (!this.finished) {
                    this.writer.write(Arrays.asList(new byte[]{this.format.finish(this.tp.topic(), this.tp.partition())}), 0L);
                    this.finished = true;
                }
                if (!this.closed) {
                    this.writer.close();
                    this.closed = true;
                }
                S3SinkTask.this.s3.putChunk(this.writer.getDataFilePath(), this.writer.getIndexFilePath(), this.tp);
                delete();
                time.stop();
            } catch (IOException e) {
                throw new RetriableException("Error flushing " + this.tp, e);
            }
        }
    }

    public String version() {
        return Constants.VERSION;
    }

    public void start(Map<String, String> map) throws ConnectException {
        this.config = new HashMap(map);
        configGet("compressed_block_size").map(Long::parseLong).ifPresent(l -> {
            this.GZIPChunkThreshold = l.longValue();
        });
        this.recordFormat = Configure.createFormat(map);
        this.keyConverter = Optional.ofNullable(Configure.buildConverter(this.config, "key.converter", true, null));
        this.valueConverter = Configure.buildConverter(this.config, "value.converter", false, AlreadyBytesConverter.class);
        this.s3 = new S3Writer(configGet("s3.bucket").filter(str -> {
            return !str.isEmpty();
        }).orElseThrow(() -> {
            return new ConnectException("S3 bucket must be configured");
        }), configGet("s3.prefix").orElse(JsonProperty.USE_DEFAULT_NAME), S3.s3client(this.config));
        this.metrics = Configure.metrics(map);
        this.tags = Configure.parseTags(map.get("metrics.tags"));
        this.tags.put("connector_name", name());
        open(this.context.assignment());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Optional<String> configGet(String str) {
        return Optional.ofNullable(this.config.get(str));
    }

    public void stop() throws ConnectException {
        for (PartitionWriter partitionWriter : this.partitions.values()) {
            log.debug("{} Stopping - Deleting temp file {}", name(), partitionWriter.getDataFilePath());
            partitionWriter.delete();
        }
    }

    public void put(Collection<SinkRecord> collection) throws ConnectException {
        ((Map) collection.stream().collect(Collectors.groupingBy(sinkRecord -> {
            return new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition().intValue());
        }))).forEach((topicPartition, list) -> {
            long kafkaOffset = ((SinkRecord) list.get(0)).kafkaOffset();
            log.debug("{} received {} records for {} to archive. Last offset {}", new Object[]{name(), Integer.valueOf(list.size()), topicPartition, Long.valueOf(((SinkRecord) list.get(list.size() - 1)).kafkaOffset())});
            this.partitions.computeIfAbsent(topicPartition, topicPartition -> {
                return initWriter(topicPartition, kafkaOffset);
            }).writeAll(list);
        });
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) throws ConnectException {
        Metrics.StopTimer time = this.metrics.time("flush", this.tags);
        log.debug("{} flushing offsets", name());
        Stream<TopicPartition> stream = map.keySet().stream();
        Map<TopicPartition, PartitionWriter> map2 = this.partitions;
        map2.getClass();
        stream.map((v1) -> {
            return r1.get(v1);
        }).filter(partitionWriter -> {
            return partitionWriter != null;
        }).forEach((v0) -> {
            v0.done();
        });
        time.stop();
    }

    private String name() {
        return configGet("name").orElseThrow(() -> {
            return new IllegalWorkerStateException("Tasks always have names");
        });
    }

    public void close(Collection<TopicPartition> collection) {
        Stream<TopicPartition> stream = collection.stream();
        Map<TopicPartition, PartitionWriter> map = this.partitions;
        map.getClass();
        stream.map((v1) -> {
            return r1.get(v1);
        }).filter(partitionWriter -> {
            return partitionWriter != null;
        }).forEach((v0) -> {
            v0.delete();
        });
    }

    public void open(Collection<TopicPartition> collection) {
    }

    private PartitionWriter initWriter(TopicPartition topicPartition, long j) {
        try {
            return new PartitionWriter(topicPartition, j);
        } catch (IOException e) {
            throw new RetriableException("Error initializing writer for " + topicPartition + " at offset " + j, e);
        }
    }
}
