package kafka.restore.schedulers;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.rest.TierRecordMetadataResponse;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import kafka.restore.RestoreMetricsManager;
import kafka.restore.configmap.NodeConfig;
import kafka.restore.messages.KafkaFenceRequest;
import kafka.restore.messages.KafkaFenceResponse;
import kafka.restore.messages.KafkaFetchFtpsRequest;
import kafka.restore.messages.KafkaFetchFtpsResponse;
import kafka.restore.messages.KafkaForceRestoreRequest;
import kafka.restore.messages.KafkaForceRestoreResponse;
import kafka.restore.messages.KafkaPreConditionCheckRequest;
import kafka.restore.messages.KafkaPreConditionCheckResponse;
import kafka.restore.messages.KafkaRequest;
import kafka.restore.messages.KafkaTierPartitionStatusRequest;
import kafka.restore.messages.KafkaTierPartitionStatusResponse;
import kafka.restore.messages.KafkaUnfreezeRequest;
import kafka.restore.messages.KafkaUnfreezeResponse;
import kafka.restore.messages.KafkaValidateLogRangeRequest;
import kafka.restore.messages.KafkaValidateLogRangeResponse;
import kafka.restore.messages.MessageResult;
import kafka.restore.messages.MessageStatusCode;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.tracing.TraceRecordBuilderImpl;
import org.projectnessie.cel.common.types.Overloads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/restore/schedulers/KafkaConnectionPoolImpl.class */
/**
 * {@link KafkaConnectionPool} that executes Kafka restore requests on a bounded thread pool
 * and reports every outcome to an {@link AsyncServiceSchedulerResultsReceiver}.
 *
 * <p>Requests are only accepted between {@link #startUp()} and {@link #shutdown()}. Responses
 * are reported from pool threads (or from the submitting thread when the pool is saturated,
 * because the executor is built with {@link ThreadPoolExecutor.CallerRunsPolicy}).
 */
public class KafkaConnectionPoolImpl implements KafkaConnectionPool {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConnectionPoolImpl.class);

    /** Maximum number of retries handed to the retryer for every retried call. */
    private static final int MAX_RETRIES = 20;

    /** Value a log-range validation must complete with to count as a match. */
    private static final int LOG_RANGE_VALID = 1;

    /** Directory used for FTPS files when the configured one cannot be created. */
    private static final String FALLBACK_FTPS_DIR = "/tmp";

    /** Only failures whose class is exactly {@code RetryableException} are retried. */
    private static final Predicate<Throwable> IS_RETRYABLE =
            th -> th.getClass().equals(RetryableException.class);

    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final AsyncServiceSchedulerResultsReceiver resultsReceiver;
    private final int poolSize;
    private final String ftpsDirPath;
    private final RestoreMetricsManager metrics;
    private final Time time;
    private ThreadPoolExecutor threadPool;
    private PoolingHttpClientConnectionManager connectionManager;
    private CloseableHttpClient httpClient;
    /** True when {@link #httpClient} was built by {@link #startUp()} rather than injected. */
    private boolean ownsHttpClient;
    private KafkaConnectionPoolState state;
    private CompletableFutureRetryer retries;
    private int nextUuid = 0;
    private Duration statusQueryRetryWaitInMs = Constants.DEFAULT_WAIT_BETWEEN_IN_MS;

    /** Lifecycle state of the pool. */
    private enum KafkaConnectionPoolState {
        RUNNING,
        OFF
    }

    /**
     * Sets the wait handed to the retryer. The retryer is created in {@link #startUp()}, so the
     * value only takes effect for pools started after this call.
     *
     * @param waitMs wait between retries, in milliseconds
     */
    public void setStatusQueryRetryWaitInMs(long waitMs) {
        this.statusQueryRetryWaitInMs = Duration.ofMillis(waitMs);
    }

    /**
     * Injects the HTTP client to use instead of the one {@link #startUp()} would build.
     *
     * @param closeableHttpClient client used for all outgoing requests; it is closed by
     *     {@link #shutdown()}
     */
    public void setHttpClient(CloseableHttpClient closeableHttpClient) {
        this.httpClient = closeableHttpClient;
        this.ownsHttpClient = false;
    }

    /**
     * Returns the next response identifier. Synchronized because responses are reported from
     * several pool threads concurrently and {@code nextUuid++} is not atomic.
     */
    private synchronized int getNextUuid() {
        return this.nextUuid++;
    }

    /**
     * Creates a stopped pool; call {@link #startUp()} before submitting requests.
     *
     * @param resultsReceiver receiver that is handed every response
     * @param poolSize number of worker threads and maximum number of pooled HTTP connections
     * @param ftpsDirPath directory handed to the FTPS fetcher; created if missing, replaced by
     *     {@code /tmp} when it cannot be created
     * @param metrics metrics manager, or {@code null} to record no metrics
     * @param time clock handed to the retryer
     * @throws IllegalArgumentException if {@code poolSize} is less than one
     */
    public KafkaConnectionPoolImpl(AsyncServiceSchedulerResultsReceiver resultsReceiver, int poolSize, String ftpsDirPath, RestoreMetricsManager metrics, Time time) {
        if (poolSize < 1) {
            throw new IllegalArgumentException("KafkaConnectionPool must have a pool size of at least one");
        }
        this.resultsReceiver = resultsReceiver;
        this.poolSize = poolSize;
        this.state = KafkaConnectionPoolState.OFF;
        this.metrics = metrics;
        this.time = time;
        this.ftpsDirPath = resolveFtpsDir(ftpsDirPath);
    }

    /** Returns {@code requestedDir} if it exists or can be created, otherwise the fallback directory. */
    private static String resolveFtpsDir(String requestedDir) {
        Path dir = Paths.get(requestedDir);
        if (Files.exists(dir)) {
            return requestedDir;
        }
        try {
            // createDirectories also creates missing parents, so a nested path does not
            // needlessly fall back to the shared temp directory.
            Files.createDirectories(dir);
            return requestedDir;
        } catch (IOException e) {
            LOGGER.warn("FTPS directory {} does not exist and could not be created; using {} instead",
                    requestedDir, FALLBACK_FTPS_DIR, e);
            return FALLBACK_FTPS_DIR;
        }
    }

    /**
     * Starts the worker pool, the retryer and, unless a client was injected, the HTTP client.
     * Does nothing when the pool is already running.
     */
    @Override
    public void startUp() {
        if (this.state == KafkaConnectionPoolState.RUNNING) {
            return;
        }
        this.threadPool = new ThreadPoolExecutor(this.poolSize, this.poolSize, 1L, TimeUnit.MINUTES,
                new SynchronousQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());
        this.retries = new CompletableFutureRetryer(this.threadPool, this.statusQueryRetryWaitInMs, this.time);
        // A client built by a previous startUp() was closed by shutdown(), so build a new one.
        if (this.httpClient == null || this.ownsHttpClient) {
            this.connectionManager = new PoolingHttpClientConnectionManager();
            this.connectionManager.setMaxTotal(this.poolSize);
            this.httpClient = SchedulerUtil.buildHttpClient(this.connectionManager);
            this.ownsHttpClient = true;
        }
        // Flip the state last so a failure above leaves the pool OFF.
        this.state = KafkaConnectionPoolState.RUNNING;
    }

    /**
     * Stops accepting requests, shuts the worker pool down and closes the HTTP client and
     * connection manager. Does nothing when the pool is not running.
     */
    @Override
    public void shutdown() {
        if (this.state == KafkaConnectionPoolState.OFF) {
            return;
        }
        // Without this, later submissions would pass the state check and be silently
        // discarded by CallerRunsPolicy on the shut-down executor.
        this.state = KafkaConnectionPoolState.OFF;
        if (this.threadPool != null) {
            this.threadPool.shutdown();
        }
        try {
            if (this.httpClient != null) {
                this.httpClient.close();
            }
        } catch (IOException e) {
            LOGGER.error("Received IOException while closing HTTP Client during shutdown of KafkaConnectionPool.", e);
        }
        if (this.connectionManager != null) {
            this.connectionManager.shutdown();
        }
    }

    /**
     * Dispatches a request to the handler for its concrete type.
     *
     * @param kafkaRequest request to execute asynchronously
     * @throws IllegalStateException if the pool is not running
     * @throws UnsupportedOperationException if the request type is not handled by this pool
     */
    @Override
    public void submitKafkaRequest(KafkaRequest kafkaRequest) {
        if (this.state != KafkaConnectionPoolState.RUNNING) {
            throw new IllegalStateException("Cannot submit request to non-running connection pool.");
        }
        if (kafkaRequest instanceof KafkaFetchFtpsRequest) {
            submitFetchFtpsRequest((KafkaFetchFtpsRequest) kafkaRequest);
        } else if ((kafkaRequest instanceof KafkaFenceRequest)
                || (kafkaRequest instanceof KafkaUnfreezeRequest)
                || (kafkaRequest instanceof KafkaForceRestoreRequest)
                || (kafkaRequest instanceof KafkaPreConditionCheckRequest)) {
            submitKafkaEventRequest(kafkaRequest);
        } else if (kafkaRequest instanceof KafkaTierPartitionStatusRequest) {
            submitTierPartitionStatusRequest((KafkaTierPartitionStatusRequest) kafkaRequest);
        } else if (kafkaRequest instanceof KafkaValidateLogRangeRequest) {
            submitKafkaValidateLogRangeRequest((KafkaValidateLogRangeRequest) kafkaRequest);
        } else {
            throw new UnsupportedOperationException("kafkaRequest of type " + kafkaRequest.getClass() + " is not of a recognizable request type.");
        }
    }

    /**
     * Fetches the FTPS file for the request with retries and reports the fetcher's response, or
     * an {@code EXTERNAL_SERVICE_ERROR} failure when the fetch completes exceptionally.
     *
     * @param kafkaFetchFtpsRequest request describing the file to fetch
     */
    protected void submitFetchFtpsRequest(KafkaFetchFtpsRequest kafkaFetchFtpsRequest) {
        this.threadPool.execute(() -> {
            FtpsFetcher fetcher = new FtpsFetcher(this.httpClient, kafkaFetchFtpsRequest, this.ftpsDirPath, this.threadPool);
            // Metrics are optional; the retryer receives null recorders when none are configured.
            Consumer<Long> latencyRecorder = null;
            Consumer<Long> failureRecorder = null;
            if (this.metrics != null) {
                Sensor fetchLatency = this.metrics.restoreFetchFtpsMs();
                latencyRecorder = fetchLatency::record;
                failureRecorder = this.metrics::recordRestoreFetchFtpsFailures;
            }
            CompletableFuture<KafkaFetchFtpsResponse> fetch = this.retries.withTimedRetries(
                    fetcher::fetchFtpsFile, IS_RETRYABLE, MAX_RETRIES, latencyRecorder, failureRecorder);
            // Both stages hang off the fetch itself: the failure handler reacts to a failed
            // fetch only, not to a failure while reporting a successful one.
            fetch.thenCompose(response -> CompletableFuture.runAsync(
                    () -> this.resultsReceiver.reportServiceSchedulerResponse(response), this.threadPool));
            fetch.exceptionally(failure -> {
                LOGGER.error(String.format("[%s]: fetchFtpsFile call completed with Exception", kafkaFetchFtpsRequest.getTopicPartition()), failure);
                reportFetchFtpsResponse(kafkaFetchFtpsRequest, MessageStatusCode.EXTERNAL_SERVICE_ERROR, MessageResult.FAILURE);
                return null;
            });
        });
    }

    /**
     * Sends a fence, unfreeze, force-restore or pre-condition-check request over HTTP and reports
     * the outcome: success on HTTP 200, {@code SERVICE_CONNECTION_ERROR} on any other status,
     * {@code EXTERNAL_SERVICE_ERROR} on I/O failure and {@code BAD_ARGUMENT_ERROR} when the
     * request cannot be turned into a valid URI.
     *
     * @param kafkaRequest request to send
     */
    protected void submitKafkaEventRequest(KafkaRequest kafkaRequest) {
        this.threadPool.execute(() -> {
            HttpPost httpRequest = null;
            CloseableHttpResponse response = null;
            try {
                httpRequest = SchedulerUtil.buildKafkaHttpRequest(kafkaRequest);
                LOGGER.debug("Send kafka request: {}", httpRequest.getURI());
                response = this.httpClient.execute(httpRequest);
                int statusCode = response.getStatusLine().getStatusCode();
                if (statusCode != 200) {
                    LOGGER.error("[{}]: http response error status: {}, http response body: {}.",
                            kafkaRequest.getTopicPartition(), statusCode, readBodyForLog(response));
                    reportKafkaResponse(kafkaRequest, MessageStatusCode.SERVICE_CONNECTION_ERROR, MessageResult.FAILURE);
                } else if (kafkaRequest instanceof KafkaFenceRequest) {
                    // Only fence responses carry tier record metadata in their body.
                    reportKafkaFenceResponse(kafkaRequest, extractTierRecordMetadataResponse(response), MessageStatusCode.OK, MessageResult.SUCCESS);
                } else {
                    reportKafkaResponse(kafkaRequest, MessageStatusCode.OK, MessageResult.SUCCESS);
                }
            } catch (IOException e) {
                LOGGER.error("[{}]: Received Exception while sending KafkaEventRequest {}",
                        kafkaRequest.getTopicPartition(), httpRequest, e);
                reportKafkaResponse(kafkaRequest, MessageStatusCode.EXTERNAL_SERVICE_ERROR, MessageResult.FAILURE);
            } catch (URISyntaxException e) {
                LOGGER.error("[{}]: TierPartitionEvent request parameters for request with UUID {}, resulted in invalid URI.",
                        kafkaRequest.getTopicPartition(), kafkaRequest.getUuid(), e);
                reportKafkaResponse(kafkaRequest, MessageStatusCode.BAD_ARGUMENT_ERROR, MessageResult.FAILURE);
            } finally {
                // Always release the response, including when parsing its body failed.
                closeResponse(response, kafkaRequest);
            }
        });
    }

    /** Returns the response body for logging; never throws. */
    private static String readBodyForLog(CloseableHttpResponse response) {
        if (response.getEntity() == null) {
            return "<no body>";
        }
        try {
            return EntityUtils.toString(response.getEntity());
        } catch (IOException | RuntimeException e) {
            // Logging must not change which response gets reported for the request.
            return "<unreadable body: " + e + ">";
        }
    }

    /** Closes {@code response} if it was obtained, logging rather than propagating a close failure. */
    private static void closeResponse(CloseableHttpResponse response, KafkaRequest kafkaRequest) {
        if (response == null) {
            return;
        }
        try {
            response.close();
        } catch (IOException e) {
            LOGGER.error("[{}]: Received IOException while closing HTTP response after replying to KafkaTierPartitionEventRequest",
                    kafkaRequest.getTopicPartition(), e);
        }
    }

    /**
     * Queries the tier partition status on every replica broker and reports success only when
     * all of them complete with the expected status.
     *
     * @param kafkaTierPartitionStatusRequest request naming the partition, replicas and expected status
     */
    protected void submitTierPartitionStatusRequest(KafkaTierPartitionStatusRequest kafkaTierPartitionStatusRequest) {
        this.threadPool.execute(() -> {
            List<NodeConfig> replicas = kafkaTierPartitionStatusRequest.getReplicaBrokerIDs();
            int expectedStatus = kafkaTierPartitionStatusRequest.getExpectedStatus();
            long matching = 0;
            // Replicas are checked one after another: each check is awaited before the next starts.
            for (NodeConfig replica : replicas) {
                PartitionStatusFetcher fetcher = new PartitionStatusFetcher(this.httpClient,
                        kafkaTierPartitionStatusRequest.getTopic(), kafkaTierPartitionStatusRequest.getPartition(),
                        replica, expectedStatus, this.threadPool);
                CompletableFuture<Integer> status = this.retries.withRetries(fetcher::fetchPartitionStatus, IS_RETRYABLE, MAX_RETRIES);
                if (completedWith(status, expectedStatus)) {
                    matching++;
                }
            }
            if (matching == replicas.size()) {
                LOGGER.debug("report success");
                reportKafkaResponse(kafkaTierPartitionStatusRequest, MessageStatusCode.OK, MessageResult.SUCCESS);
            } else {
                LOGGER.error("[{}]: Check partition status failed, {} brokers status as {} but expecting {}",
                        kafkaTierPartitionStatusRequest.getTopicPartition(), matching, expectedStatus, replicas.size());
                reportKafkaResponse(kafkaTierPartitionStatusRequest, MessageStatusCode.EXTERNAL_SERVICE_ERROR, MessageResult.FAILURE);
            }
        });
    }

    /**
     * Validates the requested log range on every replica broker and reports success only when
     * all of them complete with {@code 1}.
     *
     * @param kafkaValidateLogRangeRequest request naming the partition, replicas and offset range
     */
    protected void submitKafkaValidateLogRangeRequest(KafkaValidateLogRangeRequest kafkaValidateLogRangeRequest) {
        this.threadPool.execute(() -> {
            List<NodeConfig> replicas = kafkaValidateLogRangeRequest.getReplicaBrokers();
            long matching = 0;
            // Replicas are checked one after another: each check is awaited before the next starts.
            for (NodeConfig replica : replicas) {
                PartitionLogRangeValidator validator = new PartitionLogRangeValidator(this.httpClient,
                        kafkaValidateLogRangeRequest.getTopic(), kafkaValidateLogRangeRequest.getPartition(), replica,
                        kafkaValidateLogRangeRequest.getLogStartOffset(), kafkaValidateLogRangeRequest.getLogEndOffset(),
                        this.threadPool);
                CompletableFuture<Integer> validation = this.retries.withRetries(validator::validateLogRange, IS_RETRYABLE, MAX_RETRIES);
                if (completedWith(validation, LOG_RANGE_VALID)) {
                    matching++;
                }
            }
            if (matching == replicas.size()) {
                LOGGER.debug("report success");
                reportKafkaResponse(kafkaValidateLogRangeRequest, MessageStatusCode.OK, MessageResult.SUCCESS);
            } else {
                LOGGER.error("[{}]: Validate log range failed, {} brokers log range match as ({}-{}) but expecting {}.",
                        kafkaValidateLogRangeRequest.getTopicPartition(), matching,
                        kafkaValidateLogRangeRequest.getLogStartOffset(), kafkaValidateLogRangeRequest.getLogEndOffset(),
                        replicas.size());
                reportKafkaResponse(kafkaValidateLogRangeRequest, MessageStatusCode.EXTERNAL_SERVICE_ERROR, MessageResult.FAILURE);
            }
        });
    }

    /**
     * Waits for a replica check and tells whether it completed with {@code expected}. A check
     * that completes exceptionally never matches, whatever value is expected.
     */
    private static boolean completedWith(CompletableFuture<Integer> check, int expected) {
        try {
            Integer result = check.join();
            return result != null && result == expected;
        } catch (CompletionException e) {
            LOGGER.warn("Replica check completed exceptionally; counting it as a mismatch", e);
            return false;
        }
    }

    /**
     * Reports a fetch-FTPS response.
     *
     * @param kafkaFetchFtpsRequest request being answered
     * @param messageStatusCode status code of the response
     * @param messageResult result of the response
     * @param str value passed as the last constructor argument of {@code KafkaFetchFtpsResponse};
     *     may be {@code null}
     */
    protected void reportFetchFtpsResponse(KafkaFetchFtpsRequest kafkaFetchFtpsRequest, MessageStatusCode messageStatusCode, MessageResult messageResult, String str) {
        this.resultsReceiver.reportServiceSchedulerResponse(new KafkaFetchFtpsResponse(getNextUuid(), kafkaFetchFtpsRequest.getTopic(), kafkaFetchFtpsRequest.getPartition(), kafkaFetchFtpsRequest.getUuid(), messageStatusCode, messageResult, str));
    }

    /** Reports a fetch-FTPS response with a {@code null} final constructor argument. */
    protected void reportFetchFtpsResponse(KafkaFetchFtpsRequest kafkaFetchFtpsRequest, MessageStatusCode messageStatusCode, MessageResult messageResult) {
        reportFetchFtpsResponse(kafkaFetchFtpsRequest, messageStatusCode, messageResult, null);
    }

    /**
     * Reports a response of the type matching the request type. Fence responses are reported
     * without tier record metadata. Unknown request types are logged and nothing is reported.
     *
     * @param kafkaRequest request being answered
     * @param messageStatusCode status code of the response
     * @param messageResult result of the response
     */
    protected void reportKafkaResponse(KafkaRequest kafkaRequest, MessageStatusCode messageStatusCode, MessageResult messageResult) {
        if (kafkaRequest instanceof KafkaFenceRequest) {
            reportKafkaFenceResponse(kafkaRequest, null, messageStatusCode, messageResult);
        } else if (kafkaRequest instanceof KafkaForceRestoreRequest) {
            reportKafkaForceRestoreResponse(kafkaRequest, messageStatusCode, messageResult);
        } else if (kafkaRequest instanceof KafkaUnfreezeRequest) {
            reportKafkaUnfreezeResponse(kafkaRequest, messageStatusCode, messageResult);
        } else if (kafkaRequest instanceof KafkaValidateLogRangeRequest) {
            reportKafkaValidateLogRangeResponse(kafkaRequest, messageStatusCode, messageResult);
        } else if (kafkaRequest instanceof KafkaTierPartitionStatusRequest) {
            reportTierPartitionStatusResponse(kafkaRequest, messageStatusCode, messageResult);
        } else if (kafkaRequest instanceof KafkaPreConditionCheckRequest) {
            reportKafkaPreConditionCheckResponse(kafkaRequest, messageStatusCode, messageResult);
        } else {
            LOGGER.warn("Unknown kafka request type: " + kafkaRequest.toString());
        }
    }

    /** Reports a fence response; {@code tierRecordMetadataResponse} may be {@code null}. */
    private void reportKafkaFenceResponse(KafkaRequest kafkaRequest, TierRecordMetadataResponse tierRecordMetadataResponse, MessageStatusCode messageStatusCode, MessageResult messageResult) {
        this.resultsReceiver.reportServiceSchedulerResponse(new KafkaFenceResponse(getNextUuid(), kafkaRequest.getTopic(), kafkaRequest.getPartition(), kafkaRequest.getUuid(), tierRecordMetadataResponse, messageStatusCode, messageResult));
    }

    /** Reports a force-restore response for {@code kafkaRequest}. */
    private void reportKafkaForceRestoreResponse(KafkaRequest kafkaRequest, MessageStatusCode messageStatusCode, MessageResult messageResult) {
        this.resultsReceiver.reportServiceSchedulerResponse(new KafkaForceRestoreResponse(getNextUuid(), kafkaRequest.getTopic(), kafkaRequest.getPartition(), kafkaRequest.getUuid(), messageStatusCode, messageResult));
    }

    /** Reports an unfreeze response for {@code kafkaRequest}. */
    private void reportKafkaUnfreezeResponse(KafkaRequest kafkaRequest, MessageStatusCode messageStatusCode, MessageResult messageResult) {
        this.resultsReceiver.reportServiceSchedulerResponse(new KafkaUnfreezeResponse(getNextUuid(), kafkaRequest.getTopic(), kafkaRequest.getPartition(), kafkaRequest.getUuid(), messageStatusCode, messageResult));
    }

    /** Reports a validate-log-range response for {@code kafkaRequest}. */
    protected void reportKafkaValidateLogRangeResponse(KafkaRequest kafkaRequest, MessageStatusCode messageStatusCode, MessageResult messageResult) {
        this.resultsReceiver.reportServiceSchedulerResponse(new KafkaValidateLogRangeResponse(getNextUuid(), kafkaRequest.getTopic(), kafkaRequest.getPartition(), kafkaRequest.getUuid(), messageStatusCode, messageResult));
    }

    /** Reports a tier-partition-status response for {@code kafkaRequest}. */
    protected void reportTierPartitionStatusResponse(KafkaRequest kafkaRequest, MessageStatusCode messageStatusCode, MessageResult messageResult) {
        this.resultsReceiver.reportServiceSchedulerResponse(new KafkaTierPartitionStatusResponse(getNextUuid(), kafkaRequest.getTopic(), kafkaRequest.getPartition(), kafkaRequest.getUuid(), messageStatusCode, messageResult));
    }

    /** Reports a pre-condition-check response for {@code kafkaRequest}. */
    protected void reportKafkaPreConditionCheckResponse(KafkaRequest kafkaRequest, MessageStatusCode messageStatusCode, MessageResult messageResult) {
        this.resultsReceiver.reportServiceSchedulerResponse(new KafkaPreConditionCheckResponse(getNextUuid(), kafkaRequest.getTopic(), kafkaRequest.getPartition(), kafkaRequest.getUuid(), messageStatusCode, messageResult));
    }

    /**
     * Parses the {@code data.attributes} object of a fence response body.
     *
     * @param closeableHttpResponse HTTP response whose body is the JSON document
     * @return the tier record metadata described by the body
     * @throws IOException if the body is missing, cannot be read or parsed, or lacks a required
     *     field; raised as a checked exception so the caller reports a failure instead of the
     *     pool task dying with a {@code NullPointerException}
     */
    private TierRecordMetadataResponse extractTierRecordMetadataResponse(CloseableHttpResponse closeableHttpResponse) throws IOException {
        if (closeableHttpResponse.getEntity() == null) {
            throw new IOException("Fence response has no body");
        }
        JsonNode root = MAPPER.readTree(EntityUtils.toString(closeableHttpResponse.getEntity()));
        if (root == null) {
            throw new IOException("Fence response body is empty");
        }
        // path() yields a "missing" node instead of null, so absent levels surface in requireField.
        JsonNode attributes = root.path("data").path("attributes");
        // NOTE(review): the two constants below come from unrelated libraries and look like
        // decompiler substitutions for plain string field names - confirm their values.
        return new TierRecordMetadataResponse(
                requireField(attributes, "user_topic_partition").asText(),
                requireField(attributes, Overloads.TypeConvertTimestamp).asLong(),
                requireField(attributes, TraceRecordBuilderImpl.OFFSET).asLong(),
                requireField(attributes, "tier_partition").asInt());
    }

    /** Returns the named field of {@code attributes}, or throws when it is absent. */
    private static JsonNode requireField(JsonNode attributes, String fieldName) throws IOException {
        JsonNode value = attributes.get(fieldName);
        if (value == null) {
            throw new IOException("Fence response is missing data.attributes." + fieldName);
        }
        return value;
    }
}
