package com.amazonaws.services.kinesis.connectors.s3;

import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration;
import com.amazonaws.services.kinesis.connectors.UnmodifiableBuffer;
import com.amazonaws.services.kinesis.connectors.interfaces.IEmitter;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.beust.jcommander.Parameters;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/amazonaws/services/kinesis/connectors/s3/S3Emitter.class */
public class S3Emitter implements IEmitter<byte[]> {
    private static final Log LOG = LogFactory.getLog(S3Emitter.class);
    protected final String s3Bucket;
    protected final String s3Endpoint;
    protected final AmazonS3Client s3client;

    public S3Emitter(KinesisConnectorConfiguration kinesisConnectorConfiguration) {
        this.s3Bucket = kinesisConnectorConfiguration.S3_BUCKET;
        this.s3Endpoint = kinesisConnectorConfiguration.S3_ENDPOINT;
        this.s3client = new AmazonS3Client(kinesisConnectorConfiguration.AWS_CREDENTIALS_PROVIDER);
        if (this.s3Endpoint != null) {
            this.s3client.setEndpoint(this.s3Endpoint);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String getS3FileName(String str, String str2) {
        return str + Parameters.DEFAULT_OPTION_PREFIXES + str2;
    }

    protected String getS3URI(String str) {
        return "s3://" + this.s3Bucket + "/" + str;
    }

    @Override // com.amazonaws.services.kinesis.connectors.interfaces.IEmitter
    public List<byte[]> emit(UnmodifiableBuffer<byte[]> unmodifiableBuffer) throws IOException {
        List<byte[]> records = unmodifiableBuffer.getRecords();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        for (byte[] bArr : records) {
            try {
                byteArrayOutputStream.write(bArr);
            } catch (Exception e) {
                LOG.error("Error writing record to output stream. Failing this emit attempt. Record: " + Arrays.toString(bArr), e);
                return unmodifiableBuffer.getRecords();
            }
        }
        String s3FileName = getS3FileName(unmodifiableBuffer.getFirstSequenceNumber(), unmodifiableBuffer.getLastSequenceNumber());
        String s3uri = getS3URI(s3FileName);
        try {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
            LOG.debug("Starting upload of file " + s3uri + " to Amazon S3 containing " + records.size() + " records.");
            ObjectMetadata objectMetadata = new ObjectMetadata();
            objectMetadata.setContentLength(byteArrayOutputStream.size());
            this.s3client.putObject(this.s3Bucket, s3FileName, byteArrayInputStream, objectMetadata);
            LOG.info("Successfully emitted " + unmodifiableBuffer.getRecords().size() + " records to Amazon S3 in " + s3uri);
            return Collections.emptyList();
        } catch (Exception e2) {
            LOG.error("Caught exception when uploading file " + s3uri + "to Amazon S3. Failing this emit attempt.", e2);
            return unmodifiableBuffer.getRecords();
        }
    }

    @Override // com.amazonaws.services.kinesis.connectors.interfaces.IEmitter
    public void fail(List<byte[]> list) {
        Iterator<byte[]> it = list.iterator();
        while (it.hasNext()) {
            LOG.error("Record failed: " + Arrays.toString(it.next()));
        }
    }

    @Override // com.amazonaws.services.kinesis.connectors.interfaces.IEmitter
    public void shutdown() {
        this.s3client.shutdown();
    }
}
