package io.confluent.ksql.rest.server.services;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.reactive.BufferedPublisher;
import io.confluent.ksql.rest.client.KsqlClient;
import io.confluent.ksql.rest.client.KsqlTarget;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.ClusterStatusResponse;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlHostInfoEntity;
import io.confluent.ksql.rest.entity.LagReportingMessage;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.services.SimpleKsqlClient;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlHostInfo;
import io.vertx.core.Vertx;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.streams.WriteStream;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/confluent/ksql/rest/server/services/DefaultKsqlClient.class */
public final class DefaultKsqlClient implements SimpleKsqlClient {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultKsqlClient.class);
    private final Optional<String> authHeader;
    private final KsqlClient sharedClient;
    private final boolean ownSharedClient;
    private final KsqlConfig ksqlConfig;

    @VisibleForTesting
    DefaultKsqlClient(Optional<String> optional, Map<String, Object> map, BiFunction<Integer, String, SocketAddress> biFunction) {
        this(optional, InternalKsqlClientFactory.createInternalClient(toClientProps(map), biFunction, Vertx.vertx()), true, new KsqlConfig(map));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DefaultKsqlClient(Optional<String> optional, KsqlClient ksqlClient, KsqlConfig ksqlConfig) {
        this(optional, ksqlClient, false, ksqlConfig);
    }

    DefaultKsqlClient(Optional<String> optional, KsqlClient ksqlClient, boolean z, KsqlConfig ksqlConfig) {
        this.authHeader = (Optional) Objects.requireNonNull(optional, "authHeader");
        this.sharedClient = (KsqlClient) Objects.requireNonNull(ksqlClient, "sharedClient");
        this.ownSharedClient = z;
        this.ksqlConfig = ksqlConfig;
    }

    public RestResponse<KsqlEntityList> makeKsqlRequest(URI uri, String str, Map<String, ?> map) {
        return getTarget(this.sharedClient.target(uri)).postKsqlRequest(str, map, Optional.empty());
    }

    public RestResponse<List<StreamedRow>> makeQueryRequest(URI uri, String str, Map<String, ?> map, Map<String, ?> map2) {
        RestResponse postQueryRequest = getTarget(this.sharedClient.target(uri).properties(map).timeout(getQueryTimeout(map))).postQueryRequest(str, map2, Optional.empty());
        return postQueryRequest.isErroneous() ? RestResponse.erroneous(postQueryRequest.getStatusCode(), postQueryRequest.getErrorMessage()) : RestResponse.successful(postQueryRequest.getStatusCode(), postQueryRequest.getResponse());
    }

    public RestResponse<Integer> makeQueryRequest(URI uri, String str, Map<String, ?> map, Map<String, ?> map2, WriteStream<List<StreamedRow>> writeStream, CompletableFuture<Void> completableFuture, Function<StreamedRow, StreamedRow> function) {
        RestResponse postQueryRequest = getTarget(this.sharedClient.target(uri).properties(map).timeout(getQueryTimeout(map))).postQueryRequest(str, map2, Optional.empty(), writeStream, completableFuture, function);
        return postQueryRequest.isErroneous() ? RestResponse.erroneous(postQueryRequest.getStatusCode(), postQueryRequest.getErrorMessage()) : RestResponse.successful(postQueryRequest.getStatusCode(), postQueryRequest.getResponse());
    }

    public CompletableFuture<RestResponse<BufferedPublisher<StreamedRow>>> makeQueryRequestStreamed(URI uri, String str, Map<String, ?> map, Map<String, ?> map2) {
        return getTarget(this.sharedClient.targetHttp2(uri).properties(map)).postQueryRequestStreamedAsync(str, map2).thenApply(restResponse -> {
            return restResponse.isErroneous() ? RestResponse.erroneous(restResponse.getStatusCode(), restResponse.getErrorMessage()) : RestResponse.successful(restResponse.getStatusCode(), restResponse.getResponse());
        });
    }

    public void makeAsyncHeartbeatRequest(URI uri, KsqlHostInfo ksqlHostInfo, long j) {
        getTarget(this.sharedClient.target(uri)).postAsyncHeartbeatRequest(new KsqlHostInfoEntity(ksqlHostInfo.host(), ksqlHostInfo.port()), j).exceptionally(th -> {
            LOG.debug("Exception in async heartbeat request", th);
            return null;
        });
    }

    public RestResponse<ClusterStatusResponse> makeClusterStatusRequest(URI uri) {
        return getTarget(this.sharedClient.target(uri)).getClusterStatus();
    }

    public void makeAsyncLagReportRequest(URI uri, LagReportingMessage lagReportingMessage) {
        getTarget(this.sharedClient.target(uri)).postAsyncLagReportingRequest(lagReportingMessage).exceptionally(th -> {
            LOG.debug("Exception in async lag reporting request", th);
            return null;
        });
    }

    public void close() {
        if (this.ownSharedClient) {
            this.sharedClient.close();
        }
    }

    private KsqlTarget getTarget(KsqlTarget ksqlTarget) {
        Optional<String> optional = this.authHeader;
        ksqlTarget.getClass();
        return (KsqlTarget) optional.map(ksqlTarget::authorizationHeader).orElse(ksqlTarget);
    }

    private static Map<String, String> toClientProps(Map<String, Object> map) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            hashMap.put(entry.getKey(), entry.getValue().toString());
        }
        return hashMap;
    }

    private long getQueryTimeout(Map<String, ?> map) {
        return map.containsKey("ksql.query.pull.forwarding.timeout.ms") ? ((Long) map.get("ksql.query.pull.forwarding.timeout.ms")).longValue() : this.ksqlConfig.getLong("ksql.query.pull.forwarding.timeout.ms").longValue();
    }
}
