package io.confluent.ksql.rest.server.resources.streaming;

import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import io.confluent.ksql.api.server.MetricsCallbackHolder;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.tree.PrintTopic;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.properties.DenyListPropertyValidator;
import io.confluent.ksql.rest.ApiJsonMapper;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.KsqlMediaType;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.server.StatementParser;
import io.confluent.ksql.rest.server.computation.CommandQueue;
import io.confluent.ksql.rest.server.query.QueryExecutor;
import io.confluent.ksql.rest.server.query.QueryMetadataHolder;
import io.confluent.ksql.rest.util.CommandStoreUtil;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
import io.confluent.ksql.security.KsqlSecurityContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.version.metrics.ActivenessRegistrar;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.vertx.core.Context;
import io.vertx.core.MultiMap;
import io.vertx.core.http.ServerWebSocket;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.utils.Time;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.class */
public class WSQueryEndpoint {
    private static final Logger log = LogManager.getLogger(WSQueryEndpoint.class);
    private final KsqlConfig ksqlConfig;
    private final StatementParser statementParser;
    private final KsqlEngine ksqlEngine;
    private final CommandQueue commandQueue;
    private final ListeningScheduledExecutorService exec;
    private final ActivenessRegistrar activenessRegistrar;
    private final Duration commandQueueCatchupTimeout;
    private final Optional<KsqlAuthorizationValidator> authorizationValidator;
    private final Errors errorHandler;
    private final DenyListPropertyValidator denyListPropertyValidator;
    private final QueryExecutor queryExecutor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint$RequestContext.class */
    public static final class RequestContext {
        private final ServerWebSocket websocket;
        private final KsqlRequest request;
        private final KsqlSecurityContext securityContext;

        private RequestContext(ServerWebSocket serverWebSocket, KsqlRequest ksqlRequest, KsqlSecurityContext ksqlSecurityContext) {
            this.websocket = serverWebSocket;
            this.request = ksqlRequest;
            this.securityContext = ksqlSecurityContext;
        }
    }

    public WSQueryEndpoint(KsqlConfig ksqlConfig, StatementParser statementParser, KsqlEngine ksqlEngine, CommandQueue commandQueue, ListeningScheduledExecutorService listeningScheduledExecutorService, ActivenessRegistrar activenessRegistrar, Duration duration, Optional<KsqlAuthorizationValidator> optional, Errors errors, DenyListPropertyValidator denyListPropertyValidator, QueryExecutor queryExecutor) {
        this.ksqlConfig = (KsqlConfig) Objects.requireNonNull(ksqlConfig, "ksqlConfig");
        this.statementParser = (StatementParser) Objects.requireNonNull(statementParser, "statementParser");
        this.ksqlEngine = (KsqlEngine) Objects.requireNonNull(ksqlEngine, "ksqlEngine");
        this.commandQueue = (CommandQueue) Objects.requireNonNull(commandQueue, "commandQueue");
        this.exec = (ListeningScheduledExecutorService) Objects.requireNonNull(listeningScheduledExecutorService, "exec");
        this.activenessRegistrar = (ActivenessRegistrar) Objects.requireNonNull(activenessRegistrar, "activenessRegistrar");
        this.commandQueueCatchupTimeout = (Duration) Objects.requireNonNull(duration, "commandQueueCatchupTimeout");
        this.authorizationValidator = (Optional) Objects.requireNonNull(optional, "authorizationValidator");
        this.errorHandler = (Errors) Objects.requireNonNull(errors, "errorHandler");
        this.denyListPropertyValidator = (DenyListPropertyValidator) Objects.requireNonNull(denyListPropertyValidator, "denyListPropertyValidator");
        this.queryExecutor = queryExecutor;
    }

    public void executeStreamQuery(ServerWebSocket serverWebSocket, MultiMap multiMap, KsqlSecurityContext ksqlSecurityContext, Context context, Optional<Long> optional) {
        try {
            long nanoseconds = Time.SYSTEM.nanoseconds();
            if (optional.isPresent()) {
                log.info("Setting websocket timeout to " + String.valueOf(optional.get()) + " ms");
                this.exec.schedule(() -> {
                    SessionUtil.closeSilently(serverWebSocket, WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), "The request token has expired.");
                }, optional.get().longValue(), TimeUnit.MILLISECONDS);
            }
            this.activenessRegistrar.updateLastRequestTime();
            validateVersion(multiMap);
            KsqlRequest parseRequest = parseRequest(multiMap);
            try {
                CommandStoreUtil.waitForCommandSequenceNumber(this.commandQueue, parseRequest, this.commandQueueCatchupTimeout);
                Statement statement = parseStatement(parseRequest).getStatement();
                this.authorizationValidator.ifPresent(ksqlAuthorizationValidator -> {
                    ksqlAuthorizationValidator.checkAuthorization(ksqlSecurityContext, this.ksqlEngine.getMetaStore(), statement);
                });
                RequestContext requestContext = new RequestContext(serverWebSocket, parseRequest, ksqlSecurityContext);
                if (statement instanceof Query) {
                    handleQuery(requestContext, (Query) statement, nanoseconds, context);
                } else {
                    if (!(statement instanceof PrintTopic)) {
                        throw new IllegalArgumentException("Unexpected statement type " + String.valueOf(statement));
                    }
                    handlePrintTopic(requestContext, (PrintTopic) statement);
                }
            } catch (InterruptedException e) {
                log.debug("Interrupted while waiting for command queue to reach specified command sequence number", e);
                SessionUtil.closeSilently(serverWebSocket, WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), e.getMessage());
            } catch (TimeoutException e2) {
                log.debug("Timeout while processing request", e2);
                SessionUtil.closeSilently(serverWebSocket, WebSocketCloseStatus.TRY_AGAIN_LATER.code(), e2.getMessage());
            }
        } catch (Exception e3) {
            log.debug("Error processing request", e3);
            SessionUtil.closeSilently(serverWebSocket, WebSocketCloseStatus.INVALID_MESSAGE_TYPE.code(), e3.getMessage());
        } catch (TopicAuthorizationException e4) {
            log.debug("Error processing request", e4);
            SessionUtil.closeSilently(serverWebSocket, WebSocketCloseStatus.INVALID_MESSAGE_TYPE.code(), this.errorHandler.kafkaAuthorizationErrorMessage(e4));
        }
    }

    private static void validateVersion(MultiMap multiMap) {
        String str = multiMap.get("version");
        if (str == null) {
            return;
        }
        try {
            KsqlMediaType.valueOf("JSON", Integer.parseInt(str));
        } catch (Exception e) {
            throw new IllegalArgumentException("Received invalid api version: " + str, e);
        }
    }

    private KsqlRequest parseRequest(MultiMap multiMap) {
        try {
            String str = multiMap.get("request");
            if (str == null || str.isEmpty()) {
                throw new IllegalArgumentException("missing request parameter");
            }
            KsqlRequest ksqlRequest = (KsqlRequest) ApiJsonMapper.INSTANCE.get().readValue(str, KsqlRequest.class);
            if (ksqlRequest.getUnmaskedKsql().isEmpty()) {
                throw new IllegalArgumentException("\"ksql\" field of \"request\" must be populated");
            }
            this.denyListPropertyValidator.validateAll(ksqlRequest.getConfigOverrides());
            return ksqlRequest;
        } catch (Exception e) {
            throw new IllegalArgumentException("Error parsing request: " + e.getMessage(), e);
        }
    }

    private KsqlParser.PreparedStatement<?> parseStatement(KsqlRequest ksqlRequest) {
        try {
            return this.statementParser.parseSingleStatement(ksqlRequest.getUnmaskedKsql());
        } catch (Exception e) {
            throw new IllegalArgumentException("Error parsing query: " + e.getMessage(), e);
        }
    }

    private void attachCloseHandler(ServerWebSocket serverWebSocket, WebSocketSubscriber<?> webSocketSubscriber) {
        serverWebSocket.closeHandler(r8 -> {
            if (webSocketSubscriber != null) {
                webSocketSubscriber.close();
                log.debug("Websocket {} closed, reason: {},  code: {}", serverWebSocket.textHandlerID(), serverWebSocket.closeReason(), serverWebSocket.closeStatusCode());
            }
        });
    }

    private void handleQuery(RequestContext requestContext, Query query, long j, Context context) {
        WebSocketSubscriber<?> webSocketSubscriber = new WebSocketSubscriber<>(requestContext.websocket);
        attachCloseHandler(requestContext.websocket, webSocketSubscriber);
        KsqlParser.PreparedStatement<?> of = KsqlParser.PreparedStatement.of(requestContext.request.getUnmaskedKsql(), query);
        MetricsCallbackHolder metricsCallbackHolder = new MetricsCallbackHolder();
        QueryMetadataHolder handleStatement = this.queryExecutor.handleStatement(requestContext.securityContext.getServiceContext(), requestContext.request.getConfigOverrides(), requestContext.request.getRequestProperties(), of, Optional.empty(), metricsCallbackHolder, context, true);
        if (handleStatement.getPullQueryResult().isPresent()) {
            new PullQueryPublisher(this.exec, handleStatement.getPullQueryResult().get(), metricsCallbackHolder, j).subscribe(webSocketSubscriber);
        } else {
            if (!handleStatement.getPushQueryMetadata().isPresent()) {
                throw new KsqlStatementException("Unknown query type", of.getMaskedStatementText());
            }
            new PushQueryPublisher(this.exec, handleStatement.getPushQueryMetadata().get(), metricsCallbackHolder, j).subscribe(webSocketSubscriber);
        }
    }

    private void handlePrintTopic(RequestContext requestContext, PrintTopic printTopic) {
        String topic = printTopic.getTopic();
        if (!requestContext.securityContext.getServiceContext().getTopicClient().isTopicExists(topic)) {
            throw new IllegalArgumentException("Topic does not exist, or KSQL does not have permission to list the topic: " + topic);
        }
        WebSocketSubscriber<?> webSocketSubscriber = new WebSocketSubscriber<>(requestContext.websocket);
        attachCloseHandler(requestContext.websocket, webSocketSubscriber);
        new PrintPublisher(this.exec, requestContext.securityContext.getServiceContext(), this.ksqlConfig.getKsqlStreamConfigProps(), printTopic).subscribe(webSocketSubscriber);
    }
}
