package kafka.tier.store;

import com.azure.core.util.polling.LongRunningOperationStatus;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.models.AccessTier;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobHttpHeaders;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobListDetails;
import com.azure.storage.blob.models.BlobRange;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.ListBlobsOptions;
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.blob.options.BlobBeginCopyOptions;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import kafka.tier.exceptions.TierObjectStoreFatalException;
import kafka.tier.exceptions.TierObjectStoreRetriableException;
import kafka.tier.store.AzureBlockBlobTierObjectStoreConfig;
import kafka.tier.store.TierObjectStore;
import kafka.utils.CoreUtils;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/tier/store/AzureBlockBlobTierObjectStore.class */
public class AzureBlockBlobTierObjectStore implements TierObjectStore {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) AzureBlockBlobTierObjectStore.class);
    private final BlobServiceClient blobServiceClient;
    private final BlobContainerClient blobContainerClient;
    private final Optional<String> clusterIdOpt;
    private final Optional<Integer> brokerIdOpt;
    private final String container;
    private final String prefix;
    private final int drainThreshold;

    /* loaded from: input_file:kafka/tier/store/AzureBlockBlobTierObjectStore$AzureBlockBlobTierObjectStoreResponse.class */
    private static class AzureBlockBlobTierObjectStoreResponse implements TierObjectStoreResponse {
        private final InputStream inputStream;

        AzureBlockBlobTierObjectStoreResponse(InputStream inputStream, int i, long j) {
            this.inputStream = new AutoAbortingGenericInputStream(inputStream, i, j);
        }

        @Override // kafka.tier.store.TierObjectStoreResponse, java.lang.AutoCloseable
        public void close() throws IOException {
            this.inputStream.close();
        }

        @Override // kafka.tier.store.TierObjectStoreResponse
        public InputStream getInputStream() {
            return this.inputStream;
        }
    }

    public AzureBlockBlobTierObjectStore(AzureBlockBlobTierObjectStoreConfig azureBlockBlobTierObjectStoreConfig) {
        this.clusterIdOpt = azureBlockBlobTierObjectStoreConfig.clusterIdOpt;
        this.brokerIdOpt = azureBlockBlobTierObjectStoreConfig.brokerIdOpt;
        this.container = azureBlockBlobTierObjectStoreConfig.container;
        this.prefix = azureBlockBlobTierObjectStoreConfig.azureBlobPrefix;
        this.drainThreshold = azureBlockBlobTierObjectStoreConfig.drainThreshold;
        this.blobServiceClient = createServiceClient(azureBlockBlobTierObjectStoreConfig);
        this.blobContainerClient = createContainerClient(this.blobServiceClient, azureBlockBlobTierObjectStoreConfig);
    }

    @Override // kafka.tier.store.TierObjectStore
    public TierObjectStore.Backend getBackend() {
        return TierObjectStore.Backend.AzureBlockBlob;
    }

    @Override // kafka.tier.store.TierObjectStore
    public Map<String, List<VersionInformation>> listObject(String str, boolean z) {
        HashMap hashMap = new HashMap();
        try {
            int i = 0;
            Iterator<BlobItem> it = this.blobContainerClient.listBlobs(new ListBlobsOptions().setPrefix(str).setDetails(new BlobListDetails().setRetrieveVersions(z)), null).iterator();
            while (it.hasNext()) {
                BlobItem next = it.next();
                i++;
                hashMap.putIfAbsent(next.getName(), new ArrayList());
                if (z) {
                    ((List) hashMap.get(next.getName())).add(new VersionInformation(next.getVersionId()));
                }
                log.debug("Azure listObjects: " + next.getName() + " VersionId: " + hashMap.get(next.getName()));
            }
            log.debug("Azure listObjects versions: " + z + " for keyPrefix: " + str + " # of blobs returned: " + i);
            return hashMap;
        } catch (BlobStorageException | UncheckedIOException e) {
            throw new TierObjectStoreRetriableException(String.format("Failed to list objects with keyPrefix: %s, getVersionInfo: %b", str, Boolean.valueOf(z)), e);
        } catch (Exception e2) {
            throw new TierObjectStoreFatalException(String.format("Failed to list objects with keyPrefix: %s, getVersionInfo: %b", str, Boolean.valueOf(z)), e2);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public TierObjectStoreResponse getObject(TierObjectStore.ObjectStoreMetadata objectStoreMetadata, TierObjectStore.FileType fileType, Integer num, Integer num2) {
        String keyPath = keyPath(objectStoreMetadata, fileType);
        BlobClient blobClient = this.blobContainerClient.getBlobClient(keyPath);
        if (num != null && num2 != null && num.intValue() > num2.intValue()) {
            throw new IllegalStateException("Invalid range of byteOffsetStart and byteOffsetEnd");
        }
        if (num == null && num2 != null) {
            throw new IllegalStateException("Cannot specify a byteOffsetEnd without specifying a byteOffsetStart");
        }
        log.debug("Fetching object from {}/{}, with range of {} to {}", this.container, keyPath, num, num2);
        long longValue = num == null ? 0L : num.longValue();
        return new AzureBlockBlobTierObjectStoreResponse(blobClient.getBlockBlobClient().openInputStream(num2 != null ? new BlobRange(longValue, Long.valueOf(num2.longValue() - longValue)) : new BlobRange(longValue), new BlobRequestConditions()), this.drainThreshold, num2 == null ? Long.MAX_VALUE : num2.longValue() - longValue);
    }

    @Override // kafka.tier.store.TierObjectStore
    public TierObjectStore.OpaqueData prepPutSegment() throws TierObjectStoreRetriableException, IOException {
        return TierObjectStore.OpaqueData.ZEROED;
    }

    @Override // kafka.tier.store.TierObjectStore
    public void putSegment(TierObjectStore.ObjectMetadata objectMetadata, File file, File file2, File file3, Optional<File> optional, Optional<ByteBuffer> optional2, Optional<ByteBuffer> optional3) {
        Map<String, String> objectMetadata2 = objectMetadata.objectMetadata(this.clusterIdOpt, this.brokerIdOpt);
        try {
            putFile(keyPath(objectMetadata, TierObjectStore.FileType.SEGMENT), objectMetadata2, file);
            putFile(keyPath(objectMetadata, TierObjectStore.FileType.OFFSET_INDEX), objectMetadata2, file2);
            putFile(keyPath(objectMetadata, TierObjectStore.FileType.TIMESTAMP_INDEX), objectMetadata2, file3);
            optional.ifPresent(file4 -> {
                putFile(keyPath(objectMetadata, TierObjectStore.FileType.PRODUCER_STATE), objectMetadata2, file4);
            });
            optional2.ifPresent(byteBuffer -> {
                putBuf(keyPath(objectMetadata, TierObjectStore.FileType.TRANSACTION_INDEX), objectMetadata2, byteBuffer);
            });
            optional3.ifPresent(byteBuffer2 -> {
                putBuf(keyPath(objectMetadata, TierObjectStore.FileType.EPOCH_STATE), objectMetadata2, byteBuffer2);
            });
        } catch (BlobStorageException | UncheckedIOException e) {
            throw new TierObjectStoreRetriableException("Failed to upload segment " + objectMetadata, e);
        } catch (Exception e2) {
            throw new TierObjectStoreFatalException("Unknown exception when uploading segment: " + objectMetadata, e2);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public void putInMemorySegment(TierObjectStore.ObjectMetadata objectMetadata, File file, File file2, File file3, Optional<ByteBuffer> optional, Optional<ByteBuffer> optional2, Optional<ByteBuffer> optional3) {
        Map<String, String> objectMetadata2 = objectMetadata.objectMetadata(this.clusterIdOpt, this.brokerIdOpt);
        try {
            putFile(keyPath(objectMetadata, TierObjectStore.FileType.SEGMENT), objectMetadata2, file);
            putFile(keyPath(objectMetadata, TierObjectStore.FileType.OFFSET_INDEX), objectMetadata2, file2);
            putFile(keyPath(objectMetadata, TierObjectStore.FileType.TIMESTAMP_INDEX), objectMetadata2, file3);
            optional.ifPresent(byteBuffer -> {
                putBuf(keyPath(objectMetadata, TierObjectStore.FileType.PRODUCER_STATE), objectMetadata2, byteBuffer);
            });
            optional2.ifPresent(byteBuffer2 -> {
                putBuf(keyPath(objectMetadata, TierObjectStore.FileType.TRANSACTION_INDEX), objectMetadata2, byteBuffer2);
            });
            optional3.ifPresent(byteBuffer3 -> {
                putBuf(keyPath(objectMetadata, TierObjectStore.FileType.EPOCH_STATE), objectMetadata2, byteBuffer3);
            });
        } catch (UncheckedIOException e) {
            throw new TierObjectStoreRetriableException("Failed to upload segmtent " + objectMetadata, e);
        } catch (Exception e2) {
            throw new TierObjectStoreFatalException("Unknown exception when uploading segment: " + objectMetadata, e2);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public void putObject(TierObjectStore.ObjectStoreMetadata objectStoreMetadata, File file, TierObjectStore.FileType fileType) {
        try {
            putFile(keyPath(objectStoreMetadata, fileType), objectStoreMetadata.objectMetadata(this.clusterIdOpt, this.brokerIdOpt), file);
        } catch (UncheckedIOException e) {
            throw new TierObjectStoreRetriableException(String.format("Failed to upload object %s, file %s, type %s", objectStoreMetadata, file, fileType), e);
        } catch (Exception e2) {
            throw new TierObjectStoreFatalException(String.format("Failed to upload object %s, file %s, type %s", objectStoreMetadata, file, fileType), e2);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public void restoreObjectByCopy(TierObjectStore.ObjectMetadata objectMetadata, String str, VersionInformation versionInformation) {
        String versionId = versionInformation.getVersionId();
        Map<String, String> objectMetadata2 = objectMetadata.objectMetadata(this.clusterIdOpt, this.brokerIdOpt);
        try {
            String blobUrl = this.blobContainerClient.getBlobClient(str).getVersionClient(versionId).getBlobUrl();
            log.debug(String.format("Azure restore key: %s lastLiveVersionId: %s copySource: %s", str, versionId, blobUrl));
            log.info(String.format("Azure restore key: %s response status: %s", str, this.blobContainerClient.getBlobClient(str).beginCopy(new BlobBeginCopyOptions(blobUrl).setMetadata(objectMetadata2).setTier(AccessTier.HOT)).waitUntil(LongRunningOperationStatus.SUCCESSFULLY_COMPLETED).getStatus()));
        } catch (BlobStorageException e) {
            if (!e.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
                throw new TierObjectStoreRetriableException(String.format("Failed to restore object %s (version: %s)", str, versionId), e);
            }
            throw new TierObjectStoreFatalException(String.format("Failed to restore object %s (version: %s) as blob not found", str, versionId), e);
        } catch (UncheckedIOException e2) {
            throw new TierObjectStoreRetriableException(String.format("Failed to restore object %s (version: %s)", str, versionId), e2);
        } catch (Exception e3) {
            throw new TierObjectStoreFatalException(String.format("Unknown exception when restoring object %s (version: %s)", str, versionId), e3);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public void deleteSegment(TierObjectStore.ObjectMetadata objectMetadata) {
        for (TierObjectStore.FileType fileType : TierObjectStore.FileType.values()) {
            String keyPath = keyPath(objectMetadata, fileType);
            try {
                BlobClient blobClient = this.blobContainerClient.getBlobClient(keyPath);
                log.debug("Deleting " + keyPath);
                blobClient.delete();
            } catch (BlobStorageException e) {
                if (!e.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
                    throw new TierObjectStoreRetriableException("Failed to delete file " + keyPath, e);
                }
            } catch (Exception e2) {
                throw new TierObjectStoreFatalException("Unknown exception when deleting segment " + objectMetadata, e2);
            }
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public void deleteVersions(List<TierObjectStore.KeyAndVersion> list) {
    }

    @Override // kafka.tier.store.TierObjectStore
    public boolean objectExists(TierObjectStore.ObjectMetadata objectMetadata, TierObjectStore.FileType fileType) throws IOException, TierObjectStoreRetriableException {
        String keyPath = keyPath(objectMetadata, fileType);
        try {
            return this.blobContainerClient.getBlobClient(keyPath).exists().booleanValue();
        } catch (Exception e) {
            throw new TierObjectStoreRetriableException("Failed to check for object existence with metadata " + objectMetadata + " @ " + keyPath);
        }
    }

    @Override // kafka.tier.store.TierObjectStore
    public void close() {
    }

    private void putFile(String str, Map<String, String> map, File file) {
        this.blobContainerClient.getBlobClient(str).uploadFromFile(file.getPath(), new ParallelTransferOptions(null, null, null), new BlobHttpHeaders(), map, AccessTier.HOT, new BlobRequestConditions(), null);
    }

    @Override // kafka.tier.store.TierObjectStore
    public void putBuf(String str, Map<String, String> map, ByteBuffer byteBuffer) {
        BlobClient blobClient = this.blobContainerClient.getBlobClient(str);
        byte[] md5hash = CoreUtils.md5hash(byteBuffer);
        blobClient.getBlockBlobClient().uploadWithResponse(new BufferedInputStream(new ByteBufferInputStream(byteBuffer.duplicate())), byteBuffer.limit() - byteBuffer.position(), new BlobHttpHeaders().setContentMd5(md5hash), map, AccessTier.HOT, md5hash, new BlobRequestConditions(), null, null);
    }

    private static BlobServiceClient createServiceClient(AzureBlockBlobTierObjectStoreConfig azureBlockBlobTierObjectStoreConfig) {
        BlobServiceClient buildClient;
        if (azureBlockBlobTierObjectStoreConfig.azureCredentialsConfig.isPresent()) {
            AzureBlockBlobTierObjectStoreConfig.AzureCredentialsConfig azureCredentialsConfig = azureBlockBlobTierObjectStoreConfig.azureCredentialsConfig.get();
            if (azureCredentialsConfig.connectionStringAuthMethod().booleanValue()) {
                buildClient = new BlobServiceClientBuilder().connectionString(azureCredentialsConfig.connectionString()).buildClient();
            } else {
                buildClient = new BlobServiceClientBuilder().endpoint(azureBlockBlobTierObjectStoreConfig.endpoint.get()).credential(new ClientSecretCredentialBuilder().clientId(azureCredentialsConfig.azureClientId()).tenantId(azureCredentialsConfig.azureTenantId()).clientSecret(azureCredentialsConfig.azureClientSecret()).build()).buildClient();
            }
        } else {
            buildClient = new BlobServiceClientBuilder().endpoint(azureBlockBlobTierObjectStoreConfig.endpoint.get()).credential(new DefaultAzureCredentialBuilder().build()).buildClient();
        }
        return buildClient;
    }

    private static BlobContainerClient createContainerClient(BlobServiceClient blobServiceClient, AzureBlockBlobTierObjectStoreConfig azureBlockBlobTierObjectStoreConfig) {
        BlobContainerClient blobContainerClient = blobServiceClient.getBlobContainerClient(azureBlockBlobTierObjectStoreConfig.container);
        if (blobContainerClient.exists()) {
            return blobContainerClient;
        }
        throw new TierObjectStoreFatalException("Container " + azureBlockBlobTierObjectStoreConfig.container + " does not exist or could not be found");
    }

    private String keyPath(TierObjectStore.ObjectStoreMetadata objectStoreMetadata, TierObjectStore.FileType fileType) {
        return objectStoreMetadata.toPath(this.prefix, fileType);
    }
}
