package kafka.restore.schedulers;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import kafka.restore.messages.KafkaFetchFtpsRequest;
import kafka.restore.messages.KafkaFetchFtpsResponse;
import kafka.restore.messages.MessageResult;
import kafka.restore.messages.MessageStatusCode;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/restore/schedulers/FtpsFetcher.class */
public class FtpsFetcher {
    private static final String HTTP_HEADER_CONTENT_DISPOSITION = "Content-Disposition";
    private final CloseableHttpClient httpClient;
    private final KafkaFetchFtpsRequest request;
    private final String ftpsDirPath;
    private final ThreadPoolExecutor executor;
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) FtpsFetcher.class);
    private static final Pattern FTPS_FILE_PATTERN = Pattern.compile(".+filename=\"(.+?)\".*");

    public FtpsFetcher(CloseableHttpClient closeableHttpClient, KafkaFetchFtpsRequest kafkaFetchFtpsRequest, String str, ThreadPoolExecutor threadPoolExecutor) {
        this.httpClient = closeableHttpClient;
        this.request = kafkaFetchFtpsRequest;
        this.ftpsDirPath = str;
        this.executor = threadPoolExecutor;
    }

    public CompletableFuture<KafkaFetchFtpsResponse> fetchFtpsFile() {
        return CompletableFuture.supplyAsync(() -> {
            CloseableHttpResponse closeableHttpResponse = null;
            try {
                try {
                    CloseableHttpResponse execute = this.httpClient.execute((HttpUriRequest) SchedulerUtil.buildKafkaHttpRequest(this.request));
                    if (execute.getStatusLine().getStatusCode() != 200) {
                        LOGGER.error(String.format("[%s]: received non-retryable failure status code, http response error status: %s, http response body: %s.", this.request.getTopicPartition(), Integer.valueOf(execute.getStatusLine().getStatusCode()), execute.getEntity().toString()));
                        KafkaFetchFtpsResponse kafkaFetchFtpsResponse = new KafkaFetchFtpsResponse(0, this.request.getTopic(), this.request.getPartition(), this.request.getUuid(), MessageStatusCode.EXTERNAL_SERVICE_ERROR, MessageResult.FAILURE);
                        if (execute != null) {
                            try {
                                execute.close();
                            } catch (IOException e) {
                                LOGGER.error(String.format("[%s]: Received IOException while closing HTTP response after replying to FetchFtpsRequest", this.request.getTopicPartition()), (Throwable) e);
                            }
                        }
                        return kafkaFetchFtpsResponse;
                    }
                    String ftpsFileName = getFtpsFileName(this.ftpsDirPath, this.request.getTopic(), this.request.getPartition(), execute);
                    SchedulerUtil.saveHttpResponseToFile(execute, ftpsFileName);
                    LOGGER.debug("download ftps file to local success: " + ftpsFileName);
                    KafkaFetchFtpsResponse kafkaFetchFtpsResponse2 = new KafkaFetchFtpsResponse(0, this.request.getTopic(), this.request.getPartition(), this.request.getUuid(), MessageStatusCode.OK, MessageResult.SUCCESS, ftpsFileName);
                    if (execute != null) {
                        try {
                            execute.close();
                        } catch (IOException e2) {
                            LOGGER.error(String.format("[%s]: Received IOException while closing HTTP response after replying to FetchFtpsRequest", this.request.getTopicPartition()), (Throwable) e2);
                        }
                    }
                    return kafkaFetchFtpsResponse2;
                } catch (Exception e3) {
                    String format = String.format("[%s]: Exception during downloading ftps file, throw retryable exception", this.request.getTopicPartition());
                    LOGGER.error(format, (Throwable) e3);
                    throw new RetryableException(format, e3);
                }
            } catch (Throwable th) {
                if (0 != 0) {
                    try {
                        closeableHttpResponse.close();
                    } catch (IOException e4) {
                        LOGGER.error(String.format("[%s]: Received IOException while closing HTTP response after replying to FetchFtpsRequest", this.request.getTopicPartition()), (Throwable) e4);
                    }
                }
                throw th;
            }
        }, this.executor);
    }

    public static String getFtpsFileName(String str, String str2, int i, CloseableHttpResponse closeableHttpResponse) {
        String value = closeableHttpResponse.getLastHeader("Content-Disposition").getValue();
        if (value != null) {
            Matcher matcher = FTPS_FILE_PATTERN.matcher(value);
            if (matcher.find()) {
                String group = matcher.group(1);
                return str + "/ftps-" + str2 + "-" + i + group.substring(group.indexOf(46));
            }
        }
        return str + "/ftps-" + str2 + "-" + i;
    }
}
