package io.confluent.ksql.api.server;

import io.confluent.ksql.api.impl.VertxCompletableFuture;
import io.confluent.ksql.api.server.protocol.QueryResponseMetadata;
import io.confluent.ksql.api.server.protocol.QueryStreamArgs;
import io.confluent.ksql.api.spi.Endpoints;
import io.confluent.ksql.api.spi.QueryPublisher;
import io.confluent.ksql.util.KsqlStatementException;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.WorkerExecutor;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/api/server/QueryStreamHandler.class */
public class QueryStreamHandler implements Handler<RoutingContext> {
    private static final Logger log = LoggerFactory.getLogger(QueryStreamHandler.class);
    static final String DELIMITED_CONTENT_TYPE = "application/vnd.ksqlapi.delimited.v1";
    private final Endpoints endpoints;
    private final ConnectionQueryManager connectionQueryManager;
    private final Context context;
    private final WorkerExecutor workerExecutor;

    public QueryStreamHandler(Endpoints endpoints, ConnectionQueryManager connectionQueryManager, Context context, WorkerExecutor workerExecutor) {
        this.endpoints = (Endpoints) Objects.requireNonNull(endpoints);
        this.connectionQueryManager = (ConnectionQueryManager) Objects.requireNonNull(connectionQueryManager);
        this.context = (Context) Objects.requireNonNull(context);
        this.workerExecutor = (WorkerExecutor) Objects.requireNonNull(workerExecutor);
    }

    public void handle(RoutingContext routingContext) {
        String acceptableContentType = routingContext.getAcceptableContentType();
        QueryStreamResponseWriter delimitedQueryStreamResponseWriter = (DELIMITED_CONTENT_TYPE.equals(acceptableContentType) || acceptableContentType == null) ? new DelimitedQueryStreamResponseWriter(routingContext.response()) : new JsonQueryStreamResponseWriter(routingContext.response());
        Optional deserialiseObject = ServerUtils.deserialiseObject(routingContext.getBody(), routingContext.response(), QueryStreamArgs.class);
        if (deserialiseObject.isPresent()) {
            QueryStreamResponseWriter queryStreamResponseWriter = delimitedQueryStreamResponseWriter;
            createQueryPublisherAsync(((QueryStreamArgs) deserialiseObject.get()).sql, ((QueryStreamArgs) deserialiseObject.get()).properties, this.context).thenAccept(queryPublisher -> {
                QuerySubscriber querySubscriber = new QuerySubscriber(this.context, routingContext.response(), queryStreamResponseWriter);
                PushQueryHolder createApiQuery = this.connectionQueryManager.createApiQuery(querySubscriber, queryPublisher, routingContext.request());
                queryStreamResponseWriter.writeMetadata(new QueryResponseMetadata(createApiQuery.getId().toString(), queryPublisher.getColumnNames(), queryPublisher.getColumnTypes()));
                queryPublisher.subscribe(querySubscriber);
                routingContext.response().endHandler(r3 -> {
                    createApiQuery.close();
                });
            }).exceptionally(th -> {
                return handleQueryPublisherException(th, routingContext);
            });
        }
    }

    private Void handleQueryPublisherException(Throwable th, RoutingContext routingContext) {
        log.error("Failed to execute query", th);
        if (th instanceof CompletionException) {
            Throwable cause = th.getCause();
            if (cause instanceof KsqlStatementException) {
                ServerUtils.handleError(routingContext.response(), 400, 5, cause.getMessage());
                return null;
            }
        }
        ServerUtils.handleError(routingContext.response(), 500, 6, "The server encountered an internal error when processing the query. Please consult the server logs for more information.");
        return null;
    }

    private CompletableFuture<QueryPublisher> createQueryPublisherAsync(String str, JsonObject jsonObject, Context context) {
        VertxCompletableFuture vertxCompletableFuture = new VertxCompletableFuture();
        this.workerExecutor.executeBlocking(promise -> {
            promise.complete(this.endpoints.createQueryPublisher(str, jsonObject, context, this.workerExecutor));
        }, false, vertxCompletableFuture);
        return vertxCompletableFuture;
    }
}
