/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.BatchedQueryResult;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.StreamedQueryResult;
import io.confluent.ksql.api.client.exception.KsqlClientException;
import io.confluent.ksql.api.client.impl.BatchedQueryResultImpl;
import io.confluent.ksql.api.client.impl.ExecuteQueryResponseHandler;
import io.confluent.ksql.api.client.impl.InsertsResponseHandler;
import io.confluent.ksql.api.client.impl.ResponseHandler;
import io.confluent.ksql.api.client.impl.StreamQueryResponseHandler;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.JksOptions;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.parsetools.RecordParser;
import io.vertx.core.streams.ReadStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
 * Vert.x-backed implementation of {@link Client}.
 *
 * <p>Every operation is issued as an HTTP POST against the server described by the
 * {@link ClientOptions} supplied at construction time. Results and failures are reported
 * through the {@link CompletableFuture} returned to the caller.
 */
public class ClientImpl implements Client {
    /** Private copy of the caller's options, taken at construction time. */
    private final ClientOptions clientOptions;
    private final Vertx vertx;
    private final HttpClient httpClient;
    private final SocketAddress serverSocketAddress;
    /** Pre-computed {@code Authorization} header value; empty when basic auth is disabled. */
    private final String basicAuthHeader;
    /** Whether this client created {@link #vertx} itself and therefore closes it in {@link #close()}. */
    private final boolean ownedVertx;

    /**
     * Creates a client that runs on a new {@link Vertx} instance created via {@link Vertx#vertx()}.
     * That instance is closed by {@link #close()}.
     *
     * @param clientOptions connection settings; copied, so later changes by the caller are not seen
     */
    public ClientImpl(ClientOptions clientOptions) {
        this(clientOptions, Vertx.vertx(), true);
    }

    /**
     * Creates a client that runs on a caller-supplied {@link Vertx} instance.
     * {@link #close()} does not close that instance.
     *
     * @param clientOptions connection settings; copied, so later changes by the caller are not seen
     * @param vertx the Vert.x instance used to create the underlying HTTP client
     */
    public ClientImpl(ClientOptions clientOptions, Vertx vertx) {
        this(clientOptions, vertx, false);
    }

    private ClientImpl(ClientOptions clientOptions, Vertx vertx, boolean ownedVertx) {
        this.clientOptions = clientOptions.copy();
        this.vertx = vertx;
        this.ownedVertx = ownedVertx;
        // Derive everything from the private copy so that the HTTP client, the auth header and the
        // socket address can never disagree with this.clientOptions if the caller mutates theirs.
        this.httpClient = createHttpClient(vertx, this.clientOptions);
        this.basicAuthHeader = createBasicAuthHeader(this.clientOptions);
        this.serverSocketAddress =
            SocketAddress.inetSocketAddress(this.clientOptions.getPort(), this.clientOptions.getHost());
    }

    /**
     * Equivalent to {@link #streamQuery(String, Map)} with an empty properties map.
     */
    @Override
    public CompletableFuture<StreamedQueryResult> streamQuery(String sql) {
        return streamQuery(sql, Collections.emptyMap());
    }

    /**
     * Posts the query to {@code /query-stream} and hands the response to a
     * {@link StreamQueryResponseHandler}.
     *
     * @param sql the statement sent as the {@code sql} field of the request body
     * @param properties sent as the {@code properties} field of the request body
     * @return the future passed to the response handler; completed exceptionally on a request
     *     error or a non-200 response
     */
    @Override
    public CompletableFuture<StreamedQueryResult> streamQuery(String sql, Map<String, Object> properties) {
        final CompletableFuture<StreamedQueryResult> cf = new CompletableFuture<>();
        makeQueryRequest(sql, properties, cf, StreamQueryResponseHandler::new);
        return cf;
    }

    /**
     * Equivalent to {@link #executeQuery(String, Map)} with an empty properties map.
     */
    @Override
    public BatchedQueryResult executeQuery(String sql) {
        return executeQuery(sql, Collections.emptyMap());
    }

    /**
     * Posts the query to {@code /query-stream} and hands the response to an
     * {@link ExecuteQueryResponseHandler} configured with
     * {@link ClientOptions#getExecuteQueryMaxResultRows()}.
     *
     * @param sql the statement sent as the {@code sql} field of the request body
     * @param properties sent as the {@code properties} field of the request body
     * @return the result object passed to the response handler; completed exceptionally on a
     *     request error or a non-200 response
     */
    @Override
    public BatchedQueryResult executeQuery(String sql, Map<String, Object> properties) {
        // Declared as the abstract type so the generic parameter of makeQueryRequest matches the
        // type the response handler is written against.
        final BatchedQueryResult result = new BatchedQueryResultImpl();
        makeQueryRequest(
            sql,
            properties,
            result,
            (context, recordParser, cf) -> new ExecuteQueryResponseHandler(
                context, recordParser, cf, this.clientOptions.getExecuteQueryMaxResultRows()));
        return result;
    }

    /**
     * Posts a single row to {@code /inserts-stream}.
     *
     * <p>The request body is two newline-terminated JSON documents: first
     * {@code {"target": streamName}}, then the row itself.
     *
     * @param streamName value of the {@code target} field in the first document
     * @param row the row to insert, serialized with {@link KsqlObject#toJsonString()}
     * @return the future passed to the {@link InsertsResponseHandler}; completed exceptionally on a
     *     request error or a non-200 response
     */
    @Override
    public CompletableFuture<Void> insertInto(String streamName, KsqlObject row) {
        final CompletableFuture<Void> cf = new CompletableFuture<>();

        final Buffer requestBody = Buffer.buffer();
        final JsonObject params = new JsonObject().put("target", streamName);
        requestBody.appendBuffer(params.toBuffer()).appendString("\n");
        requestBody.appendString(row.toJsonString()).appendString("\n");

        makeRequest(
            "/inserts-stream",
            requestBody,
            cf,
            response -> handleResponse(response, cf, InsertsResponseHandler::new));
        return cf;
    }

    /**
     * Posts {@code {"queryId": queryId}} to {@code /close-query}.
     *
     * @param queryId the id sent as the {@code queryId} field of the request body
     * @return a future completed with {@code null} on a 200 response and exceptionally otherwise
     */
    @Override
    public CompletableFuture<Void> terminatePushQuery(String queryId) {
        final CompletableFuture<Void> cf = new CompletableFuture<>();
        makeRequest(
            "/close-query",
            new JsonObject().put("queryId", queryId),
            cf,
            response -> handleCloseQueryResponse(response, cf));
        return cf;
    }

    /**
     * Closes the HTTP client and, if this instance created its own {@link Vertx}, closes that too.
     */
    @Override
    public void close() {
        this.httpClient.close();
        if (this.ownedVertx) {
            this.vertx.close();
        }
    }

    /** Builds the {@code /query-stream} request body and dispatches it. */
    private <T extends CompletableFuture<?>> void makeQueryRequest(
            String sql,
            Map<String, Object> properties,
            T cf,
            ResponseHandlerSupplier<T> responseHandlerSupplier) {
        final JsonObject requestBody = new JsonObject().put("sql", sql).put("properties", properties);
        makeRequest(
            "/query-stream",
            requestBody,
            cf,
            response -> handleResponse(response, cf, responseHandlerSupplier));
    }

    /** Convenience overload that serializes a JSON body to a {@link Buffer}. */
    private <T extends CompletableFuture<?>> void makeRequest(
            String path, JsonObject requestBody, T cf, Handler<HttpClientResponse> responseHandler) {
        makeRequest(path, requestBody.toBuffer(), cf, responseHandler);
    }

    /**
     * Sends a POST with the given body to {@code path}.
     *
     * @param path request path on the configured server
     * @param requestBody raw request body
     * @param cf future completed exceptionally if the request itself reports an exception
     * @param responseHandler invoked with the HTTP response
     */
    private <T extends CompletableFuture<?>> void makeRequest(
            String path, Buffer requestBody, T cf, Handler<HttpClientResponse> responseHandler) {
        HttpClientRequest request = this.httpClient
            .request(
                HttpMethod.POST,
                this.serverSocketAddress,
                this.clientOptions.getPort(),
                this.clientOptions.getHost(),
                path,
                responseHandler)
            .exceptionHandler(cf::completeExceptionally);
        if (this.clientOptions.isUseBasicAuth()) {
            request = configureBasicAuth(request);
        }
        request.end(requestBody);
    }

    /** Adds the pre-computed {@code Authorization} header to the request. */
    private HttpClientRequest configureBasicAuth(HttpClientRequest request) {
        return request.putHeader(HttpHeaderNames.AUTHORIZATION.toString(), this.basicAuthHeader);
    }

    /**
     * Routes a response: on 200 the body is split on newlines and each record is fed to a handler
     * obtained from {@code responseHandlerSupplier}; any other status is treated as an error.
     */
    private static <T extends CompletableFuture<?>> void handleResponse(
            HttpClientResponse response, T cf, ResponseHandlerSupplier<T> responseHandlerSupplier) {
        if (response.statusCode() != HttpResponseStatus.OK.code()) {
            handleErrorResponse(response, cf);
            return;
        }
        final RecordParser recordParser = RecordParser.newDelimited("\n", response);
        final ResponseHandler<T> responseHandler =
            responseHandlerSupplier.get(Vertx.currentContext(), recordParser, cf);
        recordParser.handler(responseHandler::handleBodyBuffer);
        recordParser.endHandler(responseHandler::handleBodyEnd);
        recordParser.exceptionHandler(responseHandler::handleException);
    }

    /** Completes {@code cf} with {@code null} on a 200 response; otherwise reports the error. */
    private static void handleCloseQueryResponse(HttpClientResponse response, CompletableFuture<Void> cf) {
        if (response.statusCode() == HttpResponseStatus.OK.code()) {
            cf.complete(null);
        } else {
            handleErrorResponse(response, cf);
        }
    }

    /**
     * Reads the full error body and completes {@code cf} exceptionally with a
     * {@link KsqlClientException}.
     *
     * <p>The body is expected to be a JSON object with {@code message} and {@code error_code}
     * fields. If it cannot be read that way, the raw body text is reported instead, so that the
     * future is always completed.
     */
    private static <T extends CompletableFuture<?>> void handleErrorResponse(HttpClientResponse response, T cf) {
        response.bodyHandler(buffer -> {
            String message;
            try {
                final JsonObject errorResponse = buffer.toJsonObject();
                message = String.format(
                    "Received %d response from server: %s. Error code: %d",
                    response.statusCode(),
                    errorResponse.getString("message"),
                    errorResponse.getInteger("error_code"));
            } catch (RuntimeException e) {
                // Deliberately broad: this runs inside an async callback, and an exception escaping
                // here would leave the caller's future pending forever. Any failure to interpret
                // the body (not JSON, empty, unexpected field types) degrades to the raw text.
                message = String.format(
                    "Received %d response from server: %s",
                    response.statusCode(),
                    buffer.toString());
            }
            cf.completeExceptionally(new KsqlClientException(message));
        });
    }

    /**
     * Creates the HTTP/2 client from the given options.
     *
     * <p>Trust store options are applied only when TLS is enabled and a trust store path is set;
     * key store options are applied whenever a key store path is set.
     */
    private static HttpClient createHttpClient(Vertx vertx, ClientOptions clientOptions) {
        final HttpClientOptions options = new HttpClientOptions()
            .setSsl(clientOptions.isUseTls())
            .setUseAlpn(clientOptions.isUseAlpn())
            .setProtocolVersion(HttpVersion.HTTP_2)
            .setVerifyHost(clientOptions.isVerifyHost())
            .setDefaultHost(clientOptions.getHost())
            .setDefaultPort(clientOptions.getPort());
        if (clientOptions.isUseTls() && !clientOptions.getTrustStore().isEmpty()) {
            options.setTrustStoreOptions(new JksOptions()
                .setPath(clientOptions.getTrustStore())
                .setPassword(clientOptions.getTrustStorePassword()));
        }
        if (!clientOptions.getKeyStore().isEmpty()) {
            options.setKeyStoreOptions(new JksOptions()
                .setPath(clientOptions.getKeyStore())
                .setPassword(clientOptions.getKeyStorePassword()));
        }
        return vertx.createHttpClient(options);
    }

    /**
     * Builds the value of the HTTP {@code Authorization} header for basic authentication.
     *
     * @return {@code "Basic " + base64(username:password)}, or an empty string when basic auth is
     *     disabled
     */
    private static String createBasicAuthHeader(ClientOptions clientOptions) {
        if (!clientOptions.isUseBasicAuth()) {
            return "";
        }
        final String creds = clientOptions.getBasicAuthUsername() + ":" + clientOptions.getBasicAuthPassword();
        // Encode with a fixed charset: the platform default would make the header differ between
        // machines whenever the credentials contain non-ASCII characters.
        final String base64creds = Base64.getEncoder().encodeToString(creds.getBytes(StandardCharsets.UTF_8));
        return "Basic " + base64creds;
    }

    /**
     * Factory for the {@link ResponseHandler} that consumes a successful response body.
     *
     * @param <T> type of the future the handler completes
     */
    @FunctionalInterface
    private interface ResponseHandlerSupplier<T extends CompletableFuture<?>> {
        ResponseHandler<T> get(Context context, RecordParser recordParser, T cf);
    }
}

