/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.store;

import com.amazonaws.AmazonClientException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import java.io.File;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;
import kafka.tier.exceptions.TierObjectStoreFatalException;
import kafka.tier.exceptions.TierObjectStoreRetriableException;
import kafka.tier.store.AutoAbortingS3InputStream;
import kafka.tier.store.S3TierObjectStoreConfig;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreResponse;
import kafka.tier.store.TierObjectStoreUtils;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3TierObjectStore
implements TierObjectStore {
    private static final Logger log = LoggerFactory.getLogger(S3TierObjectStore.class);
    private final String clusterId;
    private final int brokerId;
    private final String bucket;
    private final String sseAlgorithm;
    private final int partUploadSize;
    private final int autoAbortThresholdBytes;
    private AmazonS3 client;

    public S3TierObjectStore(S3TierObjectStoreConfig config) {
        this(S3TierObjectStore.client(config), config);
    }

    S3TierObjectStore(AmazonS3 client, S3TierObjectStoreConfig config) {
        this.clusterId = config.clusterId;
        this.brokerId = config.brokerId;
        this.client = client;
        this.bucket = config.s3bucket;
        this.sseAlgorithm = config.s3SseAlgorithm;
        this.partUploadSize = config.s3MultipartUploadSize;
        this.autoAbortThresholdBytes = config.s3AutoAbortThresholdBytes;
        this.expectBucket(this.bucket, config.s3Region);
    }

    @Override
    public TierObjectStoreResponse getObject(TierObjectStore.ObjectMetadata objectMetadata, TierObjectStore.FileType fileType, Integer byteOffsetStart, Integer byteOffsetEnd) {
        S3Object object;
        String key = TierObjectStoreUtils.keyPath(objectMetadata, fileType);
        GetObjectRequest request = new GetObjectRequest(this.bucket, key);
        if (byteOffsetStart != null && byteOffsetEnd != null) {
            request.setRange((long)byteOffsetStart.intValue(), (long)byteOffsetEnd.intValue());
        } else if (byteOffsetStart != null) {
            request.setRange((long)byteOffsetStart.intValue());
        } else if (byteOffsetEnd != null) {
            throw new IllegalStateException("Cannot specify a byteOffsetEnd without specifying a byteOffsetStart");
        }
        log.debug("Fetching object from s3://{}/{}, with range start {}", new Object[]{this.bucket, key, byteOffsetStart});
        try {
            object = this.client.getObject(request);
        }
        catch (AmazonClientException e) {
            throw new TierObjectStoreRetriableException("Failed to fetch segment " + objectMetadata, e);
        }
        catch (Exception e) {
            throw new TierObjectStoreFatalException("Unknown exception when fetching segment " + objectMetadata, e);
        }
        S3ObjectInputStream inputStream = object.getObjectContent();
        return new S3TierObjectStoreResponse(inputStream, this.autoAbortThresholdBytes, object.getObjectMetadata().getContentLength());
    }

    @Override
    public void putSegment(TierObjectStore.ObjectMetadata objectMetadata, File segmentData, File offsetIndexData, File timestampIndexData, Optional<File> producerStateSnapshotData, Optional<ByteBuffer> transactionIndexData, Optional<File> epochState) {
        Map<String, String> metadata = TierObjectStoreUtils.createSegmentMetadata(objectMetadata, this.clusterId, this.brokerId);
        try {
            if (segmentData.length() <= (long)this.partUploadSize) {
                this.putFile(TierObjectStoreUtils.keyPath(objectMetadata, TierObjectStore.FileType.SEGMENT), metadata, segmentData);
            } else {
                this.putFileMultipart(TierObjectStoreUtils.keyPath(objectMetadata, TierObjectStore.FileType.SEGMENT), metadata, segmentData);
            }
            this.putFile(TierObjectStoreUtils.keyPath(objectMetadata, TierObjectStore.FileType.OFFSET_INDEX), metadata, offsetIndexData);
            this.putFile(TierObjectStoreUtils.keyPath(objectMetadata, TierObjectStore.FileType.TIMESTAMP_INDEX), metadata, timestampIndexData);
            producerStateSnapshotData.ifPresent(file -> this.putFile(TierObjectStoreUtils.keyPath(objectMetadata, TierObjectStore.FileType.PRODUCER_STATE), metadata, (File)file));
            transactionIndexData.ifPresent(abortedTxnsBuf -> this.putBuf(TierObjectStoreUtils.keyPath(objectMetadata, TierObjectStore.FileType.TRANSACTION_INDEX), metadata, (ByteBuffer)abortedTxnsBuf));
            epochState.ifPresent(file -> this.putFile(TierObjectStoreUtils.keyPath(objectMetadata, TierObjectStore.FileType.EPOCH_STATE), metadata, (File)file));
        }
        catch (AmazonClientException e) {
            throw new TierObjectStoreRetriableException("Failed to upload segment " + objectMetadata, e);
        }
        catch (Exception e) {
            throw new TierObjectStoreFatalException("Unknown exception when uploading segment " + objectMetadata, e);
        }
    }

    @Override
    public void deleteSegment(TierObjectStore.ObjectMetadata objectMetadata) {
        ArrayList<DeleteObjectsRequest.KeyVersion> keys = new ArrayList<DeleteObjectsRequest.KeyVersion>();
        for (TierObjectStore.FileType type : TierObjectStore.FileType.values()) {
            keys.add(new DeleteObjectsRequest.KeyVersion(TierObjectStoreUtils.keyPath(objectMetadata, type)));
        }
        DeleteObjectsRequest request = new DeleteObjectsRequest(this.bucket).withKeys(keys);
        log.debug("Deleting " + keys);
        try {
            this.client.deleteObjects(request);
        }
        catch (AmazonClientException e) {
            throw new TierObjectStoreRetriableException("Failed to delete segment " + objectMetadata, e);
        }
        catch (Exception e) {
            throw new TierObjectStoreFatalException("Unknown exception when deleting segment " + objectMetadata, e);
        }
    }

    @Override
    public void close() {
        this.client.shutdown();
    }

    private ObjectMetadata putObjectMetadata(Map<String, String> userMetadata) {
        ObjectMetadata metadata = new ObjectMetadata();
        if (this.sseAlgorithm != null) {
            metadata.setSSEAlgorithm(this.sseAlgorithm);
        }
        if (userMetadata != null) {
            metadata.setUserMetadata(userMetadata);
        }
        return metadata;
    }

    private void putFile(String key, Map<String, String> metadata, File file) {
        PutObjectRequest request = new PutObjectRequest(this.bucket, key, file).withMetadata(this.putObjectMetadata(metadata));
        log.debug("Uploading object to s3://{}/{}", (Object)this.bucket, (Object)key);
        this.client.putObject(request);
    }

    private void putBuf(String key, Map<String, String> metadata, ByteBuffer buf) {
        ObjectMetadata s3metadata = this.putObjectMetadata(metadata);
        s3metadata.setContentLength((long)(buf.limit() - buf.position()));
        PutObjectRequest request = new PutObjectRequest(this.bucket, key, (InputStream)new ByteBufferInputStream(buf), s3metadata);
        log.debug("Uploading object to s3://{}/{}", (Object)this.bucket, (Object)key);
        this.client.putObject(request);
    }

    private void putFileMultipart(String key, Map<String, String> metadata, File file) {
        long fileLength = file.length();
        long partSize = this.partUploadSize;
        log.debug("Uploading multipart object to s3://{}/{}", (Object)this.bucket, (Object)key);
        ArrayList<PartETag> partETags = new ArrayList<PartETag>();
        InitiateMultipartUploadRequest initiateMultipartUploadRequest = new InitiateMultipartUploadRequest(this.bucket, key, this.putObjectMetadata(metadata));
        InitiateMultipartUploadResult initiateMultipartUploadResult = this.client.initiateMultipartUpload(initiateMultipartUploadRequest);
        long filePosition = 0L;
        int partNum = 1;
        while (filePosition < fileLength) {
            partSize = Math.min(partSize, fileLength - filePosition);
            UploadPartRequest uploadPartRequest = new UploadPartRequest().withBucketName(this.bucket).withKey(key).withUploadId(initiateMultipartUploadResult.getUploadId()).withPartNumber(partNum).withFile(file).withFileOffset(filePosition).withPartSize(partSize);
            UploadPartResult uploadPartResult = this.client.uploadPart(uploadPartRequest);
            partETags.add(uploadPartResult.getPartETag());
            filePosition += partSize;
            ++partNum;
        }
        CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest(this.bucket, key, initiateMultipartUploadResult.getUploadId(), partETags);
        this.client.completeMultipartUpload(completeMultipartUploadRequest);
    }

    private static AmazonS3 client(S3TierObjectStoreConfig config) {
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setUserAgentPrefix("APN/1.0 Confluent/1.0 TieredStorageS3/1.0");
        AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard();
        builder.setClientConfiguration(clientConfiguration);
        if (config.s3SignerOverride.isPresent() && !config.s3SignerOverride.get().isEmpty()) {
            clientConfiguration.setSignerOverride(config.s3SignerOverride.get());
        }
        if (config.s3EndpointOverride.isPresent() && !config.s3EndpointOverride.get().isEmpty()) {
            builder.setEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(config.s3EndpointOverride.get(), Regions.fromName((String)config.s3Region).getName()));
            builder.setPathStyleAccessEnabled(Boolean.valueOf(true));
        } else if (config.s3Region != null && !config.s3Region.isEmpty()) {
            builder.setRegion(config.s3Region);
        }
        if (config.s3AwsAccessKeyId.isPresent() && config.s3AwsSecretAccessKey.isPresent()) {
            BasicAWSCredentials credentials = new BasicAWSCredentials(config.s3AwsAccessKeyId.get(), config.s3AwsSecretAccessKey.get());
            builder.setCredentials((AWSCredentialsProvider)new AWSStaticCredentialsProvider((AWSCredentials)credentials));
        } else {
            builder.setCredentials((AWSCredentialsProvider)new DefaultAWSCredentialsProviderChain());
        }
        return (AmazonS3)builder.build();
    }

    private void expectBucket(String bucket, String expectedRegion) throws TierObjectStoreFatalException {
        try {
            String actualRegion = this.client.getBucketLocation(bucket);
            if (actualRegion.equals("US") && expectedRegion.equals("us-east-1")) {
                return;
            }
            if (!expectedRegion.equals(actualRegion)) {
                log.warn("Bucket region {} does not match expected region {}", (Object)actualRegion, (Object)expectedRegion);
            }
        }
        catch (AmazonClientException ex) {
            throw new TierObjectStoreFatalException("Failed to access bucket " + bucket, ex);
        }
    }

    private static class S3TierObjectStoreResponse
    implements TierObjectStoreResponse {
        private final AutoAbortingS3InputStream inputStream;

        S3TierObjectStoreResponse(S3ObjectInputStream inputStream, long autoAbortSize, long streamSize) {
            this.inputStream = new AutoAbortingS3InputStream(inputStream, autoAbortSize, streamSize);
        }

        @Override
        public void close() {
            this.inputStream.close();
        }

        @Override
        public InputStream getInputStream() {
            return this.inputStream;
        }
    }
}

