package kafka.restore.schedulers;

import io.cloudevents.kafka.impl.KafkaHeaders;
import io.confluent.kafka.schemaregistry.utils.QualifiedSubject;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import kafka.restore.RestoreConfig;
import kafka.restore.configmap.NodeConfig;
import kafka.restore.messages.KafkaFenceRequest;
import kafka.restore.messages.KafkaFetchFtpsRequest;
import kafka.restore.messages.KafkaForceRestoreRequest;
import kafka.restore.messages.KafkaRequest;
import kafka.restore.messages.KafkaUnfreezeRequest;
import kafka.restore.messages.KafkaValidateLogRangeRequest;
import kafka.tier.store.S3VersionInformation;
import kafka.tier.store.VersionInformation;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.codehaus.plexus.util.LineOrientedInterpolatingReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/restore/schedulers/SchedulerUtil.class */
/**
 * Static helpers used by the restore schedulers: building HTTP requests for the broker's
 * internal REST endpoints, creating a retrying HTTP client, persisting HTTP response bodies,
 * reporting thread-pool state and picking an object version id.
 */
public class SchedulerUtil {
    private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerUtil.class);

    /** The only HTTP status code treated as success by clients from {@link #buildHttpClient}. */
    private static final int HTTP_STATUS_OK = 200;

    /** Retrying stops once the execution count reported by the HTTP client reaches this value. */
    private static final int RETRY_EXECUTION_COUNT_LIMIT = 3;

    /** Chunk size, in bytes, used when copying a response body to disk. */
    private static final int COPY_BUFFER_SIZE = 8192;

    /**
     * Builds the POST request matching the concrete type of {@code kafkaRequest}.
     *
     * <p>Every request carries the topic and partition. Force-restore requests add start offset,
     * end offset and content hash; validate-log-range requests add start and end offset.
     *
     * @param kafkaRequest the request to translate; its broker supplies the target host
     * @return the POST request, ready to be executed
     * @throws IllegalArgumentException if the request type is not one of the supported types
     * @throws UnsupportedEncodingException if the request body cannot be encoded
     * @throws URISyntaxException if the broker URL plus path is not a valid URI
     */
    public static HttpPost buildKafkaHttpRequest(KafkaRequest kafkaRequest) throws UnsupportedEncodingException, URISyntaxException {
        Map<String, String> params = new HashMap<>();
        params.put(Constants.TOPIC_PARAM_NAME, kafkaRequest.getTopic());
        params.put("partition", String.valueOf(kafkaRequest.getPartition()));

        String uriPath;
        if (kafkaRequest instanceof KafkaFenceRequest) {
            uriPath = Constants.FENCE_EVENT_URI_PATH;
        } else if (kafkaRequest instanceof KafkaForceRestoreRequest) {
            KafkaForceRestoreRequest forceRestore = (KafkaForceRestoreRequest) kafkaRequest;
            params.put(Constants.START_OFFSET_PARAM_NAME, String.valueOf(forceRestore.getLogStartOffset()));
            params.put(Constants.END_OFFSET_PARAM_NAME, String.valueOf(forceRestore.getLogEndOffset()));
            params.put(Constants.CONTENT_HASH_PARAM_NAME, forceRestore.getContentHash());
            uriPath = Constants.FORCE_RESTORE_EVENT_URI_PATH;
        } else if (kafkaRequest instanceof KafkaUnfreezeRequest) {
            uriPath = Constants.UNFREEZE_EVENT_URI_PATH;
        } else if (kafkaRequest instanceof KafkaValidateLogRangeRequest) {
            KafkaValidateLogRangeRequest validateRange = (KafkaValidateLogRangeRequest) kafkaRequest;
            params.put(Constants.START_OFFSET_PARAM_NAME, String.valueOf(validateRange.getLogStartOffset()));
            params.put(Constants.END_OFFSET_PARAM_NAME, String.valueOf(validateRange.getLogEndOffset()));
            uriPath = Constants.VALIDATE_LOG_RANGE_URI_PATH;
        } else if (kafkaRequest instanceof KafkaFetchFtpsRequest) {
            uriPath = Constants.FETCH_FTPS_URI_PATH;
        } else {
            throw new IllegalArgumentException("KafkaRequest type not support: " + kafkaRequest.getClass().getName());
        }
        return buildKafkaHttpRequest(kafkaRequest.getBroker(), uriPath, params);
    }

    /**
     * Builds a POST request for {@code str} on the given broker, with {@code map} rendered as a
     * flat JSON object of string values in the request body. An empty map yields an empty body.
     *
     * @param nodeConfig the broker to address
     * @param str the URI path appended to the broker URL
     * @param map body parameters; must not be {@code null}
     * @return the POST request, ready to be executed
     * @throws URISyntaxException if the broker URL plus path is not a valid URI
     * @throws UnsupportedEncodingException if the request body cannot be encoded
     */
    public static HttpPost buildKafkaHttpRequest(NodeConfig nodeConfig, String str, Map<String, String> map) throws URISyntaxException, UnsupportedEncodingException {
        URI uri = new URIBuilder(getBrokerUrl(nodeConfig) + str).build();
        String body = toJsonBody(map);
        LOGGER.debug("build http request with uri: {}, body: {}", uri, body);

        HttpPost httpPost = new HttpPost(uri);
        // NOTE(review): the body is JSON-shaped but is declared as form-urlencoded, and the header
        // name comes from an unrelated CloudEvents constant (presumably "content-type").
        // Left unchanged; confirm what the broker endpoint expects before altering either.
        httpPost.addHeader(KafkaHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
        httpPost.setEntity(new StringEntity(body));
        return httpPost;
    }

    /** Renders {@code map} as {@code {"k": "v","k2": "v2"}}, or as an empty string when the map is empty. */
    private static String toJsonBody(Map<String, String> map) {
        if (map.isEmpty()) {
            return "";
        }
        StringBuilder json = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> entry : map.entrySet()) {
            if (!first) {
                json.append(',');
            }
            first = false;
            json.append('"');
            appendJsonEscaped(json, entry.getKey());
            json.append("\": \"");
            appendJsonEscaped(json, entry.getValue());
            json.append('"');
        }
        // NOTE(review): this constant is presumably the closing brace "}" substituted by a
        // decompiler for an inlined literal - confirm and replace with the literal.
        json.append(LineOrientedInterpolatingReader.DEFAULT_END_DELIM);
        return json.toString();
    }

    /**
     * Appends {@code text} to {@code out}, escaping quotes, backslashes and control characters so
     * the result is valid inside a JSON string. A {@code null} text is appended as {@code null},
     * which is what plain {@code StringBuilder.append} would produce.
     */
    private static void appendJsonEscaped(StringBuilder out, String text) {
        String value = String.valueOf(text);
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '"':
                    out.append("\\\"");
                    break;
                case '\\':
                    out.append("\\\\");
                    break;
                case '\n':
                    out.append("\\n");
                    break;
                case '\r':
                    out.append("\\r");
                    break;
                case '\t':
                    out.append("\\t");
                    break;
                default:
                    if (c < 0x20) {
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
    }

    /**
     * Returns the base URL of the broker's internal REST server: {@code http://}, the node's
     * host, a delimiter, and {@link RestoreConfig#kafkaInternalRestServerPort}.
     *
     * @param nodeConfig the broker whose host is used
     * @return the base URL without a trailing path
     */
    public static String getBrokerUrl(NodeConfig nodeConfig) {
        // NOTE(review): CONTEXT_DELIMITER is presumably ":" (host/port separator) substituted by
        // a decompiler for an inlined literal - confirm and replace with the literal.
        return "http://" + nodeConfig.getHost() + QualifiedSubject.CONTEXT_DELIMITER + RestoreConfig.kafkaInternalRestServerPort;
    }

    /**
     * Computes a pool size as {@code d * 101} rounded to the nearest whole number, never less
     * than 1.
     *
     * @param d the scaling input
     *          (NOTE(review): presumably a target utilization, with 101 being a folded
     *          {@code 1 + wait/compute} ratio - confirm against the caller)
     * @return the pool size, at least 1
     */
    public static int getOptimalPoolSizeForKafka(double d) {
        return (int) Math.max(1L, Math.round(d * 101.0d));
    }

    /**
     * Creates an HTTP client on top of the given connection manager that fails any response
     * whose status is not 200 with an {@link IOException}, and retries failed executions with
     * a backoff of {@code 10^executionCount * Constants.INITIAL_BACKOFF_MS} milliseconds.
     *
     * @param poolingHttpClientConnectionManager the connection pool backing the client
     * @return the configured client
     */
    public static CloseableHttpClient buildHttpClient(PoolingHttpClientConnectionManager poolingHttpClientConnectionManager) {
        return HttpClients.custom()
                .setConnectionManager(poolingHttpClientConnectionManager)
                // A method reference is used instead of an implicitly typed lambda because
                // addInterceptorLast is overloaded for request and response interceptors and a
                // bare two-argument lambda cannot choose between them.
                .addInterceptorLast(SchedulerUtil::rejectNonOkStatus)
                .setRetryHandler((iOException, executionCount, httpContext) -> backOffBeforeRetry(executionCount))
                .build();
    }

    /**
     * Response interceptor body: throws unless the response status is 200.
     *
     * @param httpResponse the response being processed
     * @param httpContext unused; typed as {@code Object} so that this method only matches the
     *                    response-interceptor overload when used as a method reference
     * @throws IOException if the status code is not 200
     */
    private static void rejectNonOkStatus(HttpResponse httpResponse, Object httpContext) throws IOException {
        int statusCode = httpResponse.getStatusLine().getStatusCode();
        if (statusCode != HTTP_STATUS_OK) {
            throw new IOException("Received non-successful status code " + statusCode + " in response");
        }
    }

    /**
     * Retry handler body: sleeps for the backoff period and reports whether to retry.
     *
     * @param executionCount the execution count passed in by the HTTP client
     * @return {@code true} to retry; {@code false} once the limit is reached or if the wait
     *         was interrupted
     */
    private static boolean backOffBeforeRetry(int executionCount) {
        if (executionCount >= RETRY_EXECUTION_COUNT_LIMIT) {
            return false;
        }
        try {
            Thread.sleep((long) (Math.pow(10.0d, executionCount) * Constants.INITIAL_BACKOFF_MS));
            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            LOGGER.error("KafkaConnectionPool was interrupted while waiting to retry HTTP request. Giving up on retrying request.", e);
            return false;
        }
    }

    /**
     * Writes the body of {@code httpResponse} to the file at path {@code str}, replacing any
     * existing file. Both the response stream and the file are closed even if copying fails.
     *
     * @param httpResponse the response whose entity content is saved
     * @param str the destination file path
     * @throws IOException if the content cannot be read or the file cannot be replaced or written
     */
    public static void saveHttpResponseToFile(HttpResponse httpResponse, String str) throws IOException {
        try (InputStream content = httpResponse.getEntity().getContent()) {
            File target = new File(str);
            Files.deleteIfExists(target.toPath());
            // FileOutputStream creates the file itself, so no separate createNewFile() is needed.
            try (FileOutputStream output = new FileOutputStream(target)) {
                byte[] buffer = new byte[COPY_BUFFER_SIZE];
                int bytesRead;
                while ((bytesRead = content.read(buffer)) != -1) {
                    output.write(buffer, 0, bytesRead);
                }
            }
        }
    }

    /**
     * Formats a one-line snapshot of the executor's state: pool size over core pool size,
     * active count, completed and total task counts, shutdown/terminated flags and queue size.
     *
     * @param str a label placed at the front of the line
     * @param threadPoolExecutor the executor to describe
     * @return the formatted status line
     */
    public static synchronized String getThreadPoolExecutorStatus(String str, ThreadPoolExecutor threadPoolExecutor) {
        return String.format(
                "[%s] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s, QueueSize: %s",
                str,
                threadPoolExecutor.getPoolSize(),
                threadPoolExecutor.getCorePoolSize(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getCompletedTaskCount(),
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.isShutdown(),
                threadPoolExecutor.isTerminated(),
                threadPoolExecutor.getQueue().size());
    }

    /**
     * Returns the version id of the first entry in {@code list}, in iteration order, that is not
     * an S3 delete marker. Logs a warning and returns {@code null} when the list is {@code null},
     * empty, or contains only delete markers.
     *
     * @param str a description of the object, used only in the warning message
     * @param list the candidate versions; may be {@code null}
     * @return the selected version id, or {@code null} if there is none
     */
    public static String getLastLiveVersionId(String str, List<VersionInformation> list) {
        if (list != null) {
            for (VersionInformation version : list) {
                boolean deleteMarker = version instanceof S3VersionInformation
                        && ((S3VersionInformation) version).isDeleteMarker();
                if (!deleteMarker) {
                    return version.getVersionId();
                }
            }
        }
        LOGGER.warn("no version found from object store for {}", str);
        return null;
    }
}
