/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.storage.amp;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.http.client.utils.URIBuilder;
import org.apache.kafka.common.Confluent;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.storage.MetricsStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.jose4j.json.internal.json_simple.JSONArray;
import org.jose4j.json.internal.json_simple.JSONObject;
import org.jose4j.json.internal.json_simple.parser.JSONParser;
import org.jose4j.json.internal.json_simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.http.ContentStreamProvider;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.http.HttpExecuteResponse;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.auth.aws.signer.AwsV4HttpSigner;
import software.amazon.awssdk.http.auth.spi.signer.SignRequest;
import software.amazon.awssdk.http.auth.spi.signer.SignedRequest;
import software.amazon.awssdk.identity.spi.Identity;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.StsClientBuilder;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
import software.amazon.awssdk.utils.IoUtils;

/**
 * {@link MetricsStore} implementation that reads Connect worker and task load metrics from an
 * Amazon Managed Service for Prometheus (AMP) workspace through SigV4-signed instant queries.
 *
 * <p>Credentials come from one of two sources, chosen in {@link #start()}:
 * <ul>
 *   <li>if no credentials file path is configured, an STS client is created and the configured
 *       role ARN is assumed to obtain (and later refresh) session credentials;</li>
 *   <li>otherwise static credentials are read from the configured properties file and used
 *       directly to sign requests.</li>
 * </ul>
 *
 * <p>The public query methods never propagate failures: they log the error and return an empty map.
 */
@Confluent
public class AmazonManagedPrometheusMetricsStore
implements MetricsStore {
    private static final Logger log = LoggerFactory.getLogger(AmazonManagedPrometheusMetricsStore.class);
    private static final String ENDPOINT = "https://aps-workspaces.us-west-2.amazonaws.com";
    private static final String SERVICE_SIGNING_NAME = "aps";
    private static final String REGION_NAME = "us-west-2";
    private static final String AMP_SESSION_NAME = "connect-amp-session";
    /** Prometheus label used to group series per worker and to derive the worker (pod) name. */
    private static final String CONNECT_WORKER_IP_METRIC = "k8s_pod_ip";
    private final String workerHeapUsageMetric;
    private final String workerCPULoadMetric;
    private final String taskCPULoadMetric;
    private final String workspaceId;
    private final String connectCluster;
    private final AwsV4HttpSigner signer;
    private final String arn;
    String credentialsFilePath;
    private SdkHttpClient httpClient;
    StsClient stsClient;
    private AwsSessionCredentials sessionCredentials;
    private Instant credentialsExpiration;
    private AwsCredentials baseCredentials;
    static final String AMP_SECRET_KEY_ID = "access_key";
    static final String AMP_SECRET_ACCESS_KEY = "secret_key";

    /**
     * Reads the store settings from the worker configuration. No clients are created and no
     * validation is performed here; both happen in {@link #start()}.
     *
     * @param config the distributed worker configuration
     */
    public AmazonManagedPrometheusMetricsStore(DistributedConfig config) {
        this.workspaceId = config.getString("confluent.connect.amp.metrics.store.workspace.id");
        // The cluster name is the part of group.id before the first '.'. A missing group.id is
        // tolerated here so that validate() can report it as a configuration error.
        this.connectCluster = clusterNameFrom((String) config.originals().get("group.id"));
        this.signer = AwsV4HttpSigner.create();
        this.arn = config.getString("confluent.connect.amp.reader.arn");
        this.workerCPULoadMetric = config.getString("confluent.connect.worker.cpu.load.metric.name");
        this.workerHeapUsageMetric = config.getString("confluent.connect.worker.memory.load.metric.name");
        this.taskCPULoadMetric = config.getString("confluent.connect.task.cpu.load.metric.name");
        this.credentialsFilePath = config.getString("confluent.connect.amp.credentials.file.path");
    }

    /**
     * Returns the prefix of {@code groupId} up to (not including) the first {@code '.'}, the
     * whole string if it contains no dot, or {@code null} if {@code groupId} is {@code null}.
     */
    private static String clusterNameFrom(String groupId) {
        if (groupId == null) {
            return null;
        }
        int firstDot = groupId.indexOf('.');
        return firstDot < 0 ? groupId : groupId.substring(0, firstDot);
    }

    /**
     * Validates the configuration, creates the HTTP client and prepares the credential source.
     * Any resources created before a failure are released before the exception is rethrown.
     *
     * @throws Exception if validation fails or the credentials file cannot be loaded
     */
    @Override
    public void start() throws Exception {
        try {
            this.validate();
            this.httpClient = ApacheHttpClient.create();
            if (Utils.isBlank(this.credentialsFilePath)) {
                this.stsClient = StsClient.builder()
                        .credentialsProvider(DefaultCredentialsProvider.create())
                        .region(Region.US_WEST_2)
                        .build();
            } else {
                this.baseCredentials = this.loadCredentialsFromFile(this.credentialsFilePath);
            }
        }
        catch (Exception e) {
            this.cleanupResources();
            throw e;
        }
    }

    /**
     * Closes the HTTP and STS clients if present. Close failures are logged and not propagated;
     * the references are cleared either way so a failed close is not retried on a stale client.
     */
    private void cleanupResources() {
        if (this.httpClient != null) {
            try {
                this.httpClient.close();
            }
            catch (Exception ex) {
                log.warn("Error closing HTTP client during cleanup", ex);
            }
            finally {
                this.httpClient = null;
            }
        }
        if (this.stsClient != null) {
            try {
                this.stsClient.close();
            }
            catch (Exception ex) {
                log.warn("Error closing STS client during cleanup", ex);
            }
            finally {
                this.stsClient = null;
            }
        }
    }

    /**
     * Checks that every required setting is non-blank and, when a credentials file path is
     * configured, that the path exists.
     *
     * @throws IllegalArgumentException on the first setting that fails the check
     */
    @Override
    public void validate() throws IllegalArgumentException {
        requireNonBlank(this.workspaceId, "Workspace ID is required.");
        requireNonBlank(this.connectCluster, "Connect cluster is required.");
        requireNonBlank(this.arn, "ARN is required.");
        requireNonBlank(this.workerCPULoadMetric, "Worker CPU load metric is required.");
        requireNonBlank(this.workerHeapUsageMetric, "Worker heap usage metric is required.");
        requireNonBlank(this.taskCPULoadMetric, "Task CPU load metric is required.");
        if (!Utils.isBlank(this.credentialsFilePath) && !new File(this.credentialsFilePath).exists()) {
            throw new IllegalArgumentException("Credentials file not found at: " + this.credentialsFilePath);
        }
    }

    /** Throws {@link IllegalArgumentException} with {@code message} when {@code value} is blank. */
    private static void requireNonBlank(String value, String message) {
        if (Utils.isBlank(value)) {
            throw new IllegalArgumentException(message);
        }
    }

    /** Releases the HTTP and STS clients. */
    @Override
    public void stop() throws Exception {
        this.cleanupResources();
        log.info("PrometheusMetricsStore stopped.");
    }

    /**
     * Queries the 5-minute average heap usage per worker.
     *
     * @return worker name to value (rounded to two decimals); empty if the query or parsing fails
     */
    @Override
    public Map<String, Double> getWorkersMemoryLoad() {
        try {
            String labels = String.format("k8s_namespace_name=\"%s\", type=\"heap\"", this.connectCluster);
            String query = this.buildWorkerPrometheusQuery(this.workerHeapUsageMetric, labels, false);
            String responseBody = this.executePrometheusQuery(query);
            return this.parseWorkerLoadResponse(responseBody);
        }
        catch (Exception e) {
            log.error("Error fetching workers memory load", e);
            return new HashMap<String, Double>();
        }
    }

    /**
     * Queries the 5-minute average CPU load per worker, multiplied by 100.
     *
     * @return worker name to value (rounded to two decimals); empty if the query or parsing fails
     */
    @Override
    public Map<String, Double> getWorkersCPULoad() {
        try {
            String labels = String.format("k8s_namespace_name=\"%s\"", this.connectCluster);
            String query = this.buildWorkerPrometheusQuery(this.workerCPULoadMetric, labels, true);
            String responseBody = this.executePrometheusQuery(query);
            return this.parseWorkerLoadResponse(responseBody);
        }
        catch (Exception e) {
            log.error("Error fetching workers cpu load", e);
            return new HashMap<String, Double>();
        }
    }

    /**
     * Builds the per-worker PromQL query
     * {@code sum by(k8s_pod_ip) (avg_over_time(<metricName>{<labels>}[5m]))}.
     *
     * @param metricName        name of the metric to aggregate
     * @param labels            label matchers placed verbatim between the braces
     * @param includeMultiplier when {@code true}, {@code *100} is appended to the query
     * @return the PromQL query string
     */
    String buildWorkerPrometheusQuery(String metricName, String labels, boolean includeMultiplier) {
        String query = String.format("sum by(%s) (avg_over_time(%s{%s}[5m]))", CONNECT_WORKER_IP_METRIC, metricName, labels);
        return includeMultiplier ? query + "*100" : query;
    }

    /**
     * Queries the 5-minute average task CPU load, averaged per worker, connector and task.
     *
     * @return worker name to (task id to value); empty if the query or parsing fails
     */
    @Override
    public Map<String, Map<ConnectorTaskId, Double>> getTasksLoad() {
        try {
            String query = String.format("avg by (%s, connector, task) (avg_over_time(%s{k8s_namespace_name=\"%s\"}[5m]))", CONNECT_WORKER_IP_METRIC, this.taskCPULoadMetric, this.connectCluster);
            String responseBody = this.executePrometheusQuery(query);
            return this.parseTaskLoadResponse(responseBody);
        }
        catch (Exception e) {
            log.error("Error fetching task load: ", e);
            return new HashMap<String, Map<ConnectorTaskId, Double>>();
        }
    }

    /**
     * Returns the cached session credentials, assuming the configured role through STS first
     * when none are cached yet or the cached ones have passed their expiration time.
     */
    private AwsSessionCredentials getSessionCredentials() {
        if (this.sessionCredentials == null || this.credentialsExpired()) {
            if (this.sessionCredentials == null) {
                log.info("Fetching session credentials for the first time.");
            } else {
                log.info("Session credentials expired. Fetching new session credentials.");
            }
            AssumeRoleRequest roleRequest = AssumeRoleRequest.builder()
                    .roleArn(this.arn)
                    .roleSessionName(AMP_SESSION_NAME)
                    .build();
            AssumeRoleResponse roleResponse = this.stsClient.assumeRole(roleRequest);
            this.sessionCredentials = AwsSessionCredentials.create(
                    roleResponse.credentials().accessKeyId(),
                    roleResponse.credentials().secretAccessKey(),
                    roleResponse.credentials().sessionToken());
            this.credentialsExpiration = roleResponse.credentials().expiration();
        }
        return this.sessionCredentials;
    }

    /** Returns {@code true} when no expiration is recorded or the recorded expiration has passed. */
    private boolean credentialsExpired() {
        return this.credentialsExpiration == null || Instant.now().isAfter(this.credentialsExpiration);
    }

    /**
     * Runs an instant query against the workspace's {@code /api/v1/query} endpoint.
     *
     * <p>The request is signed with session credentials when an STS client is present and with
     * the file-based credentials otherwise. Failures while building or signing the request
     * propagate to the caller; failures while executing the request or reading the response are
     * logged and reported as {@code null}.
     *
     * @param query the PromQL expression
     * @return the response body as UTF-8 text, or {@code null} if the request did not return
     *         HTTP 200 with a body
     * @throws URISyntaxException if the query URI cannot be built
     */
    String executePrometheusQuery(String query) throws URISyntaxException {
        log.debug("Executing prometheus query: {}", query);
        URI uri = new URIBuilder(new URI(ENDPOINT + "/workspaces/" + this.workspaceId + "/api/v1/query"))
                .addParameter("query", query)
                .build();
        AwsCredentials credentialsToUse = this.stsClient != null ? this.getSessionCredentials() : this.baseCredentials;
        SdkHttpFullRequest request = SdkHttpFullRequest.builder().method(SdkHttpMethod.GET).uri(uri).build();
        SignedRequest signedRequest = this.signer.sign(r -> r
                .identity(credentialsToUse)
                .request(request)
                .putProperty(AwsV4HttpSigner.SERVICE_SIGNING_NAME, SERVICE_SIGNING_NAME)
                .putProperty(AwsV4HttpSigner.REGION_NAME, REGION_NAME));
        try {
            HttpExecuteRequest httpExecuteRequest = HttpExecuteRequest.builder()
                    .request(signedRequest.request())
                    .contentStreamProvider(signedRequest.payload().orElse(null))
                    .build();
            HttpExecuteResponse response = this.httpClient.prepareRequest(httpExecuteRequest).call();
            // The body stream must be closed on every path (including error statuses) so the
            // underlying connection is released; try-with-resources tolerates a null resource.
            try (InputStream body = response.responseBody().orElse(null)) {
                int statusCode = response.httpResponse().statusCode();
                if (statusCode != 200) {
                    log.error("AMP query {} request failed with status code {}", query, statusCode);
                    return null;
                }
                if (body == null) {
                    log.error("AMP query {} returned no response body", query);
                    return null;
                }
                return IoUtils.toUtf8String(body);
            }
        }
        catch (Exception e) {
            log.error("Error executing prometheus query: ", e);
            return null;
        }
    }

    /**
     * Parses the JSON response body and returns its {@code data.result} array.
     *
     * @return the result array, or an empty array when the response has no {@code data.result}
     * @throws ParseException if the body is not valid JSON
     */
    private static JSONArray extractResults(String responseBody) throws ParseException {
        JSONObject jsonResponse = (JSONObject) new JSONParser().parse(responseBody);
        JSONObject data = jsonResponse == null ? null : (JSONObject) jsonResponse.get("data");
        JSONArray results = data == null ? null : (JSONArray) data.get("result");
        if (results == null) {
            log.warn("Prometheus response does not contain a data.result array");
            return new JSONArray();
        }
        return results;
    }

    /**
     * Derives the worker name from a series' label set: {@code "connect-"} followed by the value
     * of the {@code k8s_pod_ip} label (the text {@code "null"} when the label is absent).
     */
    private static String podNameOf(JSONObject metric) {
        return "connect-" + String.valueOf(metric.get(CONNECT_WORKER_IP_METRIC));
    }

    /**
     * Converts a per-worker query response into a map of worker name to value.
     *
     * @param responseBody raw JSON response; blank yields an empty map
     * @return worker name to sample value rounded to two decimal places
     * @throws ParseException if the body is not valid JSON
     */
    private Map<String, Double> parseWorkerLoadResponse(String responseBody) throws ParseException {
        Map<String, Double> metricsMap = new HashMap<String, Double>();
        if (Utils.isBlank(responseBody)) {
            return metricsMap;
        }
        log.debug("Parsing worker load response: {}", responseBody);
        for (Object resultObject : extractResults(responseBody)) {
            JSONObject result = (JSONObject) resultObject;
            JSONObject metric = (JSONObject) result.get("metric");
            // "value" is a two-element array; the sample value is the string at index 1.
            JSONArray valueArray = (JSONArray) result.get("value");
            double rawValue = Double.parseDouble((String) valueArray.get(1));
            double value = (double) Math.round(rawValue * 100.0) / 100.0;
            metricsMap.put(podNameOf(metric), value);
        }
        return metricsMap;
    }

    /**
     * Converts a per-task query response into a map of worker name to per-task values.
     *
     * @param responseBody raw JSON response; blank yields an empty map
     * @return worker name to (task id built from the {@code connector} and {@code task} labels
     *         to sample value)
     * @throws ParseException if the body is not valid JSON
     */
    private Map<String, Map<ConnectorTaskId, Double>> parseTaskLoadResponse(String responseBody) throws ParseException {
        Map<String, Map<ConnectorTaskId, Double>> podTaskLoadMap = new HashMap<String, Map<ConnectorTaskId, Double>>();
        if (Utils.isBlank(responseBody)) {
            return podTaskLoadMap;
        }
        log.debug("Parsing task load response: {}", responseBody);
        for (Object resultObject : extractResults(responseBody)) {
            JSONObject result = (JSONObject) resultObject;
            JSONObject metric = (JSONObject) result.get("metric");
            ConnectorTaskId taskId = new ConnectorTaskId(
                    metric.get("connector").toString(),
                    Integer.parseInt(metric.get("task").toString()));
            double value = Double.parseDouble((String) ((JSONArray) result.get("value")).get(1));
            podTaskLoadMap.computeIfAbsent(podNameOf(metric), k -> new HashMap<>()).put(taskId, value);
        }
        return podTaskLoadMap;
    }

    /**
     * Loads static credentials from a properties file containing the keys {@code access_key}
     * and {@code secret_key}.
     *
     * @param filePath path of the properties file
     * @return basic credentials built from the two properties
     * @throws IOException              if the file cannot be read
     * @throws IllegalArgumentException if either property is missing or blank
     */
    AwsCredentials loadCredentialsFromFile(String filePath) throws IOException {
        Properties props = new Properties();
        try (FileInputStream fis = new FileInputStream(new File(filePath))) {
            props.load(fis);
        }
        String accessKeyId = props.getProperty(AMP_SECRET_KEY_ID);
        String secretAccessKey = props.getProperty(AMP_SECRET_ACCESS_KEY);
        if (Utils.isBlank(accessKeyId) || Utils.isBlank(secretAccessKey)) {
            throw new IllegalArgumentException("Missing 'access_key' or 'secret_key' in credentials file: " + filePath);
        }
        log.info("Using AMP credentials from file: {}", filePath);
        return AwsBasicCredentials.create(accessKeyId, secretAccessKey);
    }
}

