package kafka.tier.store;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.SdkClientException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.auth.PropertiesFileCredentialsProvider;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.ListVersionsRequest;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.S3VersionSummary;
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
import com.amazonaws.services.s3.model.VersionListing;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import io.confluent.kafka.storage.checksum.E2EChecksumProtectedObjectType;
import io.confluent.kafka.storage.checksum.E2EChecksumStore;
import io.confluent.kafka.storage.checksum.E2EChecksumUtils;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import kafka.server.KafkaConfig;
import kafka.tier.domain.TierObjectMetadata;
import kafka.tier.exceptions.E2EChecksumInvalidException;
import kafka.tier.exceptions.TierObjectStoreFatalException;
import kafka.tier.exceptions.TierObjectStoreRetriableException;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.objects.FragmentType;
import kafka.tier.store.objects.ObjectType;
import kafka.tier.store.objects.ThrottledSegmentUpload;
import kafka.tier.store.objects.TierSegmentUpload;
import kafka.tier.store.objects.metadata.HealthMetadata;
import kafka.tier.store.objects.metadata.ObjectMetadata;
import kafka.tier.store.objects.metadata.ObjectStoreMetadata;
import kafka.tier.store.objects.metadata.TierRecoveryUploadMetadata;
import kafka.tier.tools.RecoveryUtils;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.ssl.TrustStrategy;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.config.SegmentMetadataLayoutPutMode;
import org.apache.kafka.storage.internals.utils.Throttler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.exception.RetryableException;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.ListObjectVersionsRequest;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.ObjectAttributes;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;

/* loaded from: input_file:kafka/tier/store/S3TierObjectStore.class */
public class S3TierObjectStore extends TierObjectStore {
    private static final Logger log = LoggerFactory.getLogger(S3TierObjectStore.class);
    private final Optional<String> clusterIdOpt;
    private final Optional<Integer> brokerIdOpt;
    private final AmazonS3 client;
    private final Optional<S3AsyncClient> asyncClientOpt;
    private final Optional<AWSCredentialsProvider> credentialsProvider;
    private final String bucket;
    private final String prefix;
    private final String sseAlgorithm;
    private final String sseCustomerEncryptionKey;
    private final int autoAbortThresholdBytes;
    private final boolean v2Enabled;
    private AtomicInteger credentialsRefreshRetries;
    private Optional<ExecutorService> executorOpt;
    private final Optional<E2EChecksumStore> checksumStoreOpt;
    private static final int DEFAULT_S3_MAX_CREDENTIAL_REFRESH_RETRIES = 5;
    private static final int DEFAULT_S3_DELETE_BATCH_SIZE = 1000;
    private static final int DEFAULT_S3_MAX_LIST_KEYS = 1000;
    private static final String CRC32C = "CRC32C";
    private static final String CRC32C_HEADER = "x-amz-checksum-crc32c";
    private static final String ERROR_CODE_BAD_DIGEST = "BadDigest";
    static final String ERROR_CODE_EXPIRED_TOKEN = "ExpiredToken";
    private static final String ERROR_CODE_NO_SUCH_KEY = "NoSuchKey";

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: kafka.tier.store.S3TierObjectStore$1, reason: invalid class name */
    /* loaded from: input_file:kafka/tier/store/S3TierObjectStore$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$server$config$SegmentMetadataLayoutPutMode;

        static {
            try {
                $SwitchMap$kafka$tier$store$objects$ObjectType[ObjectType.TRANSACTION_INDEX.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$kafka$tier$store$objects$ObjectType[ObjectType.EPOCH_STATE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$kafka$tier$store$objects$ObjectType[ObjectType.PRODUCER_STATE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            $SwitchMap$org$apache$kafka$server$config$SegmentMetadataLayoutPutMode = new int[SegmentMetadataLayoutPutMode.values().length];
            try {
                $SwitchMap$org$apache$kafka$server$config$SegmentMetadataLayoutPutMode[SegmentMetadataLayoutPutMode.LegacyMultiObject.ordinal()] = 1;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$kafka$server$config$SegmentMetadataLayoutPutMode[SegmentMetadataLayoutPutMode.CombinedObject.ordinal()] = 2;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /* loaded from: input_file:kafka/tier/store/S3TierObjectStore$AmazonS3ClientAndCredentialsProvider.class */
    public static class AmazonS3ClientAndCredentialsProvider {
        private final AmazonS3 client;
        private final Optional<S3AsyncClient> asyncClientOpt;
        private final AWSCredentialsProvider credentialsProvider;

        public AmazonS3ClientAndCredentialsProvider(AmazonS3 amazonS3, Optional<S3AsyncClient> optional, AWSCredentialsProvider aWSCredentialsProvider) {
            this.client = amazonS3;
            this.asyncClientOpt = optional;
            this.credentialsProvider = aWSCredentialsProvider;
        }

        public AmazonS3 client() {
            return this.client;
        }

        public Optional<S3AsyncClient> asyncClientOpt() {
            return this.asyncClientOpt;
        }

        public AWSCredentialsProvider credentialsProvider() {
            return this.credentialsProvider;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kafka/tier/store/S3TierObjectStore$S3TierObjectStoreResponse.class */
    public static class S3TierObjectStoreResponse implements TierObjectStoreResponse {
        private final AutoAbortingGenericInputStream inputStream;

        S3TierObjectStoreResponse(InputStream inputStream, long j, long j2) {
            this.inputStream = new S3AutoAbortingInputStream(inputStream, j, j2);
        }

        @Override // kafka.tier.store.TierObjectStoreResponse, java.lang.AutoCloseable
        public void close() {
            this.inputStream.close();
        }

        @Override // kafka.tier.store.TierObjectStoreResponse
        public InputStream getInputStream() {
            return this.inputStream;
        }
    }

    public S3TierObjectStore(S3TierObjectStoreConfig s3TierObjectStoreConfig, Optional<E2EChecksumStore> optional) {
        this(client(s3TierObjectStoreConfig), s3TierObjectStoreConfig, optional);
    }

    S3TierObjectStore(AmazonS3ClientAndCredentialsProvider amazonS3ClientAndCredentialsProvider, S3TierObjectStoreConfig s3TierObjectStoreConfig, Optional<E2EChecksumStore> optional) {
        this(amazonS3ClientAndCredentialsProvider.client(), amazonS3ClientAndCredentialsProvider.asyncClientOpt(), amazonS3ClientAndCredentialsProvider.credentialsProvider(), s3TierObjectStoreConfig, optional);
    }

    S3TierObjectStore(AmazonS3 amazonS3, AWSCredentialsProvider aWSCredentialsProvider, S3TierObjectStoreConfig s3TierObjectStoreConfig, Optional<E2EChecksumStore> optional) {
        this(amazonS3, Optional.empty(), aWSCredentialsProvider, s3TierObjectStoreConfig, optional);
    }

    S3TierObjectStore(AmazonS3 amazonS3, Optional<S3AsyncClient> optional, AWSCredentialsProvider aWSCredentialsProvider, S3TierObjectStoreConfig s3TierObjectStoreConfig, Optional<E2EChecksumStore> optional2) {
        this.credentialsRefreshRetries = new AtomicInteger();
        this.executorOpt = Optional.empty();
        this.clusterIdOpt = s3TierObjectStoreConfig.clusterIdOpt;
        this.brokerIdOpt = s3TierObjectStoreConfig.brokerIdOpt;
        this.client = amazonS3;
        this.asyncClientOpt = optional;
        this.credentialsProvider = Optional.ofNullable(aWSCredentialsProvider);
        this.bucket = s3TierObjectStoreConfig.s3Bucket;
        this.prefix = s3TierObjectStoreConfig.s3Prefix;
        this.sseAlgorithm = s3TierObjectStoreConfig.s3SseAlgorithm;
        this.sseCustomerEncryptionKey = s3TierObjectStoreConfig.s3SseCustomerEncryptionKey;
        this.autoAbortThresholdBytes = s3TierObjectStoreConfig.s3AutoAbortThresholdBytes.intValue();
        expectBucket(this.bucket, s3TierObjectStoreConfig.s3Region, s3TierObjectStoreConfig.s3EndpointOverride);
        this.checksumStoreOpt = optional2;
        this.v2Enabled = s3TierObjectStoreConfig.s3V2Enabled.booleanValue();
    }

    @Override // kafka.tier.store.TierObjectStore
    public String keyPrefix() {
        return this.prefix;
    }

    @Override // kafka.tier.store.TierObjectStore
    public TierObjectStore.Backend getBackend() {
        return TierObjectStore.Backend.S3;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    private void handleV2Exception(ExecutionException executionException) {
        ?? cause = executionException.getCause();
        if (cause instanceof TierObjectStoreRetriableException) {
            throw ((TierObjectStoreRetriableException) cause);
        }
        if (cause instanceof TierObjectStoreFatalException) {
            throw ((TierObjectStoreFatalException) cause);
        }
        if (cause instanceof E2EChecksumInvalidException) {
            throw ((E2EChecksumInvalidException) cause);
        }
        throw new TierObjectStoreFatalException("Unknown exception while getting objects", cause != 0 ? cause : executionException);
    }

    private Set<String> listObjectsWithoutVersions(String str) {
        HashSet hashSet = new HashSet();
        try {
            checkExpiredCredentialsExceptionAndTryRefresh(() -> {
                ListObjectsV2Result listObjectsV2;
                ListObjectsV2Request withMaxKeys = new ListObjectsV2Request().withBucketName(this.bucket).withPrefix(str).withMaxKeys(Integer.valueOf(RecoveryUtils.FENCE_EVENT_BATCH_SIZE));
                do {
                    listObjectsV2 = this.client.listObjectsV2(withMaxKeys);
                    Iterator it = listObjectsV2.getObjectSummaries().iterator();
                    while (it.hasNext()) {
                        hashSet.add(((S3ObjectSummary) it.next()).getKey());
                    }
                    withMaxKeys.setContinuationToken(listObjectsV2.getNextContinuationToken());
                } while (listObjectsV2.isTruncated());
                log.debug("TierObjectStore listObjects versions: false keyPrefix:  " + str + " # of objects returned: " + hashSet.size());
                return null;
            });
            return hashSet;
        } catch (SdkClientException e) {
            throw new TierObjectStoreRetriableException("Failed to list the objects (without versions) with prefix: " + str, e);
        } catch (Exception e2) {
            throw new TierObjectStoreFatalException("Unknown exception while listing objects (without versions) with prefix: " + str, e2);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public Map<String, List<VersionInformation>> listObject(String str, boolean z) {
        if (this.v2Enabled) {
            try {
                return listObjectAsync(str, z).get();
            } catch (InterruptedException | CancellationException e) {
                throw new TierObjectStoreFatalException("Unknown exception while listing objects with prefix: " + str, e.getCause() != null ? e.getCause() : e);
            } catch (ExecutionException e2) {
                handleV2Exception(e2);
            }
        }
        HashMap hashMap = new HashMap();
        if (!z) {
            Iterator<String> it = listObjectsWithoutVersions(str).iterator();
            while (it.hasNext()) {
                hashMap.put(it.next(), new ArrayList());
            }
            return hashMap;
        }
        try {
            checkExpiredCredentialsExceptionAndTryRefresh(() -> {
                VersionListing listVersions = this.client.listVersions(new ListVersionsRequest().withBucketName(this.bucket).withPrefix(str).withMaxResults(Integer.valueOf(RecoveryUtils.FENCE_EVENT_BATCH_SIZE)));
                while (true) {
                    VersionListing versionListing = listVersions;
                    for (S3VersionSummary s3VersionSummary : versionListing.getVersionSummaries()) {
                        hashMap.putIfAbsent(s3VersionSummary.getKey(), new ArrayList());
                        ((List) hashMap.get(s3VersionSummary.getKey())).add(new S3VersionInformation(s3VersionSummary.getVersionId(), s3VersionSummary.isDeleteMarker(), s3VersionSummary.isLatest()));
                    }
                    if (!versionListing.isTruncated()) {
                        break;
                    }
                    listVersions = this.client.listNextBatchOfVersions(versionListing);
                }
                if (!log.isDebugEnabled()) {
                    return null;
                }
                StringBuilder sb = new StringBuilder();
                hashMap.forEach((str2, list) -> {
                    sb.append("[").append(str2).append("->").append(Arrays.toString(list.toArray())).append("] ");
                });
                log.debug("TierObjectStore listObjects versions: true keyPrefix: " + str + " " + ((Object) sb));
                return null;
            });
            return hashMap;
        } catch (SdkClientException e3) {
            throw new TierObjectStoreRetriableException("Failed to list the objects with prefix: " + str, e3);
        } catch (Exception e4) {
            throw new TierObjectStoreFatalException("Unknown exception while listing objects with prefix: " + str, e4);
        }
    }

    private CompletableFuture<Map<String, List<VersionInformation>>> listObjectsWithoutVersionsAsync(String str) {
        CompletableFuture<Map<String, List<VersionInformation>>> completableFuture = new CompletableFuture<>();
        try {
            HashMap hashMap = new HashMap();
            this.asyncClientOpt.get().listObjectsV2Paginator((software.amazon.awssdk.services.s3.model.ListObjectsV2Request) software.amazon.awssdk.services.s3.model.ListObjectsV2Request.builder().bucket(this.bucket).prefix(str).maxKeys(Integer.valueOf(RecoveryUtils.FENCE_EVENT_BATCH_SIZE)).build()).subscribe(listObjectsV2Response -> {
                listObjectsV2Response.contents().forEach(s3Object -> {
                });
            }).whenComplete((r14, th) -> {
                if (th != null) {
                    completableFuture.completeExceptionally(convertOperationException(th.getCause(), TierObjectStoreAction.LIST_LIVE_OBJECTS.action(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(str), Optional.empty()));
                } else {
                    completableFuture.complete(hashMap);
                }
            });
        } catch (Exception e) {
            log.error("Failed to send async list request without version, keyPrefix: {}", str, e);
            completableFuture.completeExceptionally(e);
        }
        return completableFuture;
    }

    @Override // kafka.tier.store.TierObjectStore
    public CompletableFuture<Map<String, List<VersionInformation>>> listObjectAsync(String str, boolean z) {
        checkAsyncClientPresent("listObject");
        if (!z) {
            return listObjectsWithoutVersionsAsync(str);
        }
        CompletableFuture<Map<String, List<VersionInformation>>> completableFuture = new CompletableFuture<>();
        try {
            HashMap hashMap = new HashMap();
            this.asyncClientOpt.get().listObjectVersionsPaginator((ListObjectVersionsRequest) ListObjectVersionsRequest.builder().bucket(this.bucket).prefix(str).maxKeys(Integer.valueOf(RecoveryUtils.FENCE_EVENT_BATCH_SIZE)).build()).subscribe(listObjectVersionsResponse -> {
                listObjectVersionsResponse.versions().forEach(objectVersion -> {
                    hashMap.putIfAbsent(objectVersion.key(), new ArrayList());
                    ((List) hashMap.get(objectVersion.key())).add(new S3VersionInformation(objectVersion.versionId(), false, objectVersion.isLatest().booleanValue()));
                });
                listObjectVersionsResponse.deleteMarkers().forEach(deleteMarkerEntry -> {
                    hashMap.putIfAbsent(deleteMarkerEntry.key(), new ArrayList());
                    ((List) hashMap.get(deleteMarkerEntry.key())).add(new S3VersionInformation(deleteMarkerEntry.versionId(), true, deleteMarkerEntry.isLatest().booleanValue()));
                });
            }).whenComplete((r14, th) -> {
                if (th != null) {
                    completableFuture.completeExceptionally(convertOperationException(th.getCause(), TierObjectStoreAction.LIST_OBJECT.action(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(str), Optional.empty()));
                    return;
                }
                if (log.isDebugEnabled()) {
                    StringBuilder sb = new StringBuilder();
                    hashMap.forEach((str2, list) -> {
                        sb.append("[").append(str2).append("->").append(Arrays.toString(list.toArray())).append("] ");
                    });
                    log.debug("TierObjectStore listObjects versions: true keyPrefix: " + str + " " + ((Object) sb));
                }
                completableFuture.complete(hashMap);
            });
        } catch (Exception e) {
            log.error("Failed to send async list request, keyPrefix: {}, getVersionInfo: {}", new Object[]{str, Boolean.valueOf(z), e});
            completableFuture.completeExceptionally(e);
        }
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // kafka.tier.store.TierObjectStore
    public TierObjectStoreResponse getObject(ObjectStoreMetadata objectStoreMetadata, ObjectType objectType, Long l, Long l2, VersionInformation versionInformation) {
        if (this.v2Enabled) {
            try {
                return getObjectAsync(objectStoreMetadata, objectType, l, l2, versionInformation).get();
            } catch (InterruptedException | CancellationException e) {
                throw new TierObjectStoreFatalException("Unknown exception while getting objects", e.getCause() != null ? e.getCause() : e);
            } catch (ExecutionException e2) {
                handleV2Exception(e2);
            }
        }
        String keyPath = keyPath(objectStoreMetadata, objectType);
        String fullKeyPath = fullKeyPath(keyPath);
        GetObjectRequest getObjectRequest = versionInformation != null ? new GetObjectRequest(this.bucket, keyPath, versionInformation.getVersionId()) : new GetObjectRequest(this.bucket, keyPath);
        if (l != null && l2 != null) {
            getObjectRequest.setRange(l.longValue(), l2.longValue() - 1);
        } else if (l != null && l.longValue() != 0) {
            getObjectRequest.setRange(l.longValue());
        } else if (l2 != null) {
            throw new IllegalStateException(String.format("Cannot specify a byteOffsetEndExclusive=%d without specifying a byteOffsetStart", l2));
        }
        log.debug("Fetching object from {}, with range {} - {}", new Object[]{fullKeyPath, l, l2});
        try {
            S3Object s3Object = (S3Object) checkExpiredCredentialsExceptionAndTryRefresh(() -> {
                return this.client.getObject(getObjectRequest);
            });
            return new S3TierObjectStoreResponse(s3Object.getObjectContent(), this.autoAbortThresholdBytes, s3Object.getObjectMetadata().getContentLength());
        } catch (Exception e3) {
            throw convertFetchException(e3, fullKeyPath, objectStoreMetadata, objectType, l, l2);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    protected CompletableFuture<TierObjectStoreResponse> getObjectAsync(ObjectStoreMetadata objectStoreMetadata, ObjectType objectType, Long l, Long l2, VersionInformation versionInformation) {
        CompletableFuture<TierObjectStoreResponse> completableFuture = new CompletableFuture<>();
        try {
            if (this.asyncClientOpt.isPresent()) {
                String keyPath = keyPath(objectStoreMetadata, objectType);
                String fullKeyPath = fullKeyPath(keyPath);
                GetObjectRequest.Builder key = software.amazon.awssdk.services.s3.model.GetObjectRequest.builder().bucket(this.bucket).key(keyPath);
                if (versionInformation != null) {
                    key.versionId(versionInformation.getVersionId());
                }
                if (l != null && l2 != null) {
                    key.range(String.format("bytes=%d-%d", l, Long.valueOf(l2.longValue() - 1)));
                } else if (l != null && l.longValue() != 0) {
                    key.range(String.format("bytes=%d-", l));
                } else if (l2 != null) {
                    throw new IllegalStateException("Cannot specify a byteOffsetEndExclusive without specifying a byteOffsetStart");
                }
                log.debug("Fetching object from {}, with range {} - {}", new Object[]{fullKeyPath, l, l2});
                this.asyncClientOpt.get().getObject((software.amazon.awssdk.services.s3.model.GetObjectRequest) key.build(), AsyncResponseTransformer.toBlockingInputStream()).handle((responseInputStream, th) -> {
                    if (th != null) {
                        completableFuture.completeExceptionally(convertFetchException(th.getCause() != null ? th.getCause() : th, fullKeyPath, objectStoreMetadata, objectType, l, l2));
                        return null;
                    }
                    Long contentLength = ((GetObjectResponse) responseInputStream.response()).contentLength();
                    completableFuture.complete(new S3TierObjectStoreResponse(responseInputStream, this.autoAbortThresholdBytes, contentLength != null ? contentLength.longValue() : TierObjectMetadata.DEFAULT_STATE_CHANGE_TIMESTAMP));
                    return null;
                });
            } else {
                log.warn("Fallback to getObject because async S3 client isn't created, metadata: {}, type: {}", objectStoreMetadata, objectType);
                completableFuture.complete(getObject(objectStoreMetadata, objectType, l, l2, versionInformation));
            }
        } catch (Exception e) {
            log.error("Failed to send async fetch request, metadata: {}, type: {}", new Object[]{objectStoreMetadata, objectType, e});
            completableFuture.completeExceptionally(e);
        }
        return completableFuture;
    }

    private RuntimeException convertFetchException(Throwable th, String str, ObjectStoreMetadata objectStoreMetadata, ObjectType objectType, Long l, Long l2) {
        return ((th instanceof AmazonClientException) || (th instanceof RetryableException)) ? new TierObjectStoreRetriableException(String.format("Failed to fetch object from %s, metadata: %s type: %s range %s-%s", str, objectStoreMetadata, objectType, l, l2), th) : new TierObjectStoreFatalException(String.format("Unknown exception when fetching object from %s, metadata: %s type: %s range %s-%s", str, objectStoreMetadata, objectType, l, l2), th);
    }

    @Override // kafka.tier.store.TierObjectStore
    public ByteBuffer getSnapshot(ObjectStoreMetadata objectStoreMetadata, FragmentType fragmentType, int i) {
        Throwable th;
        if (this.v2Enabled) {
            try {
                return getSnapshotAsync(objectStoreMetadata, fragmentType, i).get();
            } catch (InterruptedException | CancellationException th2) {
                if (th2.getCause() != null) {
                    th = th2.getCause();
                }
                throw new TierObjectStoreFatalException("Unknown exception while getting snapshots", th);
            } catch (ExecutionException e) {
                handleV2Exception(e);
            }
        }
        if (FragmentType.checkGetSnapshotSupported(fragmentType)) {
            throw new IllegalArgumentException("getSnapshot does not support the given fragmentType: " + fragmentType);
        }
        try {
            TierObjectStoreResponse objectStoreFragment = getObjectStoreFragment(objectStoreMetadata, fragmentType);
            Throwable th3 = null;
            try {
                try {
                    ByteBuffer wrap = ByteBuffer.wrap(Utils.readFullyToArray(objectStoreFragment.getInputStream(), i));
                    if (objectStoreFragment != null) {
                        if (0 != 0) {
                            try {
                                objectStoreFragment.close();
                            } catch (Throwable th4) {
                                th3.addSuppressed(th4);
                            }
                        } else {
                            objectStoreFragment.close();
                        }
                    }
                    return wrap;
                } finally {
                }
            } finally {
            }
        } catch (Exception e2) {
            AmazonS3Exception cause = e2.getCause();
            while (true) {
                AmazonS3Exception amazonS3Exception = cause;
                if (amazonS3Exception == null || amazonS3Exception == amazonS3Exception.getCause()) {
                    break;
                }
                if ((amazonS3Exception instanceof AmazonS3Exception) && ERROR_CODE_NO_SUCH_KEY.equals(amazonS3Exception.getErrorCode())) {
                    throw new TierObjectStoreFatalException("Snapshot object not found in object store.", amazonS3Exception);
                }
                cause = amazonS3Exception.getCause();
            }
            throw new TierObjectStoreRetriableException("Encountered an exception when fetching snapshot from object store.", e2);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public CompletableFuture<ByteBuffer> getSnapshotAsync(ObjectStoreMetadata objectStoreMetadata, FragmentType fragmentType, int i) {
        checkAsyncClientPresent("getSnapshot");
        if (!FragmentType.checkGetSnapshotSupported(fragmentType)) {
            return getObjectStoreFragmentAsync(objectStoreMetadata, fragmentType).thenApply(tierObjectStoreResponse -> {
                try {
                    return ByteBuffer.wrap(Utils.readFullyToArray(tierObjectStoreResponse.getInputStream(), i));
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }).exceptionally((Function<Throwable, ? extends U>) th -> {
                S3Exception cause = th.getCause();
                while (true) {
                    S3Exception s3Exception = cause;
                    if (s3Exception == null || s3Exception == s3Exception.getCause()) {
                        break;
                    }
                    if ((s3Exception instanceof S3Exception) && ERROR_CODE_NO_SUCH_KEY.equals(s3Exception.awsErrorDetails().errorCode())) {
                        throw new TierObjectStoreFatalException("Snapshot object not found in object store.", s3Exception);
                    }
                    cause = s3Exception.getCause();
                }
                throw new TierObjectStoreRetriableException("Encountered an exception when fetching snapshot from object store.", th);
            });
        }
        CompletableFuture<ByteBuffer> completableFuture = new CompletableFuture<>();
        completableFuture.completeExceptionally(new IllegalArgumentException("getSnapshotAsync does not support the given fragmentType: " + fragmentType));
        return completableFuture;
    }

    @Override // kafka.tier.store.TierObjectStore
    public OpaqueData prepPutSegment() throws TierObjectStoreRetriableException, IOException {
        return OpaqueData.ZEROED;
    }

    @Override // kafka.tier.store.TierObjectStore
    public String putObject(ObjectStoreMetadata objectStoreMetadata, File file, ObjectType objectType) {
        if (this.v2Enabled) {
            try {
                return putObjectAsync(objectStoreMetadata, file, objectType).get();
            } catch (InterruptedException | CancellationException e) {
                throw new TierObjectStoreFatalException("Unknown exception while putting objects", e.getCause() != null ? e.getCause() : e);
            } catch (ExecutionException e2) {
                handleV2Exception(e2);
            }
        }
        Map<String, String> objectMetadata = objectStoreMetadata.objectMetadata(this.clusterIdOpt, this.brokerIdOpt);
        String keyPath = keyPath(objectStoreMetadata, objectType);
        String fullKeyPath = fullKeyPath(keyPath);
        try {
            checkExpiredCredentialsExceptionAndTryRefresh(() -> {
                putFile(keyPath, objectMetadata, file, objectType.toE2EChecksumProtectedObjectType());
                return null;
            });
            return keyPath;
        } catch (Exception e3) {
            throw new TierObjectStoreFatalException(String.format("Failed to upload object to %s with metadata %s, file %s, type %s", fullKeyPath, objectStoreMetadata, file, objectType), e3);
        } catch (AmazonClientException e4) {
            if (e4 instanceof AmazonS3Exception) {
                AmazonS3Exception amazonS3Exception = e4;
                if (ERROR_CODE_BAD_DIGEST.equals(amazonS3Exception.getErrorCode())) {
                    throw new E2EChecksumInvalidException("Checksum mismatch during object store upload", (Throwable) amazonS3Exception);
                }
            }
            throw new TierObjectStoreRetriableException(String.format("Failed to upload object to %s with metadata %s, file %s, type %s", fullKeyPath, objectStoreMetadata, file, objectType), e4);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public CompletableFuture<String> putObjectAsync(ObjectStoreMetadata objectStoreMetadata, File file, ObjectType objectType) {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        try {
            checkAsyncClientPresent("putObject");
            Map<String, String> objectMetadata = objectStoreMetadata.objectMetadata(this.clusterIdOpt, this.brokerIdOpt);
            String keyPath = keyPath(objectStoreMetadata, objectType);
            String fullKeyPath = fullKeyPath(keyPath);
            putFileAsync(keyPath, objectMetadata, file, objectType.toE2EChecksumProtectedObjectType()).thenApply(putObjectResponse -> {
                return Boolean.valueOf(completableFuture.complete(keyPath));
            }).exceptionally((Function<Throwable, ? extends U>) th -> {
                completableFuture.completeExceptionally(convertOperationException(th, TierObjectStoreAction.PUT_OBJECT.action(), Optional.of(objectStoreMetadata), Optional.of(file), Optional.of(objectType), Optional.of(fullKeyPath), Optional.empty()));
                return null;
            });
        } catch (Exception e) {
            log.error("Failed to upload object to file {}, type {}, metadata {}", new Object[]{file, objectType, objectStoreMetadata, e});
            completableFuture.completeExceptionally(e);
        }
        return completableFuture;
    }

    private RuntimeException convertOperationException(Throwable th, String str, Optional<ObjectStoreMetadata> optional, Optional<File> optional2, Optional<ObjectType> optional3, Optional<String> optional4, Optional<ByteBuffer> optional5) {
        RetriableException retriableException = null;
        if (th instanceof S3Exception) {
            S3Exception s3Exception = (S3Exception) th;
            if (!ERROR_CODE_BAD_DIGEST.equals(s3Exception.awsErrorDetails().errorCode())) {
                retriableException = new TierObjectStoreFatalException(String.format("Fatal: Unknown service exception when %s with metadata: %s, file: %s, type: %s, filePath: %s, buffer: %s", str, optional.orElse(null), optional2.orElse(null), optional3.orElse(null), optional4.orElse(null), optional5.orElse(null)), s3Exception);
            } else if (str.equals(TierObjectStoreAction.PUT_SEGMENT.action())) {
                try {
                    handleS3ExceptionDuringSegmentUpload(s3Exception, optional.get(), optional3.get(), optional2.get());
                } catch (E2EChecksumInvalidException | TierObjectStoreRetriableException e) {
                    retriableException = e;
                }
            } else {
                retriableException = new E2EChecksumInvalidException(String.format("Checksum mismatch when %s", str), (Throwable) s3Exception);
            }
        } else {
            retriableException = th instanceof RetryableException ? new TierObjectStoreRetriableException(String.format("Retryable: Failed %s with metadata: %s, file: %s, type: %s, filePath: %s, buffer: %s", str, optional.orElse(null), optional2.orElse(null), optional3.orElse(null), optional4.orElse(null), optional5.orElse(null)), th) : new TierObjectStoreFatalException(String.format("Fatal: Failed %s with metadata: %s, file: %s, type: %s, filePath: %s, buffer: %s", str, optional.orElse(null), optional2.orElse(null), optional3.orElse(null), optional4.orElse(null), optional5.orElse(null)), th);
        }
        return retriableException;
    }

    @Override // kafka.tier.store.TierObjectStore
    public String putBuffer(ObjectStoreMetadata objectStoreMetadata, ByteBuffer byteBuffer, ObjectType objectType) {
        if (this.v2Enabled) {
            try {
                return putBufferAsync(objectStoreMetadata, byteBuffer, objectType).get();
            } catch (InterruptedException | CancellationException e) {
                throw new TierObjectStoreFatalException("Unknown exception while putting buffer", e.getCause() != null ? e.getCause() : e);
            } catch (ExecutionException e2) {
                handleV2Exception(e2);
            }
        }
        Map<String, String> objectMetadata = objectStoreMetadata.objectMetadata(this.clusterIdOpt, this.brokerIdOpt);
        String keyPath = keyPath(objectStoreMetadata, objectType);
        String fullKeyPath = fullKeyPath(keyPath);
        try {
            checkExpiredCredentialsExceptionAndTryRefresh(() -> {
                putBuf(keyPath, objectMetadata, byteBuffer);
                return null;
            });
            return keyPath;
        } catch (Exception e3) {
            throw new TierObjectStoreFatalException(String.format("Failed to upload object to %s with metadata %s, buffer %s, type %s", fullKeyPath, objectStoreMetadata, byteBuffer, objectType), e3);
        } catch (AmazonClientException e4) {
            throw new TierObjectStoreRetriableException(String.format("Failed to upload object to %s with metadata %s, buffer %s, type %s", fullKeyPath, objectStoreMetadata, byteBuffer, objectType), e4);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public CompletableFuture<String> putBufferAsync(ObjectStoreMetadata objectStoreMetadata, ByteBuffer byteBuffer, ObjectType objectType) {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        try {
            checkAsyncClientPresent("putBuffer");
            String keyPath = keyPath(objectStoreMetadata, objectType);
            Map<String, String> objectMetadata = objectStoreMetadata.objectMetadata(this.clusterIdOpt, this.brokerIdOpt);
            String fullKeyPath = fullKeyPath(keyPath);
            putBufAsync(keyPath, objectMetadata, byteBuffer).thenApply(putObjectResponse -> {
                return Boolean.valueOf(completableFuture.complete(keyPath));
            }).exceptionally((Function<Throwable, ? extends U>) th -> {
                completableFuture.completeExceptionally(convertOperationException(th, TierObjectStoreAction.PUT_BUFFER.action(), Optional.of(objectStoreMetadata), Optional.empty(), Optional.of(objectType), Optional.of(fullKeyPath), Optional.of(byteBuffer)));
                return null;
            });
        } catch (Exception e) {
            log.error("Failed to upload object with metadata {}, buffer {}, type {}", new Object[]{objectStoreMetadata, byteBuffer, objectType});
            completableFuture.completeExceptionally(e);
        }
        return completableFuture;
    }

    @Override // kafka.tier.store.TierObjectStore
    public void restoreObjectByCopy(ObjectMetadata objectMetadata, String str, VersionInformation versionInformation) {
        if (this.v2Enabled) {
            try {
                restoreObjectByCopyAsync(objectMetadata, str, versionInformation).get();
                return;
            } catch (InterruptedException | CancellationException e) {
                throw new TierObjectStoreFatalException("Unknown exception while restoring object by copying buffer", e.getCause() != null ? e.getCause() : e);
            } catch (ExecutionException e2) {
                handleV2Exception(e2);
            }
        }
        String versionId = versionInformation.getVersionId();
        String fullKeyPath = fullKeyPath(str);
        try {
            checkExpiredCredentialsExceptionAndTryRefresh(() -> {
                CopyObjectRequest withNewObjectMetadata = new CopyObjectRequest(this.bucket, str, versionId, this.bucket, str).withNewObjectMetadata(objectMetadata(null));
                setKmsParams(withNewObjectMetadata);
                log.debug("restore object for {}-{}", fullKeyPath, versionId);
                this.client.copyObject(withNewObjectMetadata);
                return null;
            });
        } catch (AmazonClientException e3) {
            throw new TierObjectStoreRetriableException(String.format("Failed to restore object %s (version: %s)", fullKeyPath, versionId), e3);
        } catch (Exception e4) {
            throw new TierObjectStoreFatalException(String.format("Unknown exception when restoring object %s (version: %s)", fullKeyPath, versionId), e4);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public CompletableFuture<Void> restoreObjectByCopyAsync(ObjectMetadata objectMetadata, String str, VersionInformation versionInformation) {
        checkAsyncClientPresent("restoreObjectByCopy");
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        try {
            String versionId = versionInformation.getVersionId();
            String fullKeyPath = fullKeyPath(str);
            CopyObjectRequest.Builder metadata = software.amazon.awssdk.services.s3.model.CopyObjectRequest.builder().sourceBucket(this.bucket).sourceKey(str).sourceVersionId(versionId).destinationBucket(this.bucket).destinationKey(str).metadata(objectMetadata.objectMetadata(this.clusterIdOpt, this.brokerIdOpt));
            if (usesKms()) {
                metadata.serverSideEncryption(ServerSideEncryption.AWS_KMS).ssekmsKeyId(this.sseCustomerEncryptionKey);
            }
            log.debug("restore object for {}-{}", fullKeyPath, versionId);
            this.asyncClientOpt.get().copyObject((software.amazon.awssdk.services.s3.model.CopyObjectRequest) metadata.build()).handle((copyObjectResponse, th) -> {
                if (th != null) {
                    completableFuture.completeExceptionally(convertOperationException(th.getCause(), String.format("%s %s", TierObjectStoreAction.RESTORE_OBJECT.action(), versionId), Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(fullKeyPath), Optional.empty()));
                    return null;
                }
                completableFuture.complete(null);
                return null;
            });
        } catch (Exception e) {
            log.error("Failed to send async restore request, metadata: {}, key: {}", new Object[]{objectMetadata, str, e});
            completableFuture.completeExceptionally(e);
        }
        return completableFuture;
    }

    private void setKmsParams(com.amazonaws.services.s3.model.CopyObjectRequest copyObjectRequest) {
        if (usesKms()) {
            copyObjectRequest.setSSEAwsKeyManagementParams(new SSEAwsKeyManagementParams(this.sseCustomerEncryptionKey));
        }
    }

    private void putSegmentAsMultiObject(TierSegmentUpload<?> tierSegmentUpload, AtomicReference<ObjectType> atomicReference) throws IOException {
        Map<String, String> objectMetadata = tierSegmentUpload.objectMetadata().objectMetadata(this.clusterIdOpt, this.brokerIdOpt);
        atomicReference.set(ObjectType.SEGMENT);
        if (tierSegmentUpload.throttlerOpt().isPresent() && (tierSegmentUpload instanceof ThrottledSegmentUpload)) {
            ThrottledSegmentUpload throttledSegmentUpload = (ThrottledSegmentUpload) tierSegmentUpload;
            Throttler throttler = throttledSegmentUpload.throttlerOpt().get();
            putFileWithThrottling(keyPath(throttledSegmentUpload.objectMetadata(), ObjectType.SEGMENT), objectMetadata, throttledSegmentUpload.segment(), E2EChecksumProtectedObjectType.SEGMENT, throttler);
            atomicReference.set(ObjectType.OFFSET_INDEX);
            putFileWithThrottling(keyPath(throttledSegmentUpload.objectMetadata(), ObjectType.OFFSET_INDEX), objectMetadata, throttledSegmentUpload.offsetIdx(), E2EChecksumProtectedObjectType.OFFSET_INDEX, throttler);
            atomicReference.set(ObjectType.TIMESTAMP_INDEX);
            putFileWithThrottling(keyPath(throttledSegmentUpload.objectMetadata(), ObjectType.TIMESTAMP_INDEX), objectMetadata, throttledSegmentUpload.timestampIdx(), E2EChecksumProtectedObjectType.TIMESTAMP_INDEX, throttler);
            if (throttledSegmentUpload.producerStateSnapshotOpt().isPresent()) {
                atomicReference.set(ObjectType.PRODUCER_STATE);
                putFileWithThrottling(keyPath(throttledSegmentUpload.objectMetadata(), ObjectType.PRODUCER_STATE), objectMetadata, throttledSegmentUpload.producerStateSnapshotOpt().get(), E2EChecksumProtectedObjectType.PRODUCER_STATE, throttler);
            }
        } else {
            putFile(keyPath(tierSegmentUpload.objectMetadata(), ObjectType.SEGMENT), objectMetadata, tierSegmentUpload.segment(), E2EChecksumProtectedObjectType.SEGMENT);
            atomicReference.set(ObjectType.OFFSET_INDEX);
            putFile(keyPath(tierSegmentUpload.objectMetadata(), ObjectType.OFFSET_INDEX), objectMetadata, tierSegmentUpload.offsetIdx(), E2EChecksumProtectedObjectType.OFFSET_INDEX);
            atomicReference.set(ObjectType.TIMESTAMP_INDEX);
            putFile(keyPath(tierSegmentUpload.objectMetadata(), ObjectType.TIMESTAMP_INDEX), objectMetadata, tierSegmentUpload.timestampIdx(), E2EChecksumProtectedObjectType.TIMESTAMP_INDEX);
            if (tierSegmentUpload.producerStateSnapshotOpt().isPresent()) {
                atomicReference.set(ObjectType.PRODUCER_STATE);
                Object obj = tierSegmentUpload.producerStateSnapshotOpt().get();
                if (obj instanceof File) {
                    putFile(keyPath(tierSegmentUpload.objectMetadata(), ObjectType.PRODUCER_STATE), objectMetadata, (File) obj, E2EChecksumProtectedObjectType.PRODUCER_STATE);
                } else if (obj instanceof ByteBuffer) {
                    putBuf(keyPath(tierSegmentUpload.objectMetadata(), ObjectType.PRODUCER_STATE), objectMetadata, (ByteBuffer) obj);
                }
            }
        }
        if (tierSegmentUpload.txnIdxOpt().isPresent()) {
            atomicReference.set(ObjectType.TRANSACTION_INDEX);
            putBuf(keyPath(tierSegmentUpload.objectMetadata(), ObjectType.TRANSACTION_INDEX), objectMetadata, tierSegmentUpload.txnIdxOpt().get());
        }
        if (tierSegmentUpload.epochStateOpt().isPresent()) {
            atomicReference.set(ObjectType.EPOCH_STATE);
            putBuf(keyPath(tierSegmentUpload.objectMetadata(), ObjectType.EPOCH_STATE), objectMetadata, tierSegmentUpload.epochStateOpt().get());
        }
    }

    private void putSegmentAsCombinedObject(TierSegmentUpload<?> tierSegmentUpload) throws IOException {
        Map<String, String> objectMetadata = tierSegmentUpload.objectMetadata().objectMetadata(this.clusterIdOpt, this.brokerIdOpt);
        CombinedObjectStream makeCombinedObjectStream = tierSegmentUpload.makeCombinedObjectStream(131073);
        Throwable th = null;
        try {
            try {
                putInputStream(keyPath(tierSegmentUpload.objectMetadata(), ObjectType.SEGMENT_WITH_METADATA), objectMetadata, makeCombinedObjectStream, makeCombinedObjectStream.length(), getCrcForCombinedObject(tierSegmentUpload));
                if (makeCombinedObjectStream != null) {
                    if (0 == 0) {
                        makeCombinedObjectStream.close();
                        return;
                    }
                    try {
                        makeCombinedObjectStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (makeCombinedObjectStream != null) {
                if (th != null) {
                    try {
                        makeCombinedObjectStream.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    makeCombinedObjectStream.close();
                }
            }
            throw th4;
        }
    }

    private void putInputStream(String str, Map<String, String> map, InputStream inputStream, long j, Optional<String> optional) {
        if (!inputStream.markSupported()) {
            log.warn("InputStream does not support mark/reset, wrapping in BufferedInputStream");
            inputStream = new BufferedInputStream(inputStream, 131073);
        }
        com.amazonaws.services.s3.model.ObjectMetadata objectMetadata = objectMetadata(map);
        objectMetadata.setContentLength(j);
        optional.ifPresent(str2 -> {
            objectMetadata.setHeader(CRC32C_HEADER, str2);
        });
        PutObjectRequest putObjectRequest = new PutObjectRequest(this.bucket, str, inputStream, objectMetadata);
        setKmsParams(putObjectRequest);
        this.client.putObject(putObjectRequest);
    }

    private List<CompletableFuture<PutObjectResponse>> putSegmentAsMultiObjectAsync(TierSegmentUpload<?> tierSegmentUpload, AtomicReference<ObjectType> atomicReference) {
        ArrayList arrayList = new ArrayList();
        Map<String, String> objectMetadata = tierSegmentUpload.objectMetadata().objectMetadata(this.clusterIdOpt, this.brokerIdOpt);
        atomicReference.set(ObjectType.SEGMENT);
        arrayList.add(putAsync(keyPath(tierSegmentUpload.objectMetadata(), ObjectType.SEGMENT), objectMetadata, tierSegmentUpload.segment(), E2EChecksumProtectedObjectType.SEGMENT, tierSegmentUpload.throttlerOpt()));
        atomicReference.set(ObjectType.OFFSET_INDEX);
        arrayList.add(putAsync(keyPath(tierSegmentUpload.objectMetadata(), ObjectType.OFFSET_INDEX), objectMetadata, tierSegmentUpload.offsetIdx(), E2EChecksumProtectedObjectType.OFFSET_INDEX, tierSegmentUpload.throttlerOpt()));
        atomicReference.set(ObjectType.TIMESTAMP_INDEX);
        arrayList.add(putAsync(keyPath(tierSegmentUpload.objectMetadata(), ObjectType.TIMESTAMP_INDEX), objectMetadata, tierSegmentUpload.timestampIdx(), E2EChecksumProtectedObjectType.TIMESTAMP_INDEX, tierSegmentUpload.throttlerOpt()));
        if (tierSegmentUpload.producerStateSnapshotOpt().isPresent()) {
            atomicReference.set(ObjectType.PRODUCER_STATE);
            arrayList.add(putAsync(keyPath(tierSegmentUpload.objectMetadata(), ObjectType.PRODUCER_STATE), objectMetadata, tierSegmentUpload.producerStateSnapshotOpt().get(), E2EChecksumProtectedObjectType.PRODUCER_STATE, tierSegmentUpload.throttlerOpt()));
        }
        if (tierSegmentUpload.txnIdxOpt().isPresent()) {
            atomicReference.set(ObjectType.TRANSACTION_INDEX);
            arrayList.add(putBufAsync(keyPath(tierSegmentUpload.objectMetadata(), ObjectType.TRANSACTION_INDEX), objectMetadata, tierSegmentUpload.txnIdxOpt().get()));
        }
        if (tierSegmentUpload.epochStateOpt().isPresent()) {
            atomicReference.set(ObjectType.EPOCH_STATE);
            arrayList.add(putBufAsync(keyPath(tierSegmentUpload.objectMetadata(), ObjectType.EPOCH_STATE), objectMetadata, tierSegmentUpload.epochStateOpt().get()));
        }
        return arrayList;
    }

    private List<CompletableFuture<PutObjectResponse>> putSegmentAsCombinedObjectAsync(TierSegmentUpload<?> tierSegmentUpload) throws IOException {
        ArrayList arrayList = new ArrayList();
        Map<String, String> objectMetadata = tierSegmentUpload.objectMetadata().objectMetadata(this.clusterIdOpt, this.brokerIdOpt);
        CombinedObjectStream makeCombinedObjectStream = tierSegmentUpload.makeCombinedObjectStream(131073);
        Throwable th = null;
        try {
            try {
                arrayList.add(putInputStreamAsync(keyPath(tierSegmentUpload.objectMetadata(), ObjectType.SEGMENT_WITH_METADATA), objectMetadata, makeCombinedObjectStream, makeCombinedObjectStream.length(), getCrcForCombinedObject(tierSegmentUpload)));
                if (makeCombinedObjectStream != null) {
                    if (0 != 0) {
                        try {
                            makeCombinedObjectStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        makeCombinedObjectStream.close();
                    }
                }
                return arrayList;
            } finally {
            }
        } catch (Throwable th3) {
            if (makeCombinedObjectStream != null) {
                if (th != null) {
                    try {
                        makeCombinedObjectStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    makeCombinedObjectStream.close();
                }
            }
            throw th3;
        }
    }

    protected CompletableFuture<PutObjectResponse> putInputStreamAsync(String str, Map<String, String> map, InputStream inputStream, long j, Optional<String> optional) {
        checkAsyncClientPresent("putInputStream");
        CompletableFuture<PutObjectResponse> completableFuture = new CompletableFuture<>();
        HashMap hashMap = new HashMap(map);
        optional.ifPresent(str2 -> {
        });
        PutObjectRequest.Builder metadata = software.amazon.awssdk.services.s3.model.PutObjectRequest.builder().bucket(this.bucket).key(str).metadata(hashMap);
        if (!inputStream.markSupported()) {
            log.warn("InputStream does not support mark/reset, wrapping in BufferedInputStream");
            inputStream = new BufferedInputStream(inputStream, 131073);
        }
        setSseAlgorithmAndKmsParamsV2(metadata);
        log.debug("Uploading InputStream to s3://{}/{}", this.bucket, str);
        this.asyncClientOpt.get().putObject((software.amazon.awssdk.services.s3.model.PutObjectRequest) metadata.build(), AsyncRequestBody.fromInputStream(inputStream, Long.valueOf(j), getExecutor().get())).handle((putObjectResponse, th) -> {
            if (th != null) {
                completableFuture.completeExceptionally(th.getCause());
                return null;
            }
            completableFuture.complete(putObjectResponse);
            return null;
        });
        return completableFuture;
    }

    @Override // kafka.tier.store.TierObjectStore
    public void putSegment(TierSegmentUpload<?> tierSegmentUpload) {
        AtomicReference atomicReference = new AtomicReference();
        if (this.v2Enabled) {
            try {
                putSegmentAsync(tierSegmentUpload).get();
                return;
            } catch (InterruptedException | CancellationException e) {
                throw new TierObjectStoreFatalException("Unknown exception while putting segment", e.getCause() != null ? e.getCause() : e);
            } catch (ExecutionException e2) {
                handleV2Exception(e2);
            }
        }
        try {
            checkExpiredCredentialsExceptionAndTryRefresh(() -> {
                switch (AnonymousClass1.$SwitchMap$org$apache$kafka$server$config$SegmentMetadataLayoutPutMode[tierSegmentUpload.putMode().ordinal()]) {
                    case 1:
                        putSegmentAsMultiObject(tierSegmentUpload, atomicReference);
                        return null;
                    case 2:
                        atomicReference.set(ObjectType.SEGMENT_WITH_METADATA);
                        putSegmentAsCombinedObject(tierSegmentUpload);
                        return null;
                    default:
                        throw new UnsupportedOperationException("Unsupported segment PUT mode: " + tierSegmentUpload.putMode());
                }
            });
            Optional<E2EChecksumStore> optional = this.checksumStoreOpt;
            tierSegmentUpload.getClass();
            optional.ifPresent(tierSegmentUpload::postPutSegmentCleanup);
        } catch (Exception e3) {
            throw new TierObjectStoreFatalException("Unknown exception when uploading segment: " + tierSegmentUpload.objectMetadata(), e3);
        } catch (AmazonClientException e4) {
            Optional<File> currentPutObjectFile = TierObjectStoreUtils.getCurrentPutObjectFile(tierSegmentUpload, ((ObjectType) atomicReference.get()).toE2EChecksumProtectedObjectType());
            if (!currentPutObjectFile.isPresent()) {
                throw new TierObjectStoreRetriableException("Failed to upload segment: " + tierSegmentUpload.objectMetadata(), e4);
            }
            handleAmazonClientException(e4, tierSegmentUpload.objectMetadata(), (ObjectType) atomicReference.get(), currentPutObjectFile.get());
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public CompletableFuture<Void> putSegmentAsync(TierSegmentUpload<?> tierSegmentUpload) {
        List<CompletableFuture<PutObjectResponse>> putSegmentAsCombinedObjectAsync;
        checkAsyncClientPresent("putSegment");
        AtomicReference<ObjectType> atomicReference = new AtomicReference<>();
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        try {
            switch (AnonymousClass1.$SwitchMap$org$apache$kafka$server$config$SegmentMetadataLayoutPutMode[tierSegmentUpload.putMode().ordinal()]) {
                case 1:
                    putSegmentAsCombinedObjectAsync = putSegmentAsMultiObjectAsync(tierSegmentUpload, atomicReference);
                    break;
                case 2:
                    atomicReference.set(ObjectType.SEGMENT_WITH_METADATA);
                    putSegmentAsCombinedObjectAsync = putSegmentAsCombinedObjectAsync(tierSegmentUpload);
                    break;
                default:
                    throw new UnsupportedOperationException("Unsupported segment PUT mode " + tierSegmentUpload.putMode());
            }
            CompletableFuture.allOf((CompletableFuture[]) putSegmentAsCombinedObjectAsync.toArray(new CompletableFuture[0])).thenApply(r7 -> {
                Optional<E2EChecksumStore> optional = this.checksumStoreOpt;
                tierSegmentUpload.getClass();
                optional.ifPresent(tierSegmentUpload::postPutSegmentCleanup);
                completableFuture.complete(null);
                return null;
            }).exceptionally((Function<Throwable, ? extends U>) th -> {
                Optional<File> currentPutObjectFile = TierObjectStoreUtils.getCurrentPutObjectFile(tierSegmentUpload, ((ObjectType) atomicReference.get()).toE2EChecksumProtectedObjectType());
                if (currentPutObjectFile.isPresent()) {
                    completableFuture.completeExceptionally(convertOperationException(th.getCause(), TierObjectStoreAction.PUT_SEGMENT.action(), Optional.of(tierSegmentUpload.objectMetadata()), currentPutObjectFile, Optional.of(atomicReference.get()), Optional.empty(), Optional.empty()));
                    return null;
                }
                completableFuture.completeExceptionally(new TierObjectStoreRetriableException("Failed to upload segment: " + tierSegmentUpload.objectMetadata(), th));
                return null;
            });
        } catch (Exception e) {
            completableFuture.completeExceptionally(new TierObjectStoreFatalException("Unknown exception when uploading segment: " + tierSegmentUpload.objectMetadata(), e));
        }
        return completableFuture;
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <T> CompletableFuture<PutObjectResponse> putAsync(String str, Map<String, String> map, T t, E2EChecksumProtectedObjectType e2EChecksumProtectedObjectType, Optional<Throttler> optional) {
        return optional.isPresent() ? putFileWithThrottlingAsync(str, map, (File) t, e2EChecksumProtectedObjectType, optional.get()) : t instanceof File ? putFileAsync(str, map, (File) t, e2EChecksumProtectedObjectType) : putBufAsync(str, map, (ByteBuffer) t);
    }

    private <T> T checkExpiredCredentialsExceptionAndTryRefresh(Callable<T> callable) throws Exception {
        try {
            T call = callable.call();
            this.credentialsRefreshRetries.set(0);
            return call;
        } catch (AmazonS3Exception e) {
            if (ERROR_CODE_EXPIRED_TOKEN.equals(e.getErrorCode())) {
                tryRefreshCredentials(e);
            } else {
                this.credentialsRefreshRetries.set(0);
            }
            throw e;
        }
    }

    private void tryRefreshCredentials(AmazonS3Exception amazonS3Exception) {
        try {
            if (this.credentialsRefreshRetries.getAndIncrement() >= 5) {
                log.warn("Hit maximum number of consecutive credential refresh attempts without seeing a successful request, skipping refresh attempt.");
            } else {
                this.credentialsProvider.ifPresent(aWSCredentialsProvider -> {
                    log.info("S3 credentials expired; attempting a credentials provider refresh.", amazonS3Exception);
                    aWSCredentialsProvider.refresh();
                });
            }
        } catch (Exception e) {
            log.error("Received exception while attempting to refresh S3 credentials.", e);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public void deleteSegment(ObjectMetadata objectMetadata) {
        if (this.v2Enabled) {
            try {
                deleteSegmentAsync(objectMetadata).get();
                return;
            } catch (InterruptedException | CancellationException e) {
                throw new TierObjectStoreFatalException("Unknown exception while deleting segment", e.getCause() != null ? e.getCause() : e);
            } catch (ExecutionException e2) {
                handleV2Exception(e2);
            }
        }
        DeleteObjectsRequest withKeys = new DeleteObjectsRequest(this.bucket).withKeys(keysForSegment(objectMetadata));
        try {
            checkExpiredCredentialsExceptionAndTryRefresh(() -> {
                return this.client.deleteObjects(withKeys);
            });
        } catch (AmazonClientException e3) {
            throw new TierObjectStoreRetriableException("Failed to delete segment: " + objectMetadata, e3);
        } catch (Exception e4) {
            throw new TierObjectStoreFatalException("Unknown exception when deleting segment: " + objectMetadata, e4);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public CompletableFuture<Void> deleteSegmentAsync(ObjectMetadata objectMetadata) {
        checkAsyncClientPresent("deleteSegment");
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        try {
            this.asyncClientOpt.get().deleteObjects((software.amazon.awssdk.services.s3.model.DeleteObjectsRequest) software.amazon.awssdk.services.s3.model.DeleteObjectsRequest.builder().bucket(this.bucket).delete((Delete) Delete.builder().objects(keysForSegmentV2(objectMetadata)).build()).build()).handle((deleteObjectsResponse, th) -> {
                if (th == null) {
                    completableFuture.complete(null);
                    return null;
                }
                completableFuture.completeExceptionally(convertOperationException(th.getCause(), TierObjectStoreAction.DELETE_SEGMENT.action(), Optional.of(objectMetadata), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()));
                return null;
            });
        } catch (Exception e) {
            completableFuture.completeExceptionally(new TierObjectStoreFatalException("Unknown exception when deleting segment: " + objectMetadata, e));
        }
        return completableFuture;
    }

    @Override // kafka.tier.store.TierObjectStore
    public TierObjectAttribute objectExists(ObjectStoreMetadata objectStoreMetadata, ObjectType objectType) throws TierObjectStoreRetriableException {
        if (this.v2Enabled) {
            try {
                return objectExistsAsync(objectStoreMetadata, objectType).get();
            } catch (InterruptedException | CancellationException e) {
                throw new TierObjectStoreFatalException("Unknown exception when checking objects", e.getCause() != null ? e.getCause() : e);
            } catch (ExecutionException e2) {
                handleV2Exception(e2);
            }
        }
        TierObjectAttribute tierObjectAttribute = new TierObjectAttribute(false);
        try {
            checkExpiredCredentialsExceptionAndTryRefresh(() -> {
                String keyPath = keyPath(objectStoreMetadata, objectType);
                com.amazonaws.services.s3.model.ObjectMetadata objectMetadata = this.client.getObjectMetadata(this.bucket, keyPath);
                log.trace("objectExists at s3://{}/{} with metadata {}", new Object[]{this.bucket, keyPath, objectMetadata});
                tierObjectAttribute.exist = true;
                tierObjectAttribute.size = objectMetadata.getContentLength();
                return null;
            });
        } catch (AmazonClientException e3) {
            throw new TierObjectStoreRetriableException("Failed to check object existence: " + objectStoreMetadata + " type: " + objectType, e3);
        } catch (Exception e4) {
            throw new TierObjectStoreFatalException("Unknown exception when checking object existence: " + objectStoreMetadata + " type: " + objectType, e4);
        } catch (AmazonServiceException e5) {
            if (e5.getStatusCode() != 404) {
                throw new TierObjectStoreRetriableException("Failed to check object existence: " + objectStoreMetadata + " type: " + objectType, e5);
            }
            tierObjectAttribute.exist = false;
        }
        return tierObjectAttribute;
    }

    @Override // kafka.tier.store.TierObjectStore
    public CompletableFuture<TierObjectAttribute> objectExistsAsync(ObjectStoreMetadata objectStoreMetadata, ObjectType objectType) {
        CompletableFuture<TierObjectAttribute> completableFuture = new CompletableFuture<>();
        try {
            if (this.asyncClientOpt.isPresent()) {
                String keyPath = keyPath(objectStoreMetadata, objectType);
                this.asyncClientOpt.get().getObjectAttributes((GetObjectAttributesRequest) GetObjectAttributesRequest.builder().bucket(this.bucket).key(keyPath).objectAttributes(new ObjectAttributes[]{ObjectAttributes.OBJECT_SIZE}).build()).handle((getObjectAttributesResponse, th) -> {
                    if (th == null) {
                        log.trace("objectExists (async) at s3://{}/{} with metadata {}", new Object[]{this.bucket, keyPath, getObjectAttributesResponse});
                        completableFuture.complete(new TierObjectAttribute(true, getObjectAttributesResponse.objectSize().longValue()));
                        return null;
                    }
                    Throwable cause = th.getCause() != null ? th.getCause() : th;
                    if (cause instanceof NoSuchKeyException) {
                        completableFuture.complete(new TierObjectAttribute(false));
                        return null;
                    }
                    if (cause instanceof AwsServiceException) {
                        if (((AwsServiceException) cause).statusCode() == 404) {
                            completableFuture.complete(new TierObjectAttribute(false));
                            return null;
                        }
                        completableFuture.completeExceptionally(new TierObjectStoreRetriableException("Failed to check object existence: " + objectStoreMetadata + " type: " + objectType, cause));
                        return null;
                    }
                    if (cause instanceof RetryableException) {
                        completableFuture.completeExceptionally(new TierObjectStoreRetriableException("Failed to check object existence: " + objectStoreMetadata + " type: " + objectType, cause));
                        return null;
                    }
                    completableFuture.completeExceptionally(new TierObjectStoreFatalException("Unknown exception when checking object existence: " + objectStoreMetadata + " type: " + objectType, cause));
                    return null;
                });
            } else {
                log.warn("Fallback to objectExists because async S3 client isn't created, metadata: {}, type: {}", objectStoreMetadata, objectType);
                completableFuture.complete(objectExists(objectStoreMetadata, objectType));
            }
        } catch (Exception e) {
            log.error("Failed to send objectExistsAsync request, metadata: {}, type: {}", new Object[]{objectStoreMetadata, objectType, e});
            completableFuture.completeExceptionally(e);
        }
        return completableFuture;
    }

    @Override // kafka.tier.store.TierObjectStore
    public void deleteVersions(List<TierObjectStore.KeyAndVersion> list) {
        if (this.v2Enabled) {
            try {
                deleteVersionsAsync(list).get();
                return;
            } catch (InterruptedException | CancellationException e) {
                throw new TierObjectStoreFatalException("Unknown exception when deleting versions", e.getCause() != null ? e.getCause() : e);
            } catch (ExecutionException e2) {
                handleV2Exception(e2);
            }
        }
        ArrayList arrayList = new ArrayList();
        for (TierObjectStore.KeyAndVersion keyAndVersion : list) {
            DeleteObjectsRequest.KeyVersion keyVersion = keyAndVersion.versionId() == null ? new DeleteObjectsRequest.KeyVersion(keyAndVersion.key()) : new DeleteObjectsRequest.KeyVersion(keyAndVersion.key(), keyAndVersion.versionId());
            log.debug("Deleting object {} {}", keyVersion.getKey(), keyVersion.getVersion());
            arrayList.add(keyVersion);
            if (arrayList.size() >= 1000) {
                makeDeleteObjectsCall(arrayList);
                arrayList.clear();
            }
        }
        if (arrayList.isEmpty()) {
            return;
        }
        makeDeleteObjectsCall(arrayList);
    }

    @Override // kafka.tier.store.TierObjectStore
    public CompletableFuture<Void> deleteVersionsAsync(List<TierObjectStore.KeyAndVersion> list) {
        checkAsyncClientPresent("deleteVersions");
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        try {
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            for (TierObjectStore.KeyAndVersion keyAndVersion : list) {
                ObjectIdentifier objectIdentifier = keyAndVersion.versionId() == null ? (ObjectIdentifier) ObjectIdentifier.builder().key(keyAndVersion.key()).build() : (ObjectIdentifier) ObjectIdentifier.builder().key(keyAndVersion.key()).versionId(keyAndVersion.versionId()).build();
                log.debug("Deleting object {} {}", objectIdentifier.key(), objectIdentifier.versionId());
                arrayList.add(objectIdentifier);
                if (arrayList.size() >= 1000) {
                    arrayList2.add(makeDeleteObjectsCallAsync(arrayList));
                    arrayList.clear();
                }
            }
            if (!arrayList.isEmpty()) {
                arrayList2.add(makeDeleteObjectsCallAsync(arrayList));
            }
            CompletableFuture.allOf((CompletableFuture[]) arrayList2.toArray(new CompletableFuture[0])).whenComplete((r4, th) -> {
                if (th != null) {
                    completableFuture.completeExceptionally(th);
                } else {
                    completableFuture.complete(null);
                }
            });
        } catch (Exception e) {
            log.error("Failed to send async delete request, keys: {}", list, e);
            completableFuture.completeExceptionally(e);
        }
        return completableFuture;
    }

    private void makeDeleteObjectsCall(List<DeleteObjectsRequest.KeyVersion> list) {
        DeleteObjectsRequest withKeys = new DeleteObjectsRequest(this.bucket).withKeys(list);
        try {
            checkExpiredCredentialsExceptionAndTryRefresh(() -> {
                log.debug("Sending a batch delete request");
                return this.client.deleteObjects(withKeys);
            });
        } catch (MultiObjectDeleteException e) {
            log.error("S3 reported errors while deleting the following versioned objects:");
            e.getErrors().forEach(deleteError -> {
                log.error("Blob Key: " + deleteError.getKey() + " Blob VersionId: " + deleteError.getVersionId() + " Error Code: " + deleteError.getCode() + " Error Message: " + deleteError.getMessage());
            });
            throw new TierObjectStoreRetriableException("Failed to delete " + e.getErrors().size() + " versioned objects", e);
        } catch (Exception e2) {
            throw new TierObjectStoreFatalException("Unknown exception when deleting versioned objects", e2);
        } catch (SdkClientException e3) {
            log.error(Arrays.toString(e3.getStackTrace()));
            throw new TierObjectStoreRetriableException("Failed to delete versioned objects", e3);
        }
    }

    private CompletableFuture<Void> makeDeleteObjectsCallAsync(List<ObjectIdentifier> list) {
        software.amazon.awssdk.services.s3.model.DeleteObjectsRequest deleteObjectsRequest = (software.amazon.awssdk.services.s3.model.DeleteObjectsRequest) software.amazon.awssdk.services.s3.model.DeleteObjectsRequest.builder().bucket(this.bucket).delete((Delete) Delete.builder().objects(list).build()).build();
        log.debug("Sending a batch delete request");
        return this.asyncClientOpt.get().deleteObjects(deleteObjectsRequest).handle((deleteObjectsResponse, th) -> {
            if (th != null) {
                throw convertOperationException(th.getCause(), TierObjectStoreAction.DELETE_VERSIONED_OBJECTS.action(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
            }
            return null;
        });
    }

    @Override // kafka.tier.store.TierObjectStore
    public void close() {
        super.close();
        this.client.shutdown();
        this.asyncClientOpt.ifPresent(s3AsyncClient -> {
            s3AsyncClient.close();
        });
        this.executorOpt.ifPresent(executorService -> {
            executorService.shutdown();
        });
    }

    private String keyPath(ObjectStoreMetadata objectStoreMetadata, ObjectType objectType) {
        return TierObjectStoreUtils.keyPath(this.prefix, objectStoreMetadata, objectType);
    }

    private String fullKeyPath(String str) {
        return "s3://" + this.bucket + TierRecoveryUploadMetadata.OBJECT_PATH_DELIMITER + str;
    }

    private com.amazonaws.services.s3.model.ObjectMetadata objectMetadata(Map<String, String> map) {
        com.amazonaws.services.s3.model.ObjectMetadata objectMetadata = new com.amazonaws.services.s3.model.ObjectMetadata();
        if (this.sseAlgorithm != null) {
            objectMetadata.setSSEAlgorithm(this.sseAlgorithm);
        }
        if (map != null) {
            objectMetadata.setUserMetadata(map);
        }
        return objectMetadata;
    }

    private void setSseAlgorithmAndKmsParamsV2(PutObjectRequest.Builder builder) {
        if (usesKms()) {
            builder.serverSideEncryption(ServerSideEncryption.AWS_KMS).ssekmsKeyId(this.sseCustomerEncryptionKey);
        }
        if (this.sseAlgorithm != null) {
            builder.serverSideEncryption(this.sseAlgorithm);
        }
    }

    private void checkAsyncClientPresent(String str) {
        if (!this.asyncClientOpt.isPresent()) {
            throw new TierObjectStoreFatalException(String.format("Failed to send async %s request because async S3 client isn't created", str));
        }
    }

    private void setKmsParams(com.amazonaws.services.s3.model.PutObjectRequest putObjectRequest) {
        if (usesKms()) {
            putObjectRequest.setSSEAwsKeyManagementParams(new SSEAwsKeyManagementParams(this.sseCustomerEncryptionKey));
        }
    }

    private void putFile(String str, Map<String, String> map, File file, E2EChecksumProtectedObjectType e2EChecksumProtectedObjectType) {
        com.amazonaws.services.s3.model.ObjectMetadata objectMetadata = objectMetadata(map);
        Optional<String> crcAndSetHeader = getCrcAndSetHeader(e2EChecksumProtectedObjectType, file, map, objectMetadata);
        com.amazonaws.services.s3.model.PutObjectRequest withMetadata = new com.amazonaws.services.s3.model.PutObjectRequest(this.bucket, str, file).withMetadata(objectMetadata);
        setKmsParams(withMetadata);
        if (crcAndSetHeader.isPresent()) {
            log.debug("Uploading object to s3://{}/{} with crc {}", new Object[]{this.bucket, str, crcAndSetHeader.get()});
        } else {
            log.debug("Uploading object to s3://{}/{}", this.bucket, str);
        }
        this.client.putObject(withMetadata);
    }

    private CompletableFuture<PutObjectResponse> putFileAsync(String str, Map<String, String> map, File file, E2EChecksumProtectedObjectType e2EChecksumProtectedObjectType) {
        CompletableFuture<PutObjectResponse> completableFuture = new CompletableFuture<>();
        try {
            checkAsyncClientPresent("putFile");
            Optional<String> crcAndSetHeaderV2 = getCrcAndSetHeaderV2(e2EChecksumProtectedObjectType, file, map);
            PutObjectRequest.Builder metadata = software.amazon.awssdk.services.s3.model.PutObjectRequest.builder().bucket(this.bucket).key(str).metadata(map);
            setSseAlgorithmAndKmsParamsV2(metadata);
            log.debug("Uploading object to s3://{}/{} with crc {}", new Object[]{this.bucket, str, crcAndSetHeaderV2.orElse(null)});
            this.asyncClientOpt.get().putObject((software.amazon.awssdk.services.s3.model.PutObjectRequest) metadata.build(), AsyncRequestBody.fromFile(file)).handle((putObjectResponse, th) -> {
                if (th != null) {
                    completableFuture.completeExceptionally(th.getCause());
                    return null;
                }
                completableFuture.complete(putObjectResponse);
                return null;
            });
        } catch (Exception e) {
            log.error("Failed to send putFileAsync request, key: {}, metadata: {}, type: {}, file: {}", new Object[]{str, map, e2EChecksumProtectedObjectType, file.getPath()});
            completableFuture.completeExceptionally(e);
        }
        return completableFuture;
    }

    private void putFileWithThrottling(String str, Map<String, String> map, File file, E2EChecksumProtectedObjectType e2EChecksumProtectedObjectType, Throttler throttler) throws IOException {
        com.amazonaws.services.s3.model.ObjectMetadata objectMetadata = objectMetadata(map);
        Optional<String> crcAndSetHeader = getCrcAndSetHeader(e2EChecksumProtectedObjectType, file, map, objectMetadata);
        ThrottledFileInputStream throttledFileInputStream = new ThrottledFileInputStream(file, throttler);
        Throwable th = null;
        try {
            try {
                objectMetadata.setContentLength(file.length());
                com.amazonaws.services.s3.model.PutObjectRequest putObjectRequest = new com.amazonaws.services.s3.model.PutObjectRequest(this.bucket, str, throttledFileInputStream, objectMetadata);
                setKmsParams(putObjectRequest);
                if (crcAndSetHeader.isPresent()) {
                    log.debug("Uploading object to s3 with throttling://{}/{} with crc {}", new Object[]{this.bucket, str, crcAndSetHeader.get()});
                } else {
                    log.debug("Uploading object to s3 with throttling://{}/{}", this.bucket, str);
                }
                this.client.putObject(putObjectRequest);
                if (throttledFileInputStream != null) {
                    if (0 == 0) {
                        throttledFileInputStream.close();
                        return;
                    }
                    try {
                        throttledFileInputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (throttledFileInputStream != null) {
                if (th != null) {
                    try {
                        throttledFileInputStream.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    throttledFileInputStream.close();
                }
            }
            throw th4;
        }
    }

    private Optional<ExecutorService> getExecutor() {
        if (!this.executorOpt.isPresent()) {
            this.executorOpt = Optional.of(Executors.newSingleThreadExecutor());
        }
        return this.executorOpt;
    }

    private CompletableFuture<PutObjectResponse> putFileWithThrottlingAsync(String str, Map<String, String> map, File file, E2EChecksumProtectedObjectType e2EChecksumProtectedObjectType, Throttler throttler) {
        ThrottledFileInputStream throttledFileInputStream;
        Throwable th;
        checkAsyncClientPresent("putFileWithThrottling");
        CompletableFuture<PutObjectResponse> completableFuture = new CompletableFuture<>();
        Optional<String> crcAndSetHeaderV2 = getCrcAndSetHeaderV2(e2EChecksumProtectedObjectType, file, map);
        PutObjectRequest.Builder metadata = software.amazon.awssdk.services.s3.model.PutObjectRequest.builder().bucket(this.bucket).key(str).metadata(map);
        setSseAlgorithmAndKmsParamsV2(metadata);
        try {
            throttledFileInputStream = new ThrottledFileInputStream(file, throttler);
            th = null;
        } catch (Exception e) {
            completableFuture.completeExceptionally(e);
        }
        try {
            try {
                log.debug("Uploading object to s3 with throttling://{}/{} with crc {}", new Object[]{this.bucket, str, crcAndSetHeaderV2.orElse(null)});
                this.asyncClientOpt.get().putObject((software.amazon.awssdk.services.s3.model.PutObjectRequest) metadata.build(), AsyncRequestBody.fromInputStream(throttledFileInputStream, Long.valueOf(file.length()), getExecutor().get())).handle((putObjectResponse, th2) -> {
                    if (th2 != null) {
                        completableFuture.completeExceptionally(th2.getCause());
                        return null;
                    }
                    completableFuture.complete(putObjectResponse);
                    return null;
                });
                if (throttledFileInputStream != null) {
                    if (0 != 0) {
                        try {
                            throttledFileInputStream.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        throttledFileInputStream.close();
                    }
                }
                return completableFuture;
            } finally {
            }
        } finally {
        }
    }

    private Optional<String> getCrcForCombinedObject(TierSegmentUpload<?> tierSegmentUpload) {
        return this.checksumStoreOpt.flatMap(e2EChecksumStore -> {
            return !e2EChecksumStore.checksumProtectionEnabled(E2EChecksumProtectedObjectType.SEGMENT_WITH_METADATA) ? Optional.empty() : Optional.of(tierSegmentUpload.getChecksumForCombinedObject());
        });
    }

    private Optional<String> getCrc(E2EChecksumProtectedObjectType e2EChecksumProtectedObjectType, File file, Map<String, String> map) {
        if (!this.checksumStoreOpt.isPresent()) {
            return Optional.empty();
        }
        E2EChecksumStore e2EChecksumStore = this.checksumStoreOpt.get();
        Optional<String> empty = Optional.empty();
        if (e2EChecksumStore.checksumProtectionEnabled(e2EChecksumProtectedObjectType)) {
            empty = e2EChecksumProtectedObjectType.shouldCalculateBeforeUpload() ? E2EChecksumUtils.compute32BitBase64Crc32c(file) : E2EChecksumUtils.getBase64CrcFromStore(e2EChecksumStore, file, map);
        }
        return empty;
    }

    private Optional<String> getCrcAndSetHeader(E2EChecksumProtectedObjectType e2EChecksumProtectedObjectType, File file, Map<String, String> map, com.amazonaws.services.s3.model.ObjectMetadata objectMetadata) {
        Optional<String> crc = getCrc(e2EChecksumProtectedObjectType, file, map);
        crc.ifPresent(str -> {
            objectMetadata.setHeader(CRC32C_HEADER, str);
        });
        return crc;
    }

    private Optional<String> getCrcAndSetHeaderV2(E2EChecksumProtectedObjectType e2EChecksumProtectedObjectType, File file, Map<String, String> map) {
        Optional<String> crc = getCrc(e2EChecksumProtectedObjectType, file, map);
        if (crc.isPresent()) {
            map.put(CRC32C_HEADER, crc.get());
        } else {
            map.remove(CRC32C_HEADER);
        }
        return crc;
    }

    public void putBuf(String str, Map<String, String> map, ByteBuffer byteBuffer) {
        com.amazonaws.services.s3.model.ObjectMetadata objectMetadata = objectMetadata(map);
        objectMetadata.setContentLength(byteBuffer.limit() - byteBuffer.position());
        objectMetadata.setHeader(CRC32C_HEADER, E2EChecksumUtils.compute32BitBase64Crc32c(byteBuffer));
        com.amazonaws.services.s3.model.PutObjectRequest putObjectRequest = new com.amazonaws.services.s3.model.PutObjectRequest(this.bucket, str, new ByteBufferInputStream(byteBuffer.duplicate()), objectMetadata);
        setKmsParams(putObjectRequest);
        log.debug("Uploading buffer to s3://{}/{}", this.bucket, str);
        this.client.putObject(putObjectRequest);
    }

    public CompletableFuture<PutObjectResponse> putBufAsync(String str, Map<String, String> map, ByteBuffer byteBuffer) {
        checkAsyncClientPresent("putBuf");
        CompletableFuture<PutObjectResponse> completableFuture = new CompletableFuture<>();
        HashMap hashMap = new HashMap(map);
        hashMap.put(CRC32C_HEADER, E2EChecksumUtils.compute32BitBase64Crc32c(byteBuffer));
        PutObjectRequest.Builder metadata = software.amazon.awssdk.services.s3.model.PutObjectRequest.builder().bucket(this.bucket).key(str).metadata(hashMap);
        setSseAlgorithmAndKmsParamsV2(metadata);
        log.debug("Uploading buffer to s3://{}/{}", this.bucket, str);
        this.asyncClientOpt.get().putObject((software.amazon.awssdk.services.s3.model.PutObjectRequest) metadata.build(), AsyncRequestBody.fromByteBuffer(byteBuffer.duplicate())).handle((putObjectResponse, th) -> {
            if (th != null) {
                completableFuture.completeExceptionally(th.getCause());
                return null;
            }
            completableFuture.complete(putObjectResponse);
            return null;
        });
        return completableFuture;
    }

    private boolean usesKms() {
        return this.sseCustomerEncryptionKey != null && this.sseAlgorithm.equals("aws:kms");
    }

    private List<DeleteObjectsRequest.KeyVersion> keysForSegment(ObjectMetadata objectMetadata) {
        if (objectMetadata.isCombinedObject(this.prefix)) {
            return Collections.singletonList(new DeleteObjectsRequest.KeyVersion(keyPath(objectMetadata, ObjectType.SEGMENT_WITH_METADATA)));
        }
        ArrayList arrayList = new ArrayList();
        for (ObjectType objectType : TierObjectStore.getObjectTypesPerSegment()) {
            switch (objectType) {
                case TRANSACTION_INDEX:
                    if (objectMetadata.hasAbortedTxns()) {
                        arrayList.add(new DeleteObjectsRequest.KeyVersion(keyPath(objectMetadata, objectType)));
                        break;
                    } else {
                        break;
                    }
                case EPOCH_STATE:
                    if (objectMetadata.hasEpochState()) {
                        arrayList.add(new DeleteObjectsRequest.KeyVersion(keyPath(objectMetadata, objectType)));
                        break;
                    } else {
                        break;
                    }
                case PRODUCER_STATE:
                    if (objectMetadata.hasProducerState()) {
                        arrayList.add(new DeleteObjectsRequest.KeyVersion(keyPath(objectMetadata, objectType)));
                        break;
                    } else {
                        break;
                    }
                default:
                    arrayList.add(new DeleteObjectsRequest.KeyVersion(keyPath(objectMetadata, objectType)));
                    break;
            }
        }
        return arrayList;
    }

    private List<ObjectIdentifier> keysForSegmentV2(ObjectMetadata objectMetadata) {
        if (objectMetadata.isCombinedObject(this.prefix)) {
            return Collections.singletonList(ObjectIdentifier.builder().key(keyPath(objectMetadata, ObjectType.SEGMENT_WITH_METADATA)).build());
        }
        ArrayList arrayList = new ArrayList();
        for (ObjectType objectType : TierObjectStore.getObjectTypesPerSegment()) {
            switch (objectType) {
                case TRANSACTION_INDEX:
                    if (objectMetadata.hasAbortedTxns()) {
                        arrayList.add(ObjectIdentifier.builder().key(keyPath(objectMetadata, objectType)).build());
                        break;
                    } else {
                        break;
                    }
                case EPOCH_STATE:
                    if (objectMetadata.hasEpochState()) {
                        arrayList.add(ObjectIdentifier.builder().key(keyPath(objectMetadata, objectType)).build());
                        break;
                    } else {
                        break;
                    }
                case PRODUCER_STATE:
                    if (objectMetadata.hasProducerState()) {
                        arrayList.add(ObjectIdentifier.builder().key(keyPath(objectMetadata, objectType)).build());
                        break;
                    } else {
                        break;
                    }
                default:
                    arrayList.add(ObjectIdentifier.builder().key(keyPath(objectMetadata, objectType)).build());
                    break;
            }
        }
        return arrayList;
    }

    @Override // kafka.tier.store.TierObjectStore
    public BucketHealthResult checkBucketHealth() {
        if (this.v2Enabled) {
            return checkBucketHealthV2();
        }
        try {
            return (BucketHealthResult) checkExpiredCredentialsExceptionAndTryRefresh(() -> {
                ByteBuffer timeHealthPayload = TierObjectStoreUtils.timeHealthPayload();
                HealthMetadata healthMetadata = new HealthMetadata(this.clusterIdOpt, this.brokerIdOpt);
                String objectPath = healthMetadata.toFragmentLocation(this.prefix, FragmentType.HEALTH_CHECK).get().objectPath();
                putBuf(objectPath, healthMetadata.objectMetadata(this.clusterIdOpt, this.brokerIdOpt), timeHealthPayload);
                InputStream inputStream = getObjectStoreFragment(healthMetadata, FragmentType.HEALTH_CHECK).getInputStream();
                Throwable th = null;
                while (true) {
                    try {
                        try {
                            int read = inputStream.read();
                            if (read <= 0) {
                                break;
                            }
                            log.trace("Bucket probe read {} bytes", Integer.valueOf(read));
                        } finally {
                        }
                    } catch (Throwable th2) {
                        if (inputStream != null) {
                            if (th != null) {
                                try {
                                    inputStream.close();
                                } catch (Throwable th3) {
                                    th.addSuppressed(th3);
                                }
                            } else {
                                inputStream.close();
                            }
                        }
                        throw th2;
                    }
                }
                if (inputStream != null) {
                    if (0 != 0) {
                        try {
                            inputStream.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        inputStream.close();
                    }
                }
                this.client.deleteObject(this.bucket, objectPath);
                return BucketHealthResult.HEALTHY;
            });
        } catch (Exception e) {
            log.error("Bucket health checker returned unclassified error", e);
            return BucketHealthResult.UNCLASSIFIED;
        } catch (AmazonServiceException e2) {
            if (e2.getStatusCode() == 400 && e2.getErrorCode() != null && e2.getErrorCode().startsWith("KMS.")) {
                log.error("Bucket health checker resulted in a BYOK related error with error code: {}, status code: {}", new Object[]{e2.getErrorCode(), Integer.valueOf(e2.getStatusCode()), e2});
                return BucketHealthResult.BYOK;
            }
            if (e2.getStatusCode() == 403 && Objects.equals(e2.getErrorCode(), "AccessDenied")) {
                log.error("Bucket health checker resulted in a permission error for customer key: {}", usesKms() ? "not enabled" : this.sseCustomerEncryptionKey, e2);
                return usesKms() ? BucketHealthResult.BYOK : BucketHealthResult.PERMISSION;
            }
            log.error("Bucket health checker returned an unclassified error for status code: {} error code: {}", new Object[]{Integer.valueOf(e2.getStatusCode()), e2.getErrorCode(), e2});
            return BucketHealthResult.UNCLASSIFIED;
        }
    }

    public BucketHealthResult checkBucketHealthV2() {
        AwsServiceException awsServiceException;
        try {
            checkAsyncClientPresent("checkBucketHealth");
            ByteBuffer timeHealthPayload = TierObjectStoreUtils.timeHealthPayload();
            HealthMetadata healthMetadata = new HealthMetadata(this.clusterIdOpt, this.brokerIdOpt);
            String objectPath = healthMetadata.toFragmentLocation(this.prefix, FragmentType.HEALTH_CHECK).get().objectPath();
            putBufAsync(objectPath, healthMetadata.objectMetadata(this.clusterIdOpt, this.brokerIdOpt), timeHealthPayload).get();
            InputStream inputStream = getObjectStoreFragmentAsync(healthMetadata, FragmentType.HEALTH_CHECK).get().getInputStream();
            Throwable th = null;
            while (true) {
                try {
                    try {
                        int read = inputStream.read();
                        if (read <= 0) {
                            break;
                        }
                        log.trace("Bucket probe read {} bytes", Integer.valueOf(read));
                    } finally {
                    }
                } finally {
                }
            }
            if (inputStream != null) {
                if (0 != 0) {
                    try {
                        inputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    inputStream.close();
                }
            }
            this.asyncClientOpt.get().deleteObjects((software.amazon.awssdk.services.s3.model.DeleteObjectsRequest) software.amazon.awssdk.services.s3.model.DeleteObjectsRequest.builder().bucket(this.bucket).delete((Delete) Delete.builder().objects(new ObjectIdentifier[]{(ObjectIdentifier) ObjectIdentifier.builder().key(objectPath).build()}).build()).build()).get();
            return BucketHealthResult.HEALTHY;
        } catch (Exception th3) {
            if (th3 instanceof CompletionException) {
                awsServiceException = th3.getCause();
            }
            AwsServiceException awsServiceException2 = awsServiceException;
            if (!(awsServiceException2 instanceof AwsServiceException)) {
                log.error("Bucket health checker returned unclassified error", awsServiceException2);
                return BucketHealthResult.UNCLASSIFIED;
            }
            AwsServiceException awsServiceException3 = awsServiceException2;
            if (awsServiceException3.statusCode() == 400 && awsServiceException3.awsErrorDetails().errorCode() != null && awsServiceException3.awsErrorDetails().errorCode().startsWith("KMS.")) {
                log.error("Bucket health checker resulted in a BYOK related error with error code: {}, status code: {}", new Object[]{awsServiceException3.awsErrorDetails().errorCode(), Integer.valueOf(awsServiceException3.statusCode()), awsServiceException3});
                return BucketHealthResult.BYOK;
            }
            if (awsServiceException3.statusCode() == 403 && Objects.equals(awsServiceException3.awsErrorDetails().errorCode(), "AccessDenied")) {
                log.error("Bucket health checker resulted in a permission error for customer key: {}", usesKms() ? "not enabled" : this.sseCustomerEncryptionKey, awsServiceException3);
                return usesKms() ? BucketHealthResult.BYOK : BucketHealthResult.PERMISSION;
            }
            log.error("Bucket health checker returned an unclassified error for status code: {} error code: {}", new Object[]{Integer.valueOf(awsServiceException3.statusCode()), awsServiceException3.awsErrorDetails().errorCode(), awsServiceException3});
            return BucketHealthResult.UNCLASSIFIED;
        }
    }

    public static String validateAndGetS3RegionName(String str) {
        try {
            return Regions.fromName(str).getName();
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Configured " + KafkaConfig.TierS3RegionProp() + " '" + str + "' is not known");
        }
    }

    public static AmazonS3ClientAndCredentialsProvider client(S3TierObjectStoreConfig s3TierObjectStoreConfig) throws TierObjectStoreFatalException {
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setUserAgentPrefix(s3TierObjectStoreConfig.s3UserAgentPrefix);
        SSLConnectionSocketFactory sSLConnectionSocketFactory = getSSLConnectionSocketFactory(s3TierObjectStoreConfig);
        if (sSLConnectionSocketFactory != null) {
            clientConfiguration.getApacheHttpClientConfig().setSslSocketFactory(sSLConnectionSocketFactory);
        }
        AmazonS3ClientBuilder standard = AmazonS3ClientBuilder.standard();
        standard.setClientConfiguration(clientConfiguration);
        if (s3TierObjectStoreConfig.s3ForcePathStyleAccess.booleanValue()) {
            standard.setPathStyleAccessEnabled(true);
        }
        if (s3TierObjectStoreConfig.s3SignerOverride.isPresent() && !s3TierObjectStoreConfig.s3SignerOverride.get().isEmpty()) {
            clientConfiguration.setSignerOverride(s3TierObjectStoreConfig.s3SignerOverride.get());
        }
        if (s3TierObjectStoreConfig.s3EndpointOverride.isPresent() && !s3TierObjectStoreConfig.s3EndpointOverride.get().isEmpty()) {
            standard.setEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(s3TierObjectStoreConfig.s3EndpointOverride.get(), validateAndGetS3RegionName(s3TierObjectStoreConfig.s3Region)));
        } else if (s3TierObjectStoreConfig.s3Region != null && !s3TierObjectStoreConfig.s3Region.isEmpty()) {
            standard.setRegion(s3TierObjectStoreConfig.s3Region);
        }
        log.debug("AWS_METADATA_SERVICE_TIMEOUT is {} seconds", System.getenv("AWS_METADATA_SERVICE_TIMEOUT"));
        AWSCredentialsProvider aWSCredentialsProvider = (AWSCredentialsProvider) s3TierObjectStoreConfig.s3CredFilePath.map(PropertiesFileCredentialsProvider::new).orElse(new DefaultAWSCredentialsProviderChain());
        if (s3TierObjectStoreConfig.assumeRoleArn.isPresent()) {
            AWSSecurityTokenServiceClientBuilder builder = AWSSecurityTokenServiceClient.builder();
            builder.setCredentials(aWSCredentialsProvider);
            if (s3TierObjectStoreConfig.s3EndpointOverride.isPresent() && !s3TierObjectStoreConfig.s3EndpointOverride.get().isEmpty()) {
                builder.setRegion(validateAndGetS3RegionName(s3TierObjectStoreConfig.s3Region));
            } else if (s3TierObjectStoreConfig.s3Region != null && !s3TierObjectStoreConfig.s3Region.isEmpty()) {
                builder.setRegion(s3TierObjectStoreConfig.s3Region);
            }
            aWSCredentialsProvider = new STSAssumeRoleSessionCredentialsProvider.Builder(s3TierObjectStoreConfig.assumeRoleArn.get(), "tiered-storage").withStsClient((AWSSecurityTokenService) builder.build()).build();
        }
        standard.setCredentials(aWSCredentialsProvider);
        return new AmazonS3ClientAndCredentialsProvider((AmazonS3) standard.build(), buildAsyncClient(s3TierObjectStoreConfig, sSLConnectionSocketFactory), aWSCredentialsProvider);
    }

    private static Optional<S3AsyncClient> buildAsyncClient(S3TierObjectStoreConfig s3TierObjectStoreConfig, SSLConnectionSocketFactory sSLConnectionSocketFactory) {
        if (sSLConnectionSocketFactory != null) {
            log.error("Skip building S3 async client: sslConnectionSocketFactory is specified");
            return Optional.empty();
        }
        if (s3TierObjectStoreConfig.s3SignerOverride.isPresent() && !s3TierObjectStoreConfig.s3SignerOverride.get().isEmpty()) {
            log.error("Skip building S3 async client: s3SignerOverride is specified");
            return Optional.empty();
        }
        if (s3TierObjectStoreConfig.s3CredFilePath.isPresent() && !s3TierObjectStoreConfig.s3CredFilePath.get().isEmpty()) {
            log.error("Skip building S3 async client: s3CredFilePath is specified");
            return Optional.empty();
        }
        ClientOverrideConfiguration.Builder builder = ClientOverrideConfiguration.builder();
        builder.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, s3TierObjectStoreConfig.s3UserAgentPrefix);
        S3AsyncClientBuilder builder2 = S3AsyncClient.builder();
        builder2.overrideConfiguration((ClientOverrideConfiguration) builder.build());
        builder2.forcePathStyle(s3TierObjectStoreConfig.s3ForcePathStyleAccess);
        if (s3TierObjectStoreConfig.s3EndpointOverride.isPresent() && !s3TierObjectStoreConfig.s3EndpointOverride.get().isEmpty()) {
            String validateAndGetS3RegionName = validateAndGetS3RegionName(s3TierObjectStoreConfig.s3Region);
            builder2.endpointOverride(URI.create(s3TierObjectStoreConfig.s3EndpointOverride.get()));
            builder2.region(Region.of(validateAndGetS3RegionName));
        } else if (s3TierObjectStoreConfig.s3Region != null && !s3TierObjectStoreConfig.s3Region.isEmpty()) {
            builder2.region(Region.of(s3TierObjectStoreConfig.s3Region));
        }
        log.debug("AWS_METADATA_SERVICE_TIMEOUT is {} seconds", System.getenv("AWS_METADATA_SERVICE_TIMEOUT"));
        AwsCredentialsProvider build = DefaultCredentialsProvider.builder().build();
        if (s3TierObjectStoreConfig.assumeRoleArn.isPresent() && !s3TierObjectStoreConfig.assumeRoleArn.get().isEmpty()) {
            build = StsAssumeRoleCredentialsProvider.builder().stsClient((StsClient) StsClient.builder().credentialsProvider(build).build()).refreshRequest((AssumeRoleRequest) AssumeRoleRequest.builder().roleArn(s3TierObjectStoreConfig.assumeRoleArn.get()).roleSessionName("tiered-storage").build()).build();
        }
        builder2.credentialsProvider(build);
        return Optional.of(builder2.build());
    }

    private void expectBucket(String str, String str2, Optional<String> optional) throws TierObjectStoreFatalException {
        try {
            String bucketLocation = this.client.getBucketLocation(str);
            if (bucketLocation.equals("US") && str2.equals("us-east-1")) {
                return;
            }
            if (!str2.equals(bucketLocation)) {
                log.warn("Bucket region {} does not match expected region {}", bucketLocation, str2);
            }
        } catch (AmazonClientException e) {
            if (!optional.isPresent() || optional.get().isEmpty()) {
                throw new TierObjectStoreFatalException("Failed to validate that bucket location for " + str + " matches location " + str2 + "; unable to call GetBucketLocation", e);
            }
            log.warn("On-prem store does not implement S3 API's GetBucketLocation. Skipping check which ensures that actual bucket region matches expected region.");
        }
    }

    private static SSLConnectionSocketFactory getSSLConnectionSocketFactory(S3TierObjectStoreConfig s3TierObjectStoreConfig) throws TierObjectStoreFatalException {
        if (s3TierObjectStoreConfig.s3SecurityProviders.isPresent() && !s3TierObjectStoreConfig.s3SecurityProviders.get().trim().isEmpty()) {
            HashMap hashMap = new HashMap();
            hashMap.put("security.providers", s3TierObjectStoreConfig.s3SecurityProviders.get());
            SecurityUtils.addConfiguredSecurityProviders(hashMap);
        }
        SSLConnectionSocketFactory sSLConnectionSocketFactory = null;
        boolean z = s3TierObjectStoreConfig.s3SslTrustStoreLocation.isPresent() && !s3TierObjectStoreConfig.s3SslTrustStoreLocation.get().isEmpty();
        boolean z2 = s3TierObjectStoreConfig.s3SslKeyStoreLocation.isPresent() && !s3TierObjectStoreConfig.s3SslKeyStoreLocation.get().isEmpty();
        if (z || z2) {
            try {
                SSLContextBuilder custom = SSLContexts.custom();
                if (z) {
                    KeyStore keyStore = KeyStore.getInstance(s3TierObjectStoreConfig.s3SslTrustStoreType.get());
                    keyStore.load(new FileInputStream(s3TierObjectStoreConfig.s3SslTrustStoreLocation.get()), s3TierObjectStoreConfig.s3SslTrustStorePassword.get().value().toCharArray());
                    custom.loadTrustMaterial(keyStore, (TrustStrategy) null);
                }
                if (z2) {
                    KeyStore keyStore2 = KeyStore.getInstance(s3TierObjectStoreConfig.s3SslKeyStoreType.get());
                    keyStore2.load(new FileInputStream(s3TierObjectStoreConfig.s3SslKeyStoreLocation.get()), s3TierObjectStoreConfig.s3SslKeyStorePassword.get().value().toCharArray());
                    custom.loadKeyMaterial(keyStore2, s3TierObjectStoreConfig.s3SslKeyPassword.get().value().toCharArray(), (map, socket) -> {
                        return "confluent.kafka";
                    });
                }
                custom.setProtocol(s3TierObjectStoreConfig.s3SslProtocol);
                if (s3TierObjectStoreConfig.s3SslProvider.isPresent() && !s3TierObjectStoreConfig.s3SslProvider.get().trim().isEmpty()) {
                    Optional<String> optional = s3TierObjectStoreConfig.s3SslProvider;
                    custom.getClass();
                    optional.ifPresent(custom::setProvider);
                }
                sSLConnectionSocketFactory = new SSLConnectionSocketFactory(custom.build(), (String[]) s3TierObjectStoreConfig.s3SslEnabledProtocols.toArray(new String[0]), (String[]) null, new DefaultHostnameVerifier());
            } catch (Exception e) {
                throw new TierObjectStoreFatalException("Failed to load keystore or trust store for tiered object store", e);
            }
        }
        return sSLConnectionSocketFactory;
    }

    private void handleAmazonClientException(AmazonClientException amazonClientException, ObjectStoreMetadata objectStoreMetadata, ObjectType objectType, File file) {
        if (objectType != ObjectType.SEGMENT_WITH_METADATA && (amazonClientException instanceof AmazonS3Exception) && ERROR_CODE_BAD_DIGEST.equals(((AmazonS3Exception) amazonClientException).getErrorCode())) {
            validateChecksumOnPutSegmentFailure(objectStoreMetadata, objectType, file, amazonClientException);
        }
        throw new TierObjectStoreRetriableException("Failed to upload segment: " + objectStoreMetadata + " due to file: " + file, amazonClientException);
    }

    private void handleS3ExceptionDuringSegmentUpload(S3Exception s3Exception, ObjectStoreMetadata objectStoreMetadata, ObjectType objectType, File file) {
        if (objectType != ObjectType.SEGMENT_WITH_METADATA) {
            validateChecksumOnPutSegmentFailure(objectStoreMetadata, objectType, file, s3Exception);
        }
        throw new TierObjectStoreRetriableException("Failed to upload segment: " + objectStoreMetadata + " due to file: " + file, s3Exception);
    }

    private void validateChecksumOnPutSegmentFailure(ObjectStoreMetadata objectStoreMetadata, ObjectType objectType, File file, Throwable th) {
        Map<String, String> objectMetadata = objectStoreMetadata.objectMetadata(this.clusterIdOpt, this.brokerIdOpt);
        String absolutePath = file.getAbsolutePath();
        Optional empty = Optional.empty();
        String keyPath = keyPath(objectStoreMetadata, objectType);
        if (this.checksumStoreOpt.isPresent() && this.checksumStoreOpt.get().checksumProtectionEnabled(objectType.toE2EChecksumProtectedObjectType())) {
            empty = E2EChecksumUtils.getBase64CrcFromStore(this.checksumStoreOpt.get(), file, objectMetadata);
        }
        if (empty.isPresent()) {
            Optional compute32BitBase64Crc32c = E2EChecksumUtils.compute32BitBase64Crc32c(file);
            if (!empty.equals(compute32BitBase64Crc32c)) {
                throw new E2EChecksumInvalidException("Failed to upload object due to an on-disk corruption of file: " + absolutePath, th);
            }
            log.info("Network Error: On-network corruption of a file during upload: {} with expected CRC value: {} and recalculated CRC value: {}", new Object[]{absolutePath, empty.get(), compute32BitBase64Crc32c.get()});
            throw new TierObjectStoreRetriableException(String.format("Failed to upload object, due to on-network corruption, to %s with metadata %s, file %s, type %s", keyPath, objectMetadata, file, objectType), th);
        }
    }
}
