package io.confluent.ksql.rest.client;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.properties.LocalProperties;
import io.confluent.ksql.rest.client.exception.KsqlRestClientException;
import io.confluent.ksql.rest.entity.ClusterStatusResponse;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatuses;
import io.confluent.ksql.rest.entity.HealthCheckResponse;
import io.confluent.ksql.rest.entity.HeartbeatResponse;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlHostInfoEntity;
import io.confluent.ksql.rest.entity.LagReportingMessage;
import io.confluent.ksql.rest.entity.LagReportingResponse;
import io.confluent.ksql.rest.entity.ServerClusterId;
import io.confluent.ksql.rest.entity.ServerInfo;
import io.confluent.ksql.rest.entity.ServerMetadata;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpVersion;
import java.io.Closeable;
import java.net.URI;
import java.net.URL;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/* loaded from: input_file:io/confluent/ksql/rest/client/KsqlRestClient.class */
/**
 * Client-side facade for issuing requests to a KSQL REST server.
 *
 * <p>The client is configured with a comma-separated list of server addresses; every request is
 * sent to the first address of that list through the wrapped {@link KsqlClient}. Requests made
 * via {@link #makeConnectorRequest(String, Long)} additionally carry the Confluent Cloud API key
 * headers when the server has been flagged as a Confluent Cloud server.
 *
 * <p>{@code serverAddresses} and {@code isCCloudServer} are plain mutable fields; this class
 * performs no synchronization around them.
 */
public final class KsqlRestClient implements Closeable {
    /** Header that carries the username part of the Confluent Cloud API key. */
    static final String CCLOUD_CONNECT_USERNAME_HEADER = "X-Confluent-API-Key";
    /** Header that carries the password part of the Confluent Cloud API key. */
    static final String CCLOUD_CONNECT_PASSWORD_HEADER = "X-Confluent-API-Secret";
    private final KsqlClient client;
    private final LocalProperties localProperties;
    private final Optional<BasicCredentials> ccloudApiKey;
    private List<URI> serverAddresses;
    // Never written by this class; exposed as a mutable holder via getSerializedConsistencyVector().
    private final AtomicReference<String> serializedConsistencyVector = new AtomicReference<>();
    private boolean isCCloudServer = false;

    /**
     * Factory for the underlying {@link KsqlClient}, allowing tests to substitute their own.
     *
     * <p>NOTE(review): the decompiler reported that this interface was originally
     * package-private; it is kept public here so existing references keep compiling.
     */
    @FunctionalInterface
    public interface KsqlClientSupplier {
        KsqlClient get(Map<String, String> map, Optional<BasicCredentials> optional, LocalProperties localProperties);
    }

    /**
     * Creates a client without a Confluent Cloud API key.
     *
     * @param str comma-separated list of server addresses
     * @param map initial local properties
     * @param map2 properties handed to the underlying {@link KsqlClient}
     * @param optional credentials handed to the underlying {@link KsqlClient}
     * @return the new client
     * @throws KsqlRestClientException if any server address cannot be parsed
     * @deprecated use {@link #create(String, Map, Map, Optional, Optional)} instead.
     */
    @Deprecated
    public static KsqlRestClient create(String str, Map<String, ?> map, Map<String, String> map2, Optional<BasicCredentials> optional) {
        return create(str, map, map2, optional, Optional.empty());
    }

    /**
     * Creates a client backed by a default {@link KsqlClient}.
     *
     * @param str comma-separated list of server addresses
     * @param map initial local properties
     * @param map2 properties handed to the underlying {@link KsqlClient}
     * @param optional credentials handed to the underlying {@link KsqlClient}
     * @param optional2 Confluent Cloud API key used for connector request headers, if any
     * @return the new client
     * @throws KsqlRestClientException if any server address cannot be parsed
     */
    public static KsqlRestClient create(String str, Map<String, ?> map, Map<String, String> map2, Optional<BasicCredentials> optional, Optional<BasicCredentials> optional2) {
        return create(str, map, map2, optional, optional2, KsqlRestClient::newDefaultClient);
    }

    /**
     * Creates a client whose underlying {@link KsqlClient} is produced by the given supplier.
     *
     * @param str comma-separated list of server addresses
     * @param map initial local properties
     * @param map2 properties passed to the supplier
     * @param optional credentials passed to the supplier
     * @param optional2 Confluent Cloud API key used for connector request headers, if any
     * @param ksqlClientSupplier factory for the underlying client
     * @return the new client
     */
    @VisibleForTesting
    static KsqlRestClient create(String str, Map<String, ?> map, Map<String, String> map2, Optional<BasicCredentials> optional, Optional<BasicCredentials> optional2, KsqlClientSupplier ksqlClientSupplier) {
        LocalProperties localProperties = new LocalProperties(map);
        KsqlClient ksqlClient = ksqlClientSupplier.get(map2, optional, localProperties);
        return new KsqlRestClient(ksqlClient, str, localProperties, optional2);
    }

    /**
     * Builds the default {@link KsqlClient}: one set of default HTTP client options plus a
     * second set configured for HTTP/2.
     */
    private static KsqlClient newDefaultClient(Map<String, String> clientProps, Optional<BasicCredentials> credentials, LocalProperties localProperties) {
        HttpClientOptions http2Options = new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_2);
        return new KsqlClient(clientProps, credentials, localProperties, new HttpClientOptions(), Optional.of(http2Options));
    }

    private KsqlRestClient(KsqlClient ksqlClient, String serverAddress, LocalProperties localProperties, Optional<BasicCredentials> ccloudApiKey) {
        this.client = Objects.requireNonNull(ksqlClient, "client");
        this.serverAddresses = parseServerAddresses(serverAddress);
        this.localProperties = Objects.requireNonNull(localProperties, "localProps");
        this.ccloudApiKey = ccloudApiKey;
    }

    /** Returns the address requests are sent to: the first of the configured server addresses. */
    public URI getServerAddress() {
        return this.serverAddresses.get(0);
    }

    /** Returns whether the server has been flagged as a Confluent Cloud server. */
    public boolean getIsCCloudServer() {
        return this.isCCloudServer;
    }

    /** Returns whether a Confluent Cloud API key was supplied at creation time. */
    public boolean getHasCCloudApiKey() {
        return this.ccloudApiKey.isPresent();
    }

    /**
     * Replaces the configured server addresses.
     *
     * @param str comma-separated list of server addresses
     * @throws KsqlRestClientException if any address cannot be parsed
     */
    public void setServerAddress(String str) {
        this.serverAddresses = parseServerAddresses(str);
    }

    /** Flags (or un-flags) the server as a Confluent Cloud server. */
    public void setIsCCloudServer(boolean z) {
        this.isCCloudServer = z;
    }

    /** Requests the server info. */
    public RestResponse<ServerInfo> getServerInfo() {
        return target().getServerInfo();
    }

    /** Requests the status of the command identified by {@code str}. */
    public RestResponse<CommandStatus> getStatus(String str) {
        return target().getStatus(str);
    }

    /** Requests the statuses of all commands. */
    public RestResponse<CommandStatuses> getAllStatuses() {
        return target().getStatuses();
    }

    /** Requests the server metadata. */
    public RestResponse<ServerMetadata> getServerMetadata() {
        return target().getServerMetadata();
    }

    /** Requests the server metadata id. */
    public RestResponse<ServerClusterId> getServerMetadataId() {
        return target().getServerMetadataId();
    }

    /** Requests the server health check. */
    public RestResponse<HealthCheckResponse> getServerHealth() {
        return target().getServerHealth();
    }

    /** Posts a heartbeat for the given host and timestamp value without blocking. */
    public CompletableFuture<RestResponse<HeartbeatResponse>> makeAsyncHeartbeatRequest(KsqlHostInfoEntity ksqlHostInfoEntity, long j) {
        return target().postAsyncHeartbeatRequest(ksqlHostInfoEntity, j);
    }

    /** Requests the cluster status. */
    public RestResponse<ClusterStatusResponse> makeClusterStatusRequest() {
        return target().getClusterStatus();
    }

    /** Posts a lag report without blocking. */
    public CompletableFuture<RestResponse<LagReportingResponse>> makeAsyncLagReportingRequest(LagReportingMessage lagReportingMessage) {
        return target().postAsyncLagReportingRequest(lagReportingMessage);
    }

    /** Posts a KSQL statement with no additional properties and no sequence number. */
    public RestResponse<KsqlEntityList> makeKsqlRequest(String str) {
        return target().postKsqlRequest(str, Collections.emptyMap(), Optional.empty());
    }

    /**
     * Posts a KSQL statement with no additional properties.
     *
     * @param str the statement text
     * @param l optional value forwarded with the request; may be {@code null}
     *     (presumably a command sequence number - confirm against KsqlTarget)
     */
    public RestResponse<KsqlEntityList> makeKsqlRequest(String str, Long l) {
        return target().postKsqlRequest(str, Collections.emptyMap(), Optional.ofNullable(l));
    }

    /**
     * Posts a KSQL statement as a connector request. When the server is flagged as a Confluent
     * Cloud server, the Confluent Cloud API key headers are attached.
     *
     * @param str the statement text
     * @param l optional value forwarded with the request; may be {@code null}
     * @throws IllegalStateException if the server is flagged as Confluent Cloud but no API key
     *     was supplied
     */
    public RestResponse<KsqlEntityList> makeConnectorRequest(String str, Long l) {
        return target(true).postKsqlRequest(str, Collections.emptyMap(), Optional.ofNullable(l));
    }

    /** Requests the statuses of all commands; equivalent to {@link #getAllStatuses()}. */
    public RestResponse<CommandStatuses> makeStatusRequest() {
        return target().getStatuses();
    }

    /** Requests the status of one command; equivalent to {@link #getStatus(String)}. */
    public RestResponse<CommandStatus> makeStatusRequest(String str) {
        return target().getStatus(str);
    }

    /** Issues an "is valid" request for the given value. */
    public RestResponse<Boolean> makeIsValidRequest(String str) {
        return target().getIsValidRequest(str);
    }

    /** Streams a query with no property overrides and no extra request map. */
    public RestResponse<StreamPublisher<StreamedRow>> makeQueryRequestStreamed(String str, Long l) {
        return makeQueryRequestStreamed(str, l, null);
    }

    /** Streams a query with optional property overrides and an empty extra request map. */
    public RestResponse<StreamPublisher<StreamedRow>> makeQueryRequestStreamed(String str, Long l, Map<String, ?> map) {
        return makeQueryRequestStreamed(str, l, map, Collections.emptyMap());
    }

    /**
     * Streams a query.
     *
     * @param str the query text
     * @param l optional value forwarded with the request; may be {@code null}
     * @param map property overrides applied to the target; skipped when {@code null}
     * @param map2 extra map forwarded with the request (copied first); {@code null} is treated as
     *     empty
     */
    public RestResponse<StreamPublisher<StreamedRow>> makeQueryRequestStreamed(String str, Long l, Map<String, ?> map, Map<String, ?> map2) {
        KsqlTarget target = target();
        Map<String, ?> requestMap = mutableCopy(map2);
        if (map != null) {
            target = target.properties(map);
        }
        return target.postQueryRequestStreamed(str, requestMap, Optional.ofNullable(l));
    }

    /**
     * Streams a query over the HTTP/2 target without blocking.
     *
     * @param str the query text
     * @param map properties; applied to the target when non-empty and also forwarded with the
     *     request. Must not be {@code null}.
     */
    @VisibleForTesting
    public CompletableFuture<RestResponse<StreamPublisher<StreamedRow>>> makeQueryRequestStreamedAsync(String str, Map<String, ?> map) {
        KsqlTarget http2Target = targetHttp2();
        if (!map.isEmpty()) {
            http2Target = http2Target.properties(map);
        }
        return http2Target.postQueryRequestStreamedAsync(str, map);
    }

    /** Runs a query with no property overrides and an empty extra request map. */
    public RestResponse<List<StreamedRow>> makeQueryRequest(String str, Long l) {
        return makeQueryRequest(str, l, null, Collections.emptyMap());
    }

    /**
     * Runs a query and returns its rows as a list.
     *
     * @param str the query text
     * @param l optional value forwarded with the request; may be {@code null}
     * @param map property overrides applied to the target; skipped when {@code null}
     * @param map2 extra map forwarded with the request (copied first); {@code null} is treated as
     *     empty
     */
    public RestResponse<List<StreamedRow>> makeQueryRequest(String str, Long l, Map<String, ?> map, Map<String, ?> map2) {
        KsqlTarget target = target();
        if (map != null) {
            target = target.properties(map);
        }
        return target.postQueryRequest(str, mutableCopy(map2), Optional.ofNullable(l));
    }

    /** Runs a query-stream request through the protobuf variant of the target call. */
    public RestResponse<List<StreamedRow>> makeQueryStreamRequestProto(String str, Map<String, Object> map) {
        return target().postQueryStreamRequestProto(str, map);
    }

    /** Posts a print-topic request; {@code l} may be {@code null}. */
    public RestResponse<StreamPublisher<String>> makePrintTopicRequest(String str, Long l) {
        return target().postPrintTopicRequest(str, Optional.ofNullable(l));
    }

    /** Closes the underlying {@link KsqlClient}. */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.client.close();
    }

    /** Sets a local property and returns whatever {@link LocalProperties#set} returns. */
    public Object setProperty(String str, Object obj) {
        return this.localProperties.set(str, obj);
    }

    /** Unsets a local property and returns whatever {@link LocalProperties#unset} returns. */
    public Object unsetProperty(String str) {
        return this.localProperties.unset(str);
    }

    /** Looks up a local property. */
    public Object getProperty(String str) {
        return this.localProperties.get(str);
    }

    /** Returns the live, mutable holder of the serialized consistency vector (not a copy). */
    @VisibleForTesting
    @SuppressFBWarnings(value = {"EI_EXPOSE_REP"}, justification = "should be mutable")
    public AtomicReference<String> getSerializedConsistencyVector() {
        return this.serializedConsistencyVector;
    }

    /** Target for the current server address without any additional headers. */
    private KsqlTarget target() {
        return target(false);
    }

    /**
     * Target for the current server address.
     *
     * @param includeConnectorHeaders whether Confluent Cloud API key headers should be attached;
     *     they are only attached when the server is also flagged as a Confluent Cloud server
     */
    private KsqlTarget target(boolean includeConnectorHeaders) {
        // Resolve the address before the headers so failures surface in the original order.
        URI address = getServerAddress();
        Map<String, String> headers = (includeConnectorHeaders && this.isCCloudServer)
                ? ccloudConnectorHeaders()
                : Collections.emptyMap();
        return this.client.target(address, headers);
    }

    /** HTTP/2 target for the current server address. */
    private KsqlTarget targetHttp2() {
        return this.client.targetHttp2(getServerAddress());
    }

    /**
     * Builds the Confluent Cloud API key headers.
     *
     * @throws IllegalStateException if no API key was supplied
     */
    private Map<String, String> ccloudConnectorHeaders() {
        BasicCredentials apiKey = this.ccloudApiKey.orElseThrow(
                () -> new IllegalStateException("Should not request headers if no credentials provided."));
        return ImmutableMap.of(
                CCLOUD_CONNECT_USERNAME_HEADER, apiKey.username(),
                CCLOUD_CONNECT_PASSWORD_HEADER, apiKey.password());
    }

    /**
     * Splits a comma-separated address list, trims each entry and parses it into a URI.
     *
     * @throws NullPointerException if {@code serverAddress} is {@code null}
     * @throws KsqlRestClientException if any entry cannot be parsed
     */
    private static List<URI> parseServerAddresses(String serverAddress) {
        Objects.requireNonNull(serverAddress, "serverAddress");
        List<URI> parsed = Arrays.stream(serverAddress.split(","))
                .map(String::trim)
                .map(KsqlRestClient::parseUri)
                .collect(Collectors.toList());
        return ImmutableList.copyOf(parsed);
    }

    /**
     * Parses a single server address, filling in the protocol's default port when the address
     * does not specify one.
     *
     * @throws KsqlRestClientException if the address is not a valid URL/URI
     */
    private static URI parseUri(String serverAddress) {
        try {
            URL url = new URL(serverAddress);
            // Nothing to add when a port is given or the protocol has no default port.
            if (url.getPort() != -1 || url.getDefaultPort() == -1) {
                return url.toURI();
            }
            // Re-parse through URL so the rebuilt address gets the same validation as the input.
            return new URL(withDefaultPort(serverAddress, url)).toURI();
        } catch (Exception e) {
            throw new KsqlRestClientException("The supplied serverAddress is invalid: " + serverAddress, e);
        }
    }

    /**
     * Returns the address text with the protocol's default port placed directly after the
     * authority. Appending the port to the end of the raw text would corrupt any address that
     * has a trailing slash, path, query or fragment (e.g. {@code http://host/} would become
     * {@code http://host/:80}, leaving the port unset).
     */
    private static String withDefaultPort(String serverAddress, URL url) {
        String authority = url.getAuthority();
        if (authority == null || authority.isEmpty()) {
            // No authority to attach the port to: keep the legacy "append to the end" form.
            return serverAddress + ":" + url.getDefaultPort();
        }
        StringBuilder rebuilt = new StringBuilder()
                .append(url.getProtocol()).append("://")
                .append(authority).append(':').append(url.getDefaultPort())
                .append(url.getFile()); // path plus query, if any
        if (url.getRef() != null) {
            rebuilt.append('#').append(url.getRef());
        }
        return rebuilt.toString();
    }

    /**
     * Returns a fresh mutable copy of {@code source}; a {@code null} source yields an empty map.
     * Despite the historical name of this helper ("setConsistencyVector"), nothing is added to
     * the copy.
     */
    private Map<String, Object> mutableCopy(Map<String, ?> source) {
        Map<String, Object> copy = new HashMap<>();
        if (source != null) {
            copy.putAll(source);
        }
        return copy;
    }
}
