package io.confluent.ksql.api.server;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.api.auth.DefaultApiSecurityContext;
import io.confluent.ksql.api.server.JsonStreamedRowResponseWriter;
import io.confluent.ksql.api.spi.Endpoints;
import io.confluent.ksql.api.spi.QueryPublisher;
import io.confluent.ksql.api.util.ApiServerUtils;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.KsqlMediaType;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.entity.QueryResponseMetadata;
import io.confluent.ksql.rest.entity.QueryStreamArgs;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.http.HttpVersion;
import io.vertx.ext.web.RoutingContext;
import java.time.Clock;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/api/server/QueryStreamHandler.class */
public class QueryStreamHandler implements Handler<RoutingContext> {
    private static final Logger log = LoggerFactory.getLogger(QueryStreamHandler.class);
    static final String DELIMITED_CONTENT_TYPE = "application/vnd.ksqlapi.delimited.v1";
    static final String JSON_CONTENT_TYPE = "application/json";
    private final Endpoints endpoints;
    private final ConnectionQueryManager connectionQueryManager;
    private final Context context;
    private final Server server;
    private final boolean queryCompatibilityMode;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/api/server/QueryStreamHandler$CommonRequest.class */
    public static class CommonRequest {
        final String sql;
        final Map<String, Object> configOverrides;
        final Map<String, Object> sessionProperties;
        final Map<String, Object> requestProperties;

        CommonRequest(String str, Map<String, Object> map, Map<String, Object> map2, Map<String, Object> map3) {
            this.sql = str;
            this.configOverrides = map;
            this.sessionProperties = map2;
            this.requestProperties = map3;
        }
    }

    @SuppressFBWarnings({"EI_EXPOSE_REP2"})
    public QueryStreamHandler(Endpoints endpoints, ConnectionQueryManager connectionQueryManager, Context context, Server server, boolean z) {
        this.endpoints = (Endpoints) Objects.requireNonNull(endpoints);
        this.connectionQueryManager = (ConnectionQueryManager) Objects.requireNonNull(connectionQueryManager);
        this.context = (Context) Objects.requireNonNull(context);
        this.server = (Server) Objects.requireNonNull(server);
        this.queryCompatibilityMode = z;
    }

    public void handle(RoutingContext routingContext) {
        if (routingContext.request().version() == HttpVersion.HTTP_1_1) {
            routingContext.response().putHeader("Transfer-Encoding", "chunked");
        } else if (routingContext.request().version() != HttpVersion.HTTP_2) {
            routingContext.fail(HttpResponseStatus.BAD_REQUEST.code(), new KsqlApiException("This endpoint is only available when using HTTP1.1 or HTTP2", Errors.ERROR_CODE_BAD_REQUEST));
        }
        CommonRequest request = getRequest(routingContext);
        if (request == null) {
            return;
        }
        Optional<Boolean> isInternalRequest = ServerVerticle.isInternalRequest(routingContext);
        MetricsCallbackHolder metricsCallbackHolder = new MetricsCallbackHolder();
        long nanoseconds = Time.SYSTEM.nanoseconds();
        this.endpoints.createQueryPublisher(request.sql, request.configOverrides, request.sessionProperties, request.requestProperties, this.context, this.server.getWorkerExecutor(), DefaultApiSecurityContext.create(routingContext, this.server), metricsCallbackHolder, isInternalRequest).thenAccept(queryPublisher -> {
            handleQueryPublisher(routingContext, queryPublisher, metricsCallbackHolder, nanoseconds);
        }).exceptionally(th -> {
            return ServerUtils.handleEndpointException(th, routingContext, "Failed to execute query");
        });
    }

    private QueryStreamResponseWriter getQueryStreamResponseWriter(RoutingContext routingContext, QueryPublisher queryPublisher, Optional<String> optional, Optional<String> optional2, boolean z) {
        String acceptableContentType = routingContext.getAcceptableContentType();
        return (DELIMITED_CONTENT_TYPE.equals(acceptableContentType) || (acceptableContentType == null && !this.queryCompatibilityMode)) ? new DelimitedQueryStreamResponseWriter(routingContext.response()) : KsqlMediaType.KSQL_V1_PROTOBUF.mediaType().equals(acceptableContentType) ? new JsonStreamedRowResponseWriter(routingContext.response(), queryPublisher, optional, optional2, Clock.systemUTC(), z, this.context, JsonStreamedRowResponseWriter.RowFormat.PROTOBUF) : (KsqlMediaType.KSQL_V1_JSON.mediaType().equals(acceptableContentType) || acceptableContentType == null || (JSON_CONTENT_TYPE.equals(acceptableContentType) && this.queryCompatibilityMode)) ? new JsonStreamedRowResponseWriter(routingContext.response(), queryPublisher, optional, optional2, Clock.systemUTC(), z, this.context, JsonStreamedRowResponseWriter.RowFormat.JSON) : new JsonQueryStreamResponseWriter(routingContext.response());
    }

    private CommonRequest getRequest(RoutingContext routingContext) {
        String str;
        Map map;
        Map map2;
        Map map3;
        if (this.queryCompatibilityMode) {
            Optional deserialiseObject = ServerUtils.deserialiseObject(routingContext.getBody(), routingContext, KsqlRequest.class);
            if (!deserialiseObject.isPresent()) {
                return null;
            }
            ApiServerUtils.setMaskedSqlIfNeeded((KsqlRequest) deserialiseObject.get());
            str = ((KsqlRequest) deserialiseObject.get()).getUnmaskedKsql();
            map = ((KsqlRequest) deserialiseObject.get()).getConfigOverrides();
            map2 = ((KsqlRequest) deserialiseObject.get()).getSessionVariables();
            map3 = ((KsqlRequest) deserialiseObject.get()).getRequestProperties();
        } else {
            Optional deserialiseObject2 = ServerUtils.deserialiseObject(routingContext.getBody(), routingContext, QueryStreamArgs.class);
            if (!deserialiseObject2.isPresent()) {
                return null;
            }
            str = ((QueryStreamArgs) deserialiseObject2.get()).sql;
            map = ((QueryStreamArgs) deserialiseObject2.get()).properties;
            map2 = ((QueryStreamArgs) deserialiseObject2.get()).sessionVariables;
            map3 = ((QueryStreamArgs) deserialiseObject2.get()).requestProperties;
        }
        return new CommonRequest(str, map, map2, map3);
    }

    private void handleQueryPublisher(RoutingContext routingContext, QueryPublisher queryPublisher, MetricsCallbackHolder metricsCallbackHolder, long j) {
        QueryResponseMetadata queryResponseMetadata;
        Optional<String> empty = Optional.empty();
        Optional<String> of = Optional.of("Limit Reached");
        boolean z = false;
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        if (queryPublisher.isPullQuery()) {
            queryResponseMetadata = new QueryResponseMetadata(queryPublisher.queryId().toString(), queryPublisher.getColumnNames(), queryPublisher.getColumnTypes(), queryPublisher.geLogicalSchema());
            of = Optional.empty();
            z = true;
            routingContext.response().endHandler(r15 -> {
                if (atomicBoolean.getAndSet(true)) {
                    log.warn("Connection already closed so just returning");
                } else {
                    queryPublisher.close();
                    metricsCallbackHolder.reportMetrics(routingContext.response().getStatusCode(), routingContext.request().bytesRead(), routingContext.response().bytesWritten(), j);
                }
            });
        } else if (queryPublisher.isScalablePushQuery()) {
            queryResponseMetadata = new QueryResponseMetadata(queryPublisher.queryId().toString(), queryPublisher.getColumnNames(), queryPublisher.getColumnTypes(), preparePushProjectionSchema(queryPublisher.geLogicalSchema()));
            routingContext.response().endHandler(r152 -> {
                if (atomicBoolean.getAndSet(true)) {
                    log.warn("Connection already closed so just returning");
                } else {
                    queryPublisher.close();
                    metricsCallbackHolder.reportMetrics(routingContext.response().getStatusCode(), routingContext.request().bytesRead(), routingContext.response().bytesWritten(), j);
                }
            });
        } else {
            PushQueryHolder createApiQuery = this.connectionQueryManager.createApiQuery(queryPublisher, routingContext.request());
            queryResponseMetadata = new QueryResponseMetadata(queryPublisher.queryId().toString(), queryPublisher.getColumnNames(), queryPublisher.getColumnTypes(), preparePushProjectionSchema(queryPublisher.geLogicalSchema()));
            empty = Optional.of("Query Completed");
            routingContext.response().endHandler(r153 -> {
                if (atomicBoolean.getAndSet(true)) {
                    log.warn("Connection already closed so just returning");
                } else {
                    createApiQuery.close();
                    metricsCallbackHolder.reportMetrics(routingContext.response().getStatusCode(), routingContext.request().bytesRead(), routingContext.response().bytesWritten(), j);
                }
            });
        }
        QueryStreamResponseWriter queryStreamResponseWriter = getQueryStreamResponseWriter(routingContext, queryPublisher, empty, of, z);
        queryStreamResponseWriter.writeMetadata(queryResponseMetadata);
        Context context = this.context;
        HttpServerResponse response = routingContext.response();
        queryPublisher.getClass();
        queryPublisher.subscribe(new QuerySubscriber(context, response, queryStreamResponseWriter, queryPublisher::hitLimit));
    }

    private LogicalSchema preparePushProjectionSchema(LogicalSchema logicalSchema) {
        LogicalSchema.Builder builder = LogicalSchema.builder();
        List value = logicalSchema.value();
        builder.getClass();
        value.forEach((v1) -> {
            r1.valueColumn(v1);
        });
        return builder.build();
    }
}
