package io.confluent.ksql.services;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.services.ConnectClient;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import io.vertx.core.http.HttpHeaders;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import org.apache.hc.client5.http.fluent.Request;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.io.HttpClientResponseHandler;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.util.Timeout;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/services/DefaultConnectClient.class */
public class DefaultConnectClient implements ConnectClient {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultConnectClient.class);
    private static final ObjectMapper MAPPER = ConnectJsonMapper.INSTANCE.get();
    private static final String CONNECTORS = "/connectors";
    private static final String STATUS = "/status";
    private static final String TOPICS = "/topics";
    private static final int DEFAULT_TIMEOUT_MS = 5000;
    private static final int MAX_ATTEMPTS = 3;
    private final URI connectUri;
    private final Optional<String> authHeader;

    public DefaultConnectClient(String str, Optional<String> optional) {
        Objects.requireNonNull(str, "connectUri");
        this.authHeader = (Optional) Objects.requireNonNull(optional, "authHeader");
        try {
            this.connectUri = new URI(str);
        } catch (URISyntaxException e) {
            throw new KsqlException("Could not initialize connect client due to invalid URI: " + str, e);
        }
    }

    public ConnectClient.ConnectResponse<ConnectorInfo> create(String str, Map<String, String> map) {
        try {
            LOG.debug("Issuing create request to Kafka Connect at URI {} with name {} and config {}", new Object[]{this.connectUri, str, map});
            ConnectClient.ConnectResponse<ConnectorInfo> withRetries = withRetries(() -> {
                return (ConnectClient.ConnectResponse) Request.post(this.connectUri.resolve(CONNECTORS)).setHeaders(headers()).responseTimeout(Timeout.ofMilliseconds(5000L)).connectTimeout(Timeout.ofMilliseconds(5000L)).bodyString(MAPPER.writeValueAsString(ImmutableMap.of("name", str, "config", map)), ContentType.APPLICATION_JSON).execute().handleResponse(createHandler(201, new TypeReference<ConnectorInfo>() { // from class: io.confluent.ksql.services.DefaultConnectClient.1
                }, Function.identity()));
            });
            withRetries.error().ifPresent(str2 -> {
                LOG.warn("Did not CREATE connector {}: {}", str, str2);
            });
            return withRetries;
        } catch (Exception e) {
            throw new KsqlServerException(e);
        }
    }

    public ConnectClient.ConnectResponse<List<String>> connectors() {
        try {
            LOG.debug("Issuing request to Kafka Connect at URI {} to list connectors", this.connectUri);
            ConnectClient.ConnectResponse<List<String>> withRetries = withRetries(() -> {
                return (ConnectClient.ConnectResponse) Request.get(this.connectUri.resolve(CONNECTORS)).setHeaders(headers()).responseTimeout(Timeout.ofMilliseconds(5000L)).connectTimeout(Timeout.ofMilliseconds(5000L)).execute().handleResponse(createHandler(200, new TypeReference<List<String>>() { // from class: io.confluent.ksql.services.DefaultConnectClient.2
                }, Function.identity()));
            });
            withRetries.error().ifPresent(str -> {
                LOG.warn("Could not list connectors: {}.", str);
            });
            return withRetries;
        } catch (Exception e) {
            throw new KsqlServerException(e);
        }
    }

    public ConnectClient.ConnectResponse<ConnectorStateInfo> status(String str) {
        try {
            LOG.debug("Issuing status request to Kafka Connect at URI {} with name {}", this.connectUri, str);
            ConnectClient.ConnectResponse<ConnectorStateInfo> withRetries = withRetries(() -> {
                return (ConnectClient.ConnectResponse) Request.get(this.connectUri.resolve("/connectors/" + str + STATUS)).setHeaders(headers()).responseTimeout(Timeout.ofMilliseconds(5000L)).connectTimeout(Timeout.ofMilliseconds(5000L)).execute().handleResponse(createHandler(200, new TypeReference<ConnectorStateInfo>() { // from class: io.confluent.ksql.services.DefaultConnectClient.3
                }, Function.identity()));
            });
            withRetries.error().ifPresent(str2 -> {
                LOG.warn("Could not query status of connector {}: {}", str, str2);
            });
            return withRetries;
        } catch (Exception e) {
            throw new KsqlServerException(e);
        }
    }

    public ConnectClient.ConnectResponse<ConnectorInfo> describe(String str) {
        try {
            LOG.debug("Issuing request to Kafka Connect at URI {} to get config for {}", this.connectUri, str);
            ConnectClient.ConnectResponse<ConnectorInfo> withRetries = withRetries(() -> {
                return (ConnectClient.ConnectResponse) Request.get(this.connectUri.resolve(String.format("%s/%s", CONNECTORS, str))).setHeaders(headers()).responseTimeout(Timeout.ofMilliseconds(5000L)).connectTimeout(Timeout.ofMilliseconds(5000L)).execute().handleResponse(createHandler(200, new TypeReference<ConnectorInfo>() { // from class: io.confluent.ksql.services.DefaultConnectClient.4
                }, Function.identity()));
            });
            withRetries.error().ifPresent(str2 -> {
                LOG.warn("Could not list connectors: {}.", str2);
            });
            return withRetries;
        } catch (Exception e) {
            throw new KsqlServerException(e);
        }
    }

    public ConnectClient.ConnectResponse<String> delete(String str) {
        try {
            LOG.debug("Issuing request to Kafka Connect at URI {} to delete {}", this.connectUri, str);
            ConnectClient.ConnectResponse<String> withRetries = withRetries(() -> {
                return (ConnectClient.ConnectResponse) Request.delete(this.connectUri.resolve(String.format("%s/%s", CONNECTORS, str))).setHeaders(headers()).responseTimeout(Timeout.ofMilliseconds(5000L)).connectTimeout(Timeout.ofMilliseconds(5000L)).execute().handleResponse(createHandler(204, new TypeReference<Object>() { // from class: io.confluent.ksql.services.DefaultConnectClient.5
                }, obj -> {
                    return str;
                }));
            });
            withRetries.error().ifPresent(str2 -> {
                LOG.warn("Could not delete connector: {}.", str2);
            });
            return withRetries;
        } catch (Exception e) {
            throw new KsqlServerException(e);
        }
    }

    public ConnectClient.ConnectResponse<Map<String, Map<String, List<String>>>> topics(String str) {
        try {
            LOG.debug("Issuing request to Kafka Connect at URI {} to get active topics for {}", this.connectUri, str);
            ConnectClient.ConnectResponse<Map<String, Map<String, List<String>>>> withRetries = withRetries(() -> {
                return (ConnectClient.ConnectResponse) Request.get(this.connectUri.resolve("/connectors/" + str + TOPICS)).setHeaders(headers()).responseTimeout(Timeout.ofMilliseconds(5000L)).connectTimeout(Timeout.ofMilliseconds(5000L)).execute().handleResponse(createHandler(200, new TypeReference<Map<String, Map<String, List<String>>>>() { // from class: io.confluent.ksql.services.DefaultConnectClient.6
                }, Function.identity()));
            });
            withRetries.error().ifPresent(str2 -> {
                LOG.warn("Could not query topics of connector {}: {}", str, str2);
            });
            return withRetries;
        } catch (Exception e) {
            throw new KsqlServerException(e);
        }
    }

    private Header[] headers() {
        return this.authHeader.isPresent() ? new Header[]{new BasicHeader(HttpHeaders.AUTHORIZATION.toString(), this.authHeader.get())} : new Header[0];
    }

    private static <T> ConnectClient.ConnectResponse<T> withRetries(Callable<ConnectClient.ConnectResponse<T>> callable) {
        try {
            return (ConnectClient.ConnectResponse) RetryerBuilder.newBuilder().withStopStrategy(StopStrategies.stopAfterAttempt(MAX_ATTEMPTS)).withWaitStrategy(WaitStrategies.exponentialWait()).retryIfResult(connectResponse -> {
                return connectResponse == null || connectResponse.httpCode() >= 500 || connectResponse.httpCode() == 409;
            }).retryIfException().build().call(callable);
        } catch (RetryException e) {
            LOG.warn("Failed to query connect cluster after {} attempts.", Integer.valueOf(e.getNumberOfFailedAttempts()));
            if (e.getLastFailedAttempt().hasResult()) {
                return (ConnectClient.ConnectResponse) e.getLastFailedAttempt().getResult();
            }
            throw new KsqlServerException(e.getCause());
        } catch (ExecutionException e2) {
            throw new KsqlServerException("Unexpected exception!", e2);
        }
    }

    private static <T, C> HttpClientResponseHandler<ConnectClient.ConnectResponse<T>> createHandler(int i, TypeReference<C> typeReference, Function<C, T> function) {
        return classicHttpResponse -> {
            int code = classicHttpResponse.getCode();
            if (classicHttpResponse.getCode() != i) {
                return ConnectClient.ConnectResponse.failure(EntityUtils.toString(classicHttpResponse.getEntity()), code);
            }
            HttpEntity entity = classicHttpResponse.getEntity();
            return ConnectClient.ConnectResponse.success(function.apply(entity == null ? null : MAPPER.readValue(entity.getContent(), typeReference)), code);
        };
    }
}
