package kafka.restore.schedulers;

import java.io.File;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import kafka.restore.RestoreMetricsManager;
import kafka.restore.messages.CopyObjectInStoreRequest;
import kafka.restore.messages.CopyObjectInStoreResponse;
import kafka.restore.messages.ListObjectsInStoreRequest;
import kafka.restore.messages.ListObjectsInStoreResponse;
import kafka.restore.messages.MessageResult;
import kafka.restore.messages.MessageStatusCode;
import kafka.restore.messages.ObjectStoreRequest;
import kafka.restore.messages.RestoreObjectsInStoreRequest;
import kafka.restore.messages.UploadFtpsToStoreRequest;
import kafka.restore.messages.UploadFtpsToStoreResponse;
import kafka.restore.operators.OperatorUtil;
import kafka.tier.TopicIdPartition;
import kafka.tier.state.ChecksumUtils;
import kafka.tier.state.FileTierPartitionState;
import kafka.tier.state.Header;
import kafka.tier.store.TierObjectStore;
import kafka.utils.checksum.Algorithm;
import kafka.utils.checksum.CheckedFileIO;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/restore/schedulers/ObjectStorePoolImpl.class */
public class ObjectStorePoolImpl implements ObjectStorePool {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) ObjectStorePoolImpl.class);
    private final AsyncServiceSchedulerResultsReceiver resultsReceiver;
    private ThreadPoolExecutor threadPool;
    private final TierObjectStore tierObjectStore;
    private CompletableFutureRetryer retries;
    private final RestoreMetricsManager metrics;
    private final Time time;
    private Duration objectStoreRetryWaitInMs = Constants.DEFAULT_OBJECT_STORE_REQUEST_WAIT_BETWEEN_IN_MS;
    private volatile ObjectPoolState state = ObjectPoolState.OFF;

    /* loaded from: input_file:kafka/restore/schedulers/ObjectStorePoolImpl$ObjectPoolState.class */
    private enum ObjectPoolState {
        RUNNING,
        OFF
    }

    public ObjectStorePoolImpl(AsyncServiceSchedulerResultsReceiver asyncServiceSchedulerResultsReceiver, ThreadPoolExecutor threadPoolExecutor, TierObjectStore tierObjectStore, RestoreMetricsManager restoreMetricsManager, Time time) {
        this.resultsReceiver = asyncServiceSchedulerResultsReceiver;
        this.threadPool = threadPoolExecutor;
        this.tierObjectStore = tierObjectStore;
        this.metrics = restoreMetricsManager;
        this.time = time;
    }

    @Override // kafka.restore.schedulers.ObjectStorePool
    public synchronized void startUp() {
        if (this.state == ObjectPoolState.RUNNING) {
            return;
        }
        this.state = ObjectPoolState.RUNNING;
        this.retries = new CompletableFutureRetryer(this.threadPool, this.objectStoreRetryWaitInMs, this.time);
    }

    @Override // kafka.restore.schedulers.ObjectStorePool
    public synchronized void shutdown() {
        if (this.state == ObjectPoolState.OFF) {
            return;
        }
        if (this.threadPool != null) {
            this.threadPool.shutdown();
        }
        if (this.tierObjectStore != null) {
            this.tierObjectStore.close();
        }
        this.state = ObjectPoolState.OFF;
    }

    @Override // kafka.restore.schedulers.ObjectStorePool
    public void submitObjectStoreRequest(ObjectStoreRequest objectStoreRequest) {
        if (objectStoreRequest instanceof RestoreObjectsInStoreRequest) {
            submitRestoreObjectsInStoreRequest((RestoreObjectsInStoreRequest) objectStoreRequest);
        } else {
            if (!(objectStoreRequest instanceof UploadFtpsToStoreRequest)) {
                throw new UnsupportedOperationException("objectStoreRequest of type " + objectStoreRequest.getClass() + " is not of a recognizable request type.");
            }
            submitUploadFtpsToStoreRequest((UploadFtpsToStoreRequest) objectStoreRequest);
        }
    }

    protected void submitRestoreObjectsInStoreRequest(RestoreObjectsInStoreRequest restoreObjectsInStoreRequest) {
        LOGGER.info(String.format("[%s]: restoring %s segments", restoreObjectsInStoreRequest.getTopicPartition(), Integer.valueOf(restoreObjectsInStoreRequest.getSegmentStateAndPathMap().size())));
        this.threadPool.execute(() -> {
            RestoreObjectMultipleResponseHandler restoreObjectMultipleResponseHandler = new RestoreObjectMultipleResponseHandler(restoreObjectsInStoreRequest, this.resultsReceiver, this.metrics);
            restoreObjectsInStoreRequest.getSegmentStateAndPathMap().forEach((uuid, segmentStateAndPath) -> {
                Consumer<Long> consumer;
                Consumer<Long> consumer2;
                ListObjectsInStoreRequest listObjectsInStoreRequest = new ListObjectsInStoreRequest(0, restoreObjectsInStoreRequest.getTopic(), restoreObjectsInStoreRequest.getPartition(), uuid, segmentStateAndPath);
                ListVersionsOperator listVersionsOperator = new ListVersionsOperator(listObjectsInStoreRequest, restoreObjectMultipleResponseHandler, this.tierObjectStore, this.threadPool);
                CompletableFutureRetryer completableFutureRetryer = this.retries;
                listVersionsOperator.getClass();
                Supplier supplier = listVersionsOperator::listVersions;
                Predicate<Throwable> predicate = th -> {
                    return th.getClass().equals(RetryableException.class);
                };
                if (this.metrics != null) {
                    Sensor restoreListVersionsMs = this.metrics.restoreListVersionsMs();
                    restoreListVersionsMs.getClass();
                    consumer = (v1) -> {
                        r4.record(v1);
                    };
                } else {
                    consumer = null;
                }
                if (this.metrics != null) {
                    RestoreMetricsManager restoreMetricsManager = this.metrics;
                    restoreMetricsManager.getClass();
                    consumer2 = (v1) -> {
                        r5.recordRestoreListVersionsFailures(v1);
                    };
                } else {
                    consumer2 = null;
                }
                CompletableFuture withTimedRetries = completableFutureRetryer.withTimedRetries(supplier, predicate, 10, consumer, consumer2);
                withTimedRetries.exceptionally(th2 -> {
                    LOGGER.error(String.format("[%s]: listVersions call completed with Exception", restoreObjectsInStoreRequest.getTopicPartition()), th2);
                    restoreObjectMultipleResponseHandler.addReceivedResponse(new ListObjectsInStoreResponse(0, listObjectsInStoreRequest.getTopic(), listObjectsInStoreRequest.getPartition(), listObjectsInStoreRequest.getUuid(), MessageStatusCode.ILLEGAL_STATE_ERROR, MessageResult.FAILURE, uuid, null));
                    return null;
                });
                withTimedRetries.thenCompose(map -> {
                    return CompletableFuture.runAsync(() -> {
                        map.forEach((str, str2) -> {
                            Consumer<Long> consumer3;
                            Consumer<Long> consumer4;
                            LOGGER.debug(String.format("[%s]: handling: %s with version: %s", restoreObjectsInStoreRequest.getTopicPartition(), str, str2));
                            CopyObjectInStoreRequest copyObjectInStoreRequest = new CopyObjectInStoreRequest(0, restoreObjectsInStoreRequest.getTopic(), restoreObjectsInStoreRequest.getPartition(), uuid, str, str2, segmentStateAndPath);
                            CopyObjectOperator copyObjectOperator = new CopyObjectOperator(copyObjectInStoreRequest, restoreObjectMultipleResponseHandler, this.tierObjectStore, this.threadPool);
                            CompletableFutureRetryer completableFutureRetryer2 = this.retries;
                            copyObjectOperator.getClass();
                            Supplier supplier2 = copyObjectOperator::restoreObjectByCopy;
                            Predicate<Throwable> predicate2 = th3 -> {
                                return th3.getClass().equals(RetryableException.class);
                            };
                            if (this.metrics != null) {
                                Sensor restoreObjectCopyMs = this.metrics.restoreObjectCopyMs();
                                restoreObjectCopyMs.getClass();
                                consumer3 = (v1) -> {
                                    r4.record(v1);
                                };
                            } else {
                                consumer3 = null;
                            }
                            if (this.metrics != null) {
                                RestoreMetricsManager restoreMetricsManager2 = this.metrics;
                                restoreMetricsManager2.getClass();
                                consumer4 = (v1) -> {
                                    r5.recordRestoreObjectCopyFailures(v1);
                                };
                            } else {
                                consumer4 = null;
                            }
                            completableFutureRetryer2.withTimedRetries(supplier2, predicate2, 10, consumer3, consumer4).exceptionally(th4 -> {
                                LOGGER.error(String.format("[%s]: restoreObjectByCopy call completed with Exception", restoreObjectsInStoreRequest.getTopicPartition()), th4);
                                restoreObjectMultipleResponseHandler.addReceivedResponse(new CopyObjectInStoreResponse(0, copyObjectInStoreRequest.getTopic(), copyObjectInStoreRequest.getPartition(), copyObjectInStoreRequest.getUuid(), MessageStatusCode.INTERNAL_ERROR, MessageResult.FAILURE, uuid, copyObjectInStoreRequest.getObjectPath()));
                                return null;
                            });
                        });
                    }, this.threadPool);
                });
            });
        });
    }

    protected void submitUploadFtpsToStoreRequest(UploadFtpsToStoreRequest uploadFtpsToStoreRequest) {
        this.threadPool.execute(() -> {
            LOGGER.debug(String.format("[%s]: upload ftps file at: %s", uploadFtpsToStoreRequest.getTopicPartition(), uploadFtpsToStoreRequest.getFtpsFilePath()));
            File file = new File(uploadFtpsToStoreRequest.getFtpsFilePath());
            TopicPartition topicPartition = uploadFtpsToStoreRequest.getTopicPartition();
            try {
                uploadFtps(topicPartition, this.tierObjectStore, file);
                LOGGER.debug(String.format("[%s]: uploaded ftps file at: %s success, exit", uploadFtpsToStoreRequest.getTopicPartition(), uploadFtpsToStoreRequest.getFtpsFilePath()));
                reportUploadFtpsToStoreResponse(uploadFtpsToStoreRequest, MessageStatusCode.OK, MessageResult.SUCCESS);
            } catch (Exception e) {
                LOGGER.error(String.format("[%s]: Exception when uploading new ftps to object store", topicPartition), (Throwable) e);
                reportUploadFtpsToStoreResponse(uploadFtpsToStoreRequest, MessageStatusCode.INTERNAL_ERROR, MessageResult.FAILURE);
            }
        });
    }

    private static void uploadFtps(TopicPartition topicPartition, TierObjectStore tierObjectStore, File file) throws Exception {
        CheckedFileIO open = CheckedFileIO.open(file.toPath(), StandardOpenOption.READ);
        Throwable th = null;
        try {
            Algorithm tierStateFileAlgorithm = ChecksumUtils.tierStateFileAlgorithm(file.toPath());
            Optional<Header> readHeader = FileTierPartitionState.readHeader(open);
            if (!readHeader.isPresent()) {
                throw new Exception(String.format("[%s]: Header is not present for TierPartitionState being recovered", topicPartition));
            }
            Header header = readHeader.get();
            TierObjectStore.TierStateRestoreSnapshotMetadata tierStateRestoreSnapshotMetadata = new TierObjectStore.TierStateRestoreSnapshotMetadata(new TopicIdPartition(topicPartition.topic(), header.topicId(), topicPartition.partition()), header.startOffset(), header.endOffset(), OperatorUtil.computeMd5(open), tierStateFileAlgorithm);
            LOGGER.debug(String.format("[%s]: restore metadata: %s", topicPartition, tierStateRestoreSnapshotMetadata));
            tierObjectStore.putObject(tierStateRestoreSnapshotMetadata, file, TierObjectStore.FileType.TIER_STATE_SNAPSHOT);
            if (open != null) {
                if (0 == 0) {
                    open.close();
                    return;
                }
                try {
                    open.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (open != null) {
                if (0 != 0) {
                    try {
                        open.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    open.close();
                }
            }
            throw th3;
        }
    }

    private void reportUploadFtpsToStoreResponse(UploadFtpsToStoreRequest uploadFtpsToStoreRequest, MessageStatusCode messageStatusCode, MessageResult messageResult) {
        this.resultsReceiver.reportServiceSchedulerResponse(new UploadFtpsToStoreResponse(0, uploadFtpsToStoreRequest.getTopic(), uploadFtpsToStoreRequest.getPartition(), uploadFtpsToStoreRequest.getUuid(), messageStatusCode, messageResult));
    }

    public ThreadPoolExecutor threadPool() {
        return this.threadPool;
    }
}
