package kafka.tier.store;

import com.amazonaws.AmazonClientException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.auth.PropertiesFileCredentialsProvider;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;
import kafka.server.KafkaConfig;
import kafka.tier.exceptions.TierObjectStoreFatalException;
import kafka.tier.exceptions.TierObjectStoreRetriableException;
import kafka.tier.store.TierObjectStore;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.ssl.TrustStrategy;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/tier/store/S3TierObjectStore.class */
/**
 * {@link TierObjectStore} implementation that stores tiered objects in an Amazon S3 bucket
 * through the AWS SDK {@link AmazonS3} client.
 *
 * <p>For the get/put/delete operations, failures raised by the AWS client
 * ({@link AmazonClientException}) are reported as {@link TierObjectStoreRetriableException};
 * any other failure is reported as {@link TierObjectStoreFatalException}.
 */
public class S3TierObjectStore implements TierObjectStore {
    private static final Logger log = LoggerFactory.getLogger(S3TierObjectStore.class);

    /** SSE algorithm value for which the configured KMS key id is attached to upload requests. */
    private static final String SSE_ALGORITHM_KMS = "aws:kms";

    private final Optional<String> clusterIdOpt;
    private final Optional<Integer> brokerIdOpt;
    private final AmazonS3 client;
    private final String bucket;
    private final String prefix;
    private final String sseAlgorithm;
    private final String sseCustomerEncryptionKey;
    private final int autoAbortThresholdBytes;

    /**
     * Response for a fetched object; exposes the S3 content stream wrapped in an
     * {@link AutoAbortingGenericInputStream}.
     */
    private static class S3TierObjectStoreResponse implements TierObjectStoreResponse {
        private final AutoAbortingGenericInputStream inputStream;

        /**
         * @param content                 content stream of the fetched S3 object
         * @param autoAbortThresholdBytes auto-abort threshold handed to the wrapping stream
         * @param contentLength           content length reported by the object's metadata
         */
        S3TierObjectStoreResponse(S3ObjectInputStream content, long autoAbortThresholdBytes, long contentLength) {
            this.inputStream = new AutoAbortingGenericInputStream(content, autoAbortThresholdBytes, contentLength);
        }

        @Override
        public void close() {
            this.inputStream.close();
        }

        @Override
        public InputStream getInputStream() {
            return this.inputStream;
        }
    }

    /**
     * Creates a store backed by a client built from the given configuration (see
     * {@link #client(S3TierObjectStoreConfig)}).
     *
     * @param s3TierObjectStoreConfig store configuration
     * @throws TierObjectStoreFatalException if the client cannot be built or the bucket cannot be accessed
     */
    public S3TierObjectStore(S3TierObjectStoreConfig s3TierObjectStoreConfig) {
        this(client(s3TierObjectStoreConfig), s3TierObjectStoreConfig);
    }

    /**
     * Creates a store that uses the supplied client and verifies that the configured bucket
     * is accessible.
     *
     * @param amazonS3                S3 client to use for all operations
     * @param s3TierObjectStoreConfig store configuration
     * @throws TierObjectStoreFatalException if the bucket location cannot be retrieved
     */
    S3TierObjectStore(AmazonS3 amazonS3, S3TierObjectStoreConfig s3TierObjectStoreConfig) {
        this.clusterIdOpt = s3TierObjectStoreConfig.clusterIdOpt;
        this.brokerIdOpt = s3TierObjectStoreConfig.brokerIdOpt;
        this.client = amazonS3;
        this.bucket = s3TierObjectStoreConfig.s3Bucket;
        this.prefix = s3TierObjectStoreConfig.s3Prefix;
        this.sseAlgorithm = s3TierObjectStoreConfig.s3SseAlgorithm;
        this.sseCustomerEncryptionKey = s3TierObjectStoreConfig.s3SseCustomerEncryptionKey;
        this.autoAbortThresholdBytes = s3TierObjectStoreConfig.s3AutoAbortThresholdBytes.intValue();
        expectBucket(this.bucket, s3TierObjectStoreConfig.s3Region);
    }

    @Override
    public TierObjectStore.Backend getBackend() {
        return TierObjectStore.Backend.S3;
    }

    /**
     * Fetches an object, optionally restricted to a byte range.
     *
     * @param objectStoreMetadata metadata identifying the object
     * @param fileType            which file of the object to fetch
     * @param num                 byte offset start of the range, or {@code null} for no range
     * @param num2                byte offset end of the range, or {@code null} for an open-ended range;
     *                            only allowed together with a start offset
     * @return a response wrapping the object's content stream
     * @throws IllegalStateException             if an end offset is given without a start offset
     * @throws TierObjectStoreRetriableException if the AWS client fails
     * @throws TierObjectStoreFatalException     on any other failure
     */
    @Override
    public TierObjectStoreResponse getObject(TierObjectStore.ObjectStoreMetadata objectStoreMetadata, TierObjectStore.FileType fileType, Integer num, Integer num2) {
        final String key = keyPath(objectStoreMetadata, fileType);
        final GetObjectRequest request = new GetObjectRequest(this.bucket, key);
        if (num != null && num2 != null) {
            request.setRange(num.intValue(), num2.intValue());
        } else if (num != null) {
            request.setRange(num.intValue());
        } else if (num2 != null) {
            throw new IllegalStateException("Cannot specify a byteOffsetEnd without specifying a byteOffsetStart");
        }
        log.debug("Fetching object from s3://{}/{}, with range start {}", this.bucket, key, num);
        try {
            final S3Object s3Object = this.client.getObject(request);
            return new S3TierObjectStoreResponse(
                    s3Object.getObjectContent(),
                    this.autoAbortThresholdBytes,
                    s3Object.getObjectMetadata().getContentLength());
        } catch (AmazonClientException e) {
            // Must precede the generic handler: AWS client failures are retriable.
            throw new TierObjectStoreRetriableException(String.format("Failed to fetch object, metadata: %s type: %s range %s-%s", objectStoreMetadata, fileType, num, num2), e);
        } catch (Exception e) {
            throw new TierObjectStoreFatalException(String.format("Unknown exception when fetching object, metadata: %s type: %s range %s-%s", objectStoreMetadata, fileType, num, num2), e);
        }
    }

    @Override
    public TierObjectStore.OpaqueData prepPutSegment() throws TierObjectStoreRetriableException, IOException {
        return TierObjectStore.OpaqueData.ZEROED;
    }

    /**
     * Uploads a single file under the key derived from the metadata and file type.
     *
     * @param objectStoreMetadata metadata identifying the object
     * @param file                local file to upload
     * @param fileType            file type used to derive the key
     * @throws TierObjectStoreRetriableException if the AWS client fails
     * @throws TierObjectStoreFatalException     on any other failure
     */
    @Override
    public void putObject(TierObjectStore.ObjectStoreMetadata objectStoreMetadata, File file, TierObjectStore.FileType fileType) {
        try {
            putFile(keyPath(objectStoreMetadata, fileType), objectStoreMetadata.objectMetadata(this.clusterIdOpt, this.brokerIdOpt), file);
        } catch (AmazonClientException e) {
            throw new TierObjectStoreRetriableException(String.format("Failed to upload object %s, file %s, type %s", objectStoreMetadata, file, fileType), e);
        } catch (Exception e) {
            throw new TierObjectStoreFatalException(String.format("Failed to upload object %s, file %s, type %s", objectStoreMetadata, file, fileType), e);
        }
    }

    /**
     * Uploads all files belonging to a segment, in this order: segment, offset index,
     * timestamp index, then (when present) producer state, transaction index and epoch state.
     *
     * @param objectMetadata metadata identifying the segment
     * @param file           uploaded as {@code SEGMENT}
     * @param file2          uploaded as {@code OFFSET_INDEX}
     * @param file3          uploaded as {@code TIMESTAMP_INDEX}
     * @param optional       if present, uploaded as {@code PRODUCER_STATE}
     * @param optional2      if present, uploaded as {@code TRANSACTION_INDEX}
     * @param optional3      if present, uploaded as {@code EPOCH_STATE}
     * @throws TierObjectStoreRetriableException if the AWS client fails
     * @throws TierObjectStoreFatalException     on any other failure
     */
    @Override
    public void putSegment(TierObjectStore.ObjectMetadata objectMetadata, File file, File file2, File file3, Optional<File> optional, Optional<ByteBuffer> optional2, Optional<ByteBuffer> optional3) {
        final Map<String, String> userMetadata = objectMetadata.objectMetadata(this.clusterIdOpt, this.brokerIdOpt);
        try {
            putFile(keyPath(objectMetadata, TierObjectStore.FileType.SEGMENT), userMetadata, file);
            putFile(keyPath(objectMetadata, TierObjectStore.FileType.OFFSET_INDEX), userMetadata, file2);
            putFile(keyPath(objectMetadata, TierObjectStore.FileType.TIMESTAMP_INDEX), userMetadata, file3);
            optional.ifPresent(producerState ->
                    putFile(keyPath(objectMetadata, TierObjectStore.FileType.PRODUCER_STATE), userMetadata, producerState));
            optional2.ifPresent(transactionIndex ->
                    putBuf(keyPath(objectMetadata, TierObjectStore.FileType.TRANSACTION_INDEX), userMetadata, transactionIndex));
            optional3.ifPresent(epochState ->
                    putBuf(keyPath(objectMetadata, TierObjectStore.FileType.EPOCH_STATE), userMetadata, epochState));
        } catch (AmazonClientException e) {
            throw new TierObjectStoreRetriableException("Failed to upload segment: " + objectMetadata, e);
        } catch (Exception e) {
            throw new TierObjectStoreFatalException("Unknown exception when uploading segment: " + objectMetadata, e);
        }
    }

    /**
     * Deletes the keys of every {@link TierObjectStore.FileType} for the given segment in a
     * single batch request.
     *
     * @param objectMetadata metadata identifying the segment
     * @throws TierObjectStoreRetriableException if the AWS client fails
     * @throws TierObjectStoreFatalException     on any other failure
     */
    @Override
    public void deleteSegment(TierObjectStore.ObjectMetadata objectMetadata) {
        final ArrayList<DeleteObjectsRequest.KeyVersion> keys = new ArrayList<>();
        for (TierObjectStore.FileType fileType : TierObjectStore.FileType.values()) {
            final String key = keyPath(objectMetadata, fileType);
            log.debug("Deleting object s3://{}/{}", this.bucket, key);
            keys.add(new DeleteObjectsRequest.KeyVersion(key));
        }
        try {
            this.client.deleteObjects(new DeleteObjectsRequest(this.bucket).withKeys(keys));
        } catch (AmazonClientException e) {
            // Must precede the generic handler: AWS client failures are retriable.
            throw new TierObjectStoreRetriableException("Failed to delete segment: " + objectMetadata, e);
        } catch (Exception e) {
            throw new TierObjectStoreFatalException("Unknown exception when deleting segment: " + objectMetadata, e);
        }
    }

    /** Shuts down the underlying S3 client. */
    @Override
    public void close() {
        this.client.shutdown();
    }

    /** Builds the object key for the given metadata and file type under the configured prefix. */
    private String keyPath(TierObjectStore.ObjectStoreMetadata objectStoreMetadata, TierObjectStore.FileType fileType) {
        return objectStoreMetadata.toPath(this.prefix, fileType);
    }

    /**
     * Creates the S3 object metadata for an upload, applying the configured SSE algorithm
     * and the supplied user metadata when they are non-null.
     */
    private ObjectMetadata putObjectMetadata(Map<String, String> userMetadata) {
        final ObjectMetadata s3Metadata = new ObjectMetadata();
        if (this.sseAlgorithm != null) {
            s3Metadata.setSSEAlgorithm(this.sseAlgorithm);
        }
        if (userMetadata != null) {
            s3Metadata.setUserMetadata(userMetadata);
        }
        return s3Metadata;
    }

    /**
     * Returns whether upload requests should carry the configured KMS key id: a non-empty
     * key must be configured and the SSE algorithm must be {@code aws:kms}. Null-safe with
     * respect to both settings.
     */
    private boolean shouldApplyKmsKey() {
        return this.sseCustomerEncryptionKey != null
                && !this.sseCustomerEncryptionKey.isEmpty()
                && SSE_ALGORITHM_KMS.equals(this.sseAlgorithm);
    }

    /** Uploads a local file to the given key. */
    private void putFile(String key, Map<String, String> userMetadata, File file) {
        final PutObjectRequest request = new PutObjectRequest(this.bucket, key, file).withMetadata(putObjectMetadata(userMetadata));
        if (shouldApplyKmsKey()) {
            request.setSSEAwsKeyManagementParams(new SSEAwsKeyManagementParams(this.sseCustomerEncryptionKey));
        }
        log.debug("Uploading object to s3://{}/{}", this.bucket, key);
        this.client.putObject(request);
    }

    /**
     * Uploads the remaining bytes of a buffer to the given key. The buffer is duplicated
     * first, so the caller's position and limit are left untouched.
     */
    private void putBuf(String key, Map<String, String> userMetadata, ByteBuffer buffer) {
        final ObjectMetadata s3Metadata = putObjectMetadata(userMetadata);
        s3Metadata.setContentLength(buffer.remaining());
        final PutObjectRequest request = new PutObjectRequest(this.bucket, key, new ByteBufferInputStream(buffer.duplicate()), s3Metadata);
        if (shouldApplyKmsKey()) {
            request.setSSEAwsKeyManagementParams(new SSEAwsKeyManagementParams(this.sseCustomerEncryptionKey));
        }
        log.debug("Uploading object to s3://{}/{}", this.bucket, key);
        this.client.putObject(request);
    }

    /**
     * Resolves a region name against the regions known to the AWS SDK.
     *
     * @param str configured region name
     * @return the canonical name of the matching region
     * @throws IllegalArgumentException if the name does not match a known region
     */
    public static String validateAndGetS3RegionName(String str) {
        try {
            return Regions.fromName(str).getName();
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Configured " + KafkaConfig.TierS3RegionProp() + " '" + str + "' is not known", e);
        }
    }

    /**
     * Builds an S3 client from the configuration: user agent prefix, optional custom SSL
     * socket factory, path-style access, signer override, endpoint override or region, and
     * credentials (properties file if configured, otherwise the default provider chain;
     * optionally wrapped in an assumed role).
     *
     * @param s3TierObjectStoreConfig store configuration
     * @return the configured client
     * @throws TierObjectStoreFatalException if the configured key store or trust store cannot be loaded
     * @throws IllegalArgumentException      if an endpoint override is used with an unknown region
     */
    public static AmazonS3 client(S3TierObjectStoreConfig s3TierObjectStoreConfig) throws TierObjectStoreFatalException {
        final ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setUserAgentPrefix(s3TierObjectStoreConfig.s3UserAgentPrefix);
        final SSLConnectionSocketFactory sslSocketFactory = getSSLConnectionSocketFactory(s3TierObjectStoreConfig);
        if (sslSocketFactory != null) {
            clientConfiguration.getApacheHttpClientConfig().setSslSocketFactory(sslSocketFactory);
        }
        if (isNonEmpty(s3TierObjectStoreConfig.s3SignerOverride)) {
            clientConfiguration.setSignerOverride(s3TierObjectStoreConfig.s3SignerOverride.get());
        }

        final AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard();
        builder.setClientConfiguration(clientConfiguration);
        if (s3TierObjectStoreConfig.s3ForcePathStyleAccess.booleanValue()) {
            builder.setPathStyleAccessEnabled(true);
        }
        if (isNonEmpty(s3TierObjectStoreConfig.s3EndpointOverride)) {
            builder.setEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
                    s3TierObjectStoreConfig.s3EndpointOverride.get(),
                    validateAndGetS3RegionName(s3TierObjectStoreConfig.s3Region)));
        } else if (s3TierObjectStoreConfig.s3Region != null && !s3TierObjectStoreConfig.s3Region.isEmpty()) {
            builder.setRegion(s3TierObjectStoreConfig.s3Region);
        }

        // orElseGet: only instantiate the default chain when no credentials file is configured.
        AWSCredentialsProvider credentialsProvider = s3TierObjectStoreConfig.s3CredFilePath
                .<AWSCredentialsProvider>map(PropertiesFileCredentialsProvider::new)
                .orElseGet(DefaultAWSCredentialsProviderChain::new);
        if (s3TierObjectStoreConfig.assumeRoleArn.isPresent()) {
            final AWSSecurityTokenService stsClient = AWSSecurityTokenServiceClient.builder()
                    .withCredentials(credentialsProvider)
                    .build();
            credentialsProvider = new STSAssumeRoleSessionCredentialsProvider.Builder(s3TierObjectStoreConfig.assumeRoleArn.get(), "tiered-storage")
                    .withStsClient(stsClient)
                    .build();
        }
        builder.setCredentials(credentialsProvider);
        return builder.build();
    }

    /** Returns whether the optional holds a non-empty string. */
    private static boolean isNonEmpty(Optional<String> value) {
        return value.filter(text -> !text.isEmpty()).isPresent();
    }

    /**
     * Verifies that the bucket is accessible and logs a warning when its reported location
     * differs from the expected region. A reported location of {@code "US"} is accepted as
     * a match for {@code us-east-1}. Comparisons are null-safe, since the region setting
     * may be absent.
     *
     * @throws TierObjectStoreFatalException if the bucket location cannot be retrieved
     */
    private void expectBucket(String bucketName, String expectedRegion) throws TierObjectStoreFatalException {
        try {
            final String actualRegion = this.client.getBucketLocation(bucketName);
            if ("US".equals(actualRegion) && "us-east-1".equals(expectedRegion)) {
                return;
            }
            final boolean regionMatches = expectedRegion != null && expectedRegion.equals(actualRegion);
            if (!regionMatches) {
                log.warn("Bucket region {} does not match expected region {}", actualRegion, expectedRegion);
            }
        } catch (AmazonClientException e) {
            throw new TierObjectStoreFatalException("Failed to access bucket " + bucketName, e);
        }
    }

    /**
     * Builds an SSL socket factory from the configured trust store and/or key store.
     *
     * @return the socket factory, or {@code null} when neither a trust store nor a key store
     *         location is configured
     * @throws TierObjectStoreFatalException if a store cannot be loaded or the SSL context cannot be built
     */
    private static SSLConnectionSocketFactory getSSLConnectionSocketFactory(S3TierObjectStoreConfig s3TierObjectStoreConfig) throws TierObjectStoreFatalException {
        final boolean hasTrustStore = isNonEmpty(s3TierObjectStoreConfig.s3SslTrustStoreLocation);
        final boolean hasKeyStore = isNonEmpty(s3TierObjectStoreConfig.s3SslKeyStoreLocation);
        if (!hasTrustStore && !hasKeyStore) {
            return null;
        }
        try {
            final SSLContextBuilder contextBuilder = SSLContexts.custom();
            if (hasTrustStore) {
                final KeyStore trustStore = loadKeyStore(
                        s3TierObjectStoreConfig.s3SslTrustStoreType.get(),
                        s3TierObjectStoreConfig.s3SslTrustStoreLocation.get(),
                        s3TierObjectStoreConfig.s3SslTrustStorePassword.get().value().toCharArray());
                contextBuilder.loadTrustMaterial(trustStore, (TrustStrategy) null);
            }
            if (hasKeyStore) {
                final KeyStore keyStore = loadKeyStore(
                        s3TierObjectStoreConfig.s3SslKeyStoreType.get(),
                        s3TierObjectStoreConfig.s3SslKeyStoreLocation.get(),
                        s3TierObjectStoreConfig.s3SslKeyStorePassword.get().value().toCharArray());
                // The private key alias is always "confluent.kafka", whatever aliases are offered.
                contextBuilder.loadKeyMaterial(
                        keyStore,
                        s3TierObjectStoreConfig.s3SslKeyPassword.get().value().toCharArray(),
                        (aliases, socket) -> "confluent.kafka");
            }
            contextBuilder.setProtocol(s3TierObjectStoreConfig.s3SslProtocol);
            return new SSLConnectionSocketFactory(
                    contextBuilder.build(),
                    (String[]) s3TierObjectStoreConfig.s3SslEnabledProtocols.toArray(new String[0]),
                    (String[]) null,
                    new DefaultHostnameVerifier());
        } catch (Exception e) {
            throw new TierObjectStoreFatalException("Failed to load keystore or trust store for tiered object store", e);
        }
    }

    /**
     * Loads a key store of the given type from a file, closing the file stream whether or
     * not loading succeeds.
     */
    private static KeyStore loadKeyStore(String type, String location, char[] password) throws IOException, java.security.GeneralSecurityException {
        final KeyStore store = KeyStore.getInstance(type);
        try (FileInputStream in = new FileInputStream(location)) {
            store.load(in, password);
        }
        return store;
    }
}
