package com.spredfast.kafka.connect.s3.source;

import com.spredfast.kafka.connect.s3.LazyString;
import com.spredfast.kafka.connect.s3.S3RecordsReader;
import com.spredfast.kafka.connect.s3.json.ChunkDescriptor;
import com.spredfast.kafka.connect.s3.json.ChunksIndex;
import com.spredfast.shade.amazonaws.AmazonClientException;
import com.spredfast.shade.amazonaws.services.s3.AmazonS3;
import com.spredfast.shade.amazonaws.services.s3.model.GetObjectRequest;
import com.spredfast.shade.amazonaws.services.s3.model.ListObjectsRequest;
import com.spredfast.shade.amazonaws.services.s3.model.ObjectListing;
import com.spredfast.shade.amazonaws.services.s3.model.S3Object;
import com.spredfast.shade.amazonaws.services.s3.model.S3ObjectSummary;
import com.spredfast.shade.fasterxml.jackson.databind.ObjectMapper;
import com.spredfast.shade.fasterxml.jackson.databind.ObjectReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiPredicate;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spredfast/kafka/connect/s3/source/S3FilesReader.class */
public class S3FilesReader implements Iterable<S3SourceRecord> {
    private final AmazonS3 s3Client;
    private final Supplier<S3RecordsReader> makeReader;
    private final Map<S3Partition, S3Offset> offsets;
    private final ObjectReader indexParser = new ObjectMapper().reader(ChunksIndex.class);
    private final S3SourceConfig config;
    private static final Logger log = LoggerFactory.getLogger(S3FilesReader.class);
    public static final Pattern DEFAULT_PATTERN = Pattern.compile("(\\/|^)(?<topic>[^/]+?)-(?<partition>\\d{5})-(?<offset>\\d{12})\\.gz$");
    private static final Pattern DATA_SUFFIX = Pattern.compile("\\.gz$");

    /* loaded from: input_file:com/spredfast/kafka/connect/s3/source/S3FilesReader$InputFilter.class */
    public interface InputFilter {
        public static final InputFilter GUNZIP = GZIPInputStream::new;

        InputStream filter(InputStream inputStream) throws IOException;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/spredfast/kafka/connect/s3/source/S3FilesReader$KeyConsumer.class */
    public interface KeyConsumer<T> {
        T consume(String str, int i, long j) throws IOException;
    }

    /* loaded from: input_file:com/spredfast/kafka/connect/s3/source/S3FilesReader$PartitionFilter.class */
    public interface PartitionFilter {
        public static final PartitionFilter MATCH_ALL = i -> {
            return true;
        };

        boolean matches(int i);

        default boolean matches(String str, int i) {
            return matches(i);
        }

        static PartitionFilter from(final BiPredicate<String, Integer> biPredicate) {
            return new PartitionFilter() { // from class: com.spredfast.kafka.connect.s3.source.S3FilesReader.PartitionFilter.1
                @Override // com.spredfast.kafka.connect.s3.source.S3FilesReader.PartitionFilter
                public boolean matches(int i) {
                    throw new UnsupportedOperationException();
                }

                @Override // com.spredfast.kafka.connect.s3.source.S3FilesReader.PartitionFilter
                public boolean matches(String str, int i) {
                    return biPredicate.test(str, Integer.valueOf(i));
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/spredfast/kafka/connect/s3/source/S3FilesReader$QuietKeyConsumer.class */
    public interface QuietKeyConsumer<T> {
        T consume(String str, int i, long j);
    }

    public S3FilesReader(S3SourceConfig s3SourceConfig, AmazonS3 amazonS3, Map<S3Partition, S3Offset> map, Supplier<S3RecordsReader> supplier) {
        this.config = s3SourceConfig;
        this.offsets = (Map) Optional.ofNullable(map).orElseGet(HashMap::new);
        this.s3Client = amazonS3;
        this.makeReader = supplier;
    }

    @Override // java.lang.Iterable
    public Iterator<S3SourceRecord> iterator() {
        return readAll();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int partition(String str) {
        Matcher matcher = this.config.keyPattern.matcher(str);
        if (matcher.find()) {
            return Integer.parseInt(matcher.group("partition"));
        }
        throw new IllegalArgumentException("Not a valid chunk filename! " + str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String topic(String str) {
        Matcher matcher = this.config.keyPattern.matcher(str);
        if (matcher.find()) {
            return matcher.group("topic");
        }
        throw new IllegalArgumentException("Not a valid chunk filename! " + str);
    }

    public Iterator<S3SourceRecord> readAll() {
        return new Iterator<S3SourceRecord>() { // from class: com.spredfast.kafka.connect.s3.source.S3FilesReader.1
            String currentKey;
            ObjectListing objectListing;
            Iterator<S3ObjectSummary> nextFile = Collections.emptyIterator();
            Iterator<ConsumerRecord<byte[], byte[]>> iterator = Collections.emptyIterator();

            private void nextObject() {
                while (!this.nextFile.hasNext() && hasMoreObjects()) {
                    if (this.objectListing == null) {
                        this.objectListing = S3FilesReader.this.s3Client.listObjects(new ListObjectsRequest(S3FilesReader.this.config.bucket, S3FilesReader.this.config.keyPrefix, S3FilesReader.this.config.startMarker, null, Integer.valueOf(S3FilesReader.this.config.pageSize * 2)));
                        S3FilesReader.log.debug("aws ls {}/{} after:{} = {}", new Object[]{S3FilesReader.this.config.bucket, S3FilesReader.this.config.keyPrefix, S3FilesReader.this.config.startMarker, LazyString.of(() -> {
                            return (List) this.objectListing.getObjectSummaries().stream().map((v0) -> {
                                return v0.getKey();
                            }).collect(Collectors.toList());
                        })});
                    } else {
                        String nextMarker = this.objectListing.getNextMarker();
                        this.objectListing = S3FilesReader.this.s3Client.listNextBatchOfObjects(this.objectListing);
                        S3FilesReader.log.debug("aws ls {}/{} after:{} = {}", new Object[]{S3FilesReader.this.config.bucket, S3FilesReader.this.config.keyPrefix, nextMarker, LazyString.of(() -> {
                            return (List) this.objectListing.getObjectSummaries().stream().map((v0) -> {
                                return v0.getKey();
                            }).collect(Collectors.toList());
                        })});
                    }
                    ArrayList arrayList = new ArrayList(this.objectListing.getObjectSummaries().size() / 2);
                    for (S3ObjectSummary s3ObjectSummary : this.objectListing.getObjectSummaries()) {
                        if (S3FilesReader.DATA_SUFFIX.matcher(s3ObjectSummary.getKey()).find() && ((Boolean) S3FilesReader.this.parseKeyUnchecked(s3ObjectSummary.getKey(), (str, i, j) -> {
                            return Boolean.valueOf(S3FilesReader.this.config.partitionFilter.matches(str, i));
                        })).booleanValue()) {
                            S3Offset offset = offset(s3ObjectSummary);
                            if (offset == null || offset.getS3key().compareTo(s3ObjectSummary.getKey()) <= 0) {
                                arrayList.add(s3ObjectSummary);
                            } else {
                                S3FilesReader.log.debug("Skipping {} because < current offset of {}", s3ObjectSummary.getKey(), offset);
                            }
                        }
                    }
                    S3FilesReader.log.debug("Next Chunks: {}", LazyString.of(() -> {
                        return (List) arrayList.stream().map((v0) -> {
                            return v0.getKey();
                        }).collect(Collectors.toList());
                    }));
                    this.nextFile = arrayList.iterator();
                }
                if (!this.nextFile.hasNext()) {
                    this.iterator = Collections.emptyIterator();
                    return;
                }
                try {
                    S3ObjectSummary next = this.nextFile.next();
                    this.currentKey = next.getKey();
                    S3Offset offset2 = offset(next);
                    if (offset2 == null || !offset2.getS3key().equals(this.currentKey)) {
                        S3FilesReader.log.debug("Now reading from {}", this.currentKey);
                        S3RecordsReader s3RecordsReader = (S3RecordsReader) S3FilesReader.this.makeReader.get();
                        InputStream content = getContent(S3FilesReader.this.s3Client.getObject(S3FilesReader.this.config.bucket, this.currentKey));
                        this.iterator = (Iterator) S3FilesReader.this.parseKey(this.currentKey, (str2, i2, j2) -> {
                            s3RecordsReader.init(str2, i2, content, j2);
                            return s3RecordsReader.readAll(str2, i2, content, j2);
                        });
                    } else {
                        resumeFromOffset(offset2);
                    }
                } catch (IOException e) {
                    throw new AmazonClientException(e);
                }
            }

            private InputStream getContent(S3Object s3Object) throws IOException {
                return S3FilesReader.this.config.inputFilter.filter(s3Object.getObjectContent());
            }

            private S3Offset offset(S3ObjectSummary s3ObjectSummary) {
                return (S3Offset) S3FilesReader.this.offsets.get(S3Partition.from(S3FilesReader.this.config.bucket, S3FilesReader.this.config.keyPrefix, S3FilesReader.this.topic(s3ObjectSummary.getKey()), S3FilesReader.this.partition(s3ObjectSummary.getKey())));
            }

            private void resumeFromOffset(S3Offset s3Offset) throws IOException {
                S3FilesReader.log.debug("resumeFromOffset {}", s3Offset);
                S3RecordsReader s3RecordsReader = (S3RecordsReader) S3FilesReader.this.makeReader.get();
                ChunksIndex chunksIndex = S3FilesReader.this.getChunksIndex(s3Offset.getS3key());
                ChunkDescriptor orElse = chunksIndex.chunkContaining(s3Offset.getOffset() + 1).orElse(null);
                if (orElse == null) {
                    S3FilesReader.log.warn("Missing chunk descriptor for requested offset {} (max:{}). Moving on to next file.", s3Offset, Long.valueOf(chunksIndex.lastOffset()));
                    nextObject();
                    return;
                }
                if (s3RecordsReader.isInitRequired() && orElse.byte_offset > 0) {
                    S3Object object = S3FilesReader.this.s3Client.getObject(new GetObjectRequest(S3FilesReader.this.config.bucket, s3Offset.getS3key()));
                    Throwable th = null;
                    try {
                        try {
                            S3FilesReader.this.parseKey(object.getKey(), (str, i, j) -> {
                                s3RecordsReader.init(str, i, getContent(object), j);
                                return null;
                            });
                            if (object != null) {
                                if (0 != 0) {
                                    try {
                                        object.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    object.close();
                                }
                            }
                        } finally {
                        }
                    } catch (Throwable th3) {
                        if (object != null) {
                            if (th != null) {
                                try {
                                    object.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                object.close();
                            }
                        }
                        throw th3;
                    }
                }
                GetObjectRequest getObjectRequest = new GetObjectRequest(S3FilesReader.this.config.bucket, s3Offset.getS3key());
                getObjectRequest.setRange(orElse.byte_offset, chunksIndex.totalSize());
                S3Object object2 = S3FilesReader.this.s3Client.getObject(getObjectRequest);
                this.currentKey = object2.getKey();
                S3FilesReader.log.debug("Resume {}: Now reading from {}, reading {}-{}", new Object[]{s3Offset, this.currentKey, Long.valueOf(orElse.byte_offset), Long.valueOf(chunksIndex.totalSize())});
                this.iterator = (Iterator) S3FilesReader.this.parseKey(object2.getKey(), (str2, i2, j2) -> {
                    return s3RecordsReader.readAll(str2, i2, getContent(object2), orElse.first_record_offset);
                });
                long offset = (s3Offset.getOffset() - orElse.first_record_offset) + 1;
                for (int i3 = 0; i3 < offset; i3++) {
                    this.iterator.next();
                }
            }

            @Override // java.util.Iterator
            public boolean hasNext() {
                while (!this.iterator.hasNext() && hasMoreObjects()) {
                    nextObject();
                }
                return this.iterator.hasNext();
            }

            boolean hasMoreObjects() {
                return this.objectListing == null || this.objectListing.isTruncated() || this.nextFile.hasNext();
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.Iterator
            public S3SourceRecord next() {
                ConsumerRecord<byte[], byte[]> next = this.iterator.next();
                return new S3SourceRecord(S3Partition.from(S3FilesReader.this.config.bucket, S3FilesReader.this.config.keyPrefix, next.topic(), next.partition()), S3Offset.from(this.currentKey, next.offset()), next.topic(), next.partition(), (byte[]) next.key(), (byte[]) next.value());
            }

            @Override // java.util.Iterator
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <T> T parseKeyUnchecked(String str, QuietKeyConsumer<T> quietKeyConsumer) {
        try {
            quietKeyConsumer.getClass();
            return (T) parseKey(str, quietKeyConsumer::consume);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <T> T parseKey(String str, KeyConsumer<T> keyConsumer) throws IOException {
        Matcher matcher = this.config.keyPattern.matcher(str);
        if (matcher.find()) {
            return keyConsumer.consume(matcher.group("topic"), Integer.parseInt(matcher.group("partition")), Long.parseLong(matcher.group("offset")));
        }
        throw new IllegalArgumentException("Not a valid chunk filename! " + str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ChunksIndex getChunksIndex(String str) throws IOException {
        return (ChunksIndex) this.indexParser.readValue(new InputStreamReader(this.s3Client.getObject(this.config.bucket, DATA_SUFFIX.matcher(str).replaceAll(".index.json")).getObjectContent()));
    }
}
