package io.confluent.ksql.rest.server.resources.streaming;

import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.tree.PrintTopic;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.properties.DenyListPropertyValidator;
import io.confluent.ksql.rest.ApiJsonMapper;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.rest.server.StatementParser;
import io.confluent.ksql.rest.server.computation.CommandQueue;
import io.confluent.ksql.rest.server.execution.PullQueryExecutor;
import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics;
import io.confluent.ksql.rest.util.CommandStoreUtil;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
import io.confluent.ksql.security.KsqlSecurityContext;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.version.metrics.ActivenessRegistrar;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.vertx.core.MultiMap;
import io.vertx.core.http.ServerWebSocket;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.class */
public class WSQueryEndpoint {
    private static final Logger log = LoggerFactory.getLogger(WSQueryEndpoint.class);
    private final KsqlConfig ksqlConfig;
    private final StatementParser statementParser;
    private final KsqlEngine ksqlEngine;
    private final CommandQueue commandQueue;
    private final ListeningScheduledExecutorService exec;
    private final ActivenessRegistrar activenessRegistrar;
    private final QueryPublisher pushQueryPublisher;
    private final IPullQueryPublisher pullQueryPublisher;
    private final PrintTopicPublisher topicPublisher;
    private final Duration commandQueueCatchupTimeout;
    private final Optional<KsqlAuthorizationValidator> authorizationValidator;
    private final Errors errorHandler;
    private final PullQueryExecutor pullQueryExecutor;
    private final DenyListPropertyValidator denyListPropertyValidator;
    private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
    private WebSocketSubscriber<?> subscriber;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint$IPullQueryPublisher.class */
    public interface IPullQueryPublisher {
        void start(KsqlEngine ksqlEngine, ServiceContext serviceContext, ListeningScheduledExecutorService listeningScheduledExecutorService, ConfiguredStatement<Query> configuredStatement, WebSocketSubscriber<StreamedRow> webSocketSubscriber, PullQueryExecutor pullQueryExecutor, Optional<PullQueryExecutorMetrics> optional, long j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint$PrintTopicPublisher.class */
    public interface PrintTopicPublisher {
        void start(ListeningScheduledExecutorService listeningScheduledExecutorService, ServiceContext serviceContext, Map<String, Object> map, PrintTopic printTopic, WebSocketSubscriber<String> webSocketSubscriber);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint$QueryPublisher.class */
    public interface QueryPublisher {
        void start(KsqlEngine ksqlEngine, ServiceContext serviceContext, ListeningScheduledExecutorService listeningScheduledExecutorService, ConfiguredStatement<Query> configuredStatement, WebSocketSubscriber<StreamedRow> webSocketSubscriber);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint$RequestContext.class */
    public static final class RequestContext {
        private final ServerWebSocket websocket;
        private final KsqlRequest request;
        private final KsqlSecurityContext securityContext;

        private RequestContext(ServerWebSocket serverWebSocket, KsqlRequest ksqlRequest, KsqlSecurityContext ksqlSecurityContext) {
            this.websocket = serverWebSocket;
            this.request = ksqlRequest;
            this.securityContext = ksqlSecurityContext;
        }
    }

    public WSQueryEndpoint(KsqlConfig ksqlConfig, StatementParser statementParser, KsqlEngine ksqlEngine, CommandQueue commandQueue, ListeningScheduledExecutorService listeningScheduledExecutorService, ActivenessRegistrar activenessRegistrar, Duration duration, Optional<KsqlAuthorizationValidator> optional, Errors errors, PullQueryExecutor pullQueryExecutor, DenyListPropertyValidator denyListPropertyValidator, Optional<PullQueryExecutorMetrics> optional2) {
        this(ksqlConfig, statementParser, ksqlEngine, commandQueue, listeningScheduledExecutorService, WSQueryEndpoint::startPushQueryPublisher, WSQueryEndpoint::startPullQueryPublisher, WSQueryEndpoint::startPrintPublisher, activenessRegistrar, duration, optional, errors, pullQueryExecutor, denyListPropertyValidator, optional2);
    }

    WSQueryEndpoint(KsqlConfig ksqlConfig, StatementParser statementParser, KsqlEngine ksqlEngine, CommandQueue commandQueue, ListeningScheduledExecutorService listeningScheduledExecutorService, QueryPublisher queryPublisher, IPullQueryPublisher iPullQueryPublisher, PrintTopicPublisher printTopicPublisher, ActivenessRegistrar activenessRegistrar, Duration duration, Optional<KsqlAuthorizationValidator> optional, Errors errors, PullQueryExecutor pullQueryExecutor, DenyListPropertyValidator denyListPropertyValidator, Optional<PullQueryExecutorMetrics> optional2) {
        this.ksqlConfig = (KsqlConfig) Objects.requireNonNull(ksqlConfig, "ksqlConfig");
        this.statementParser = (StatementParser) Objects.requireNonNull(statementParser, "statementParser");
        this.ksqlEngine = (KsqlEngine) Objects.requireNonNull(ksqlEngine, "ksqlEngine");
        this.commandQueue = (CommandQueue) Objects.requireNonNull(commandQueue, "commandQueue");
        this.exec = (ListeningScheduledExecutorService) Objects.requireNonNull(listeningScheduledExecutorService, "exec");
        this.pushQueryPublisher = (QueryPublisher) Objects.requireNonNull(queryPublisher, "pushQueryPublisher");
        this.pullQueryPublisher = (IPullQueryPublisher) Objects.requireNonNull(iPullQueryPublisher, "pullQueryPublisher");
        this.topicPublisher = (PrintTopicPublisher) Objects.requireNonNull(printTopicPublisher, "topicPublisher");
        this.activenessRegistrar = (ActivenessRegistrar) Objects.requireNonNull(activenessRegistrar, "activenessRegistrar");
        this.commandQueueCatchupTimeout = (Duration) Objects.requireNonNull(duration, "commandQueueCatchupTimeout");
        this.authorizationValidator = (Optional) Objects.requireNonNull(optional, "authorizationValidator");
        this.errorHandler = (Errors) Objects.requireNonNull(errors, "errorHandler");
        this.pullQueryExecutor = (PullQueryExecutor) Objects.requireNonNull(pullQueryExecutor, "pullQueryExecutor");
        this.denyListPropertyValidator = (DenyListPropertyValidator) Objects.requireNonNull(denyListPropertyValidator, "denyListPropertyValidator");
        this.pullQueryMetrics = (Optional) Objects.requireNonNull(optional2, "pullQueryMetrics");
    }

    public void executeStreamQuery(ServerWebSocket serverWebSocket, MultiMap multiMap, KsqlSecurityContext ksqlSecurityContext) {
        try {
            long nanoseconds = Time.SYSTEM.nanoseconds();
            this.activenessRegistrar.updateLastRequestTime();
            validateVersion(multiMap);
            KsqlRequest parseRequest = parseRequest(multiMap);
            try {
                CommandStoreUtil.waitForCommandSequenceNumber(this.commandQueue, parseRequest, this.commandQueueCatchupTimeout);
                Statement statement = parseStatement(parseRequest).getStatement();
                this.authorizationValidator.ifPresent(ksqlAuthorizationValidator -> {
                    ksqlAuthorizationValidator.checkAuthorization(ksqlSecurityContext, this.ksqlEngine.getMetaStore(), statement);
                });
                RequestContext requestContext = new RequestContext(serverWebSocket, parseRequest, ksqlSecurityContext);
                if (statement instanceof Query) {
                    handleQuery(requestContext, (Query) statement, nanoseconds);
                } else {
                    if (!(statement instanceof PrintTopic)) {
                        throw new IllegalArgumentException("Unexpected statement type " + statement);
                    }
                    handlePrintTopic(requestContext, (PrintTopic) statement);
                }
                serverWebSocket.closeHandler(r3 -> {
                    if (this.subscriber != null) {
                        this.subscriber.close();
                    }
                });
            } catch (InterruptedException e) {
                log.debug("Interrupted while waiting for command queue to reach specified command sequence number", e);
                SessionUtil.closeSilently(serverWebSocket, WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), e.getMessage());
            } catch (TimeoutException e2) {
                log.debug("Timeout while processing request", e2);
                SessionUtil.closeSilently(serverWebSocket, WebSocketCloseStatus.TRY_AGAIN_LATER.code(), e2.getMessage());
            }
        } catch (TopicAuthorizationException e3) {
            log.debug("Error processing request", e3);
            SessionUtil.closeSilently(serverWebSocket, WebSocketCloseStatus.INVALID_MESSAGE_TYPE.code(), this.errorHandler.kafkaAuthorizationErrorMessage(e3));
        } catch (Exception e4) {
            log.debug("Error processing request", e4);
            SessionUtil.closeSilently(serverWebSocket, WebSocketCloseStatus.INVALID_MESSAGE_TYPE.code(), e4.getMessage());
        }
    }

    private void validateVersion(MultiMap multiMap) {
        String str = multiMap.get("version");
        if (str != null && !"1".equals(str)) {
            throw new IllegalArgumentException("Received invalid api version: " + str);
        }
    }

    private KsqlRequest parseRequest(MultiMap multiMap) {
        try {
            String str = multiMap.get("request");
            if (str == null || str.isEmpty()) {
                throw new IllegalArgumentException("missing request parameter");
            }
            KsqlRequest ksqlRequest = (KsqlRequest) ApiJsonMapper.INSTANCE.get().readValue(str, KsqlRequest.class);
            if (ksqlRequest.getKsql().isEmpty()) {
                throw new IllegalArgumentException("\"ksql\" field of \"request\" must be populated");
            }
            this.denyListPropertyValidator.validateAll(ksqlRequest.getConfigOverrides());
            return ksqlRequest;
        } catch (Exception e) {
            throw new IllegalArgumentException("Error parsing request: " + e.getMessage(), e);
        }
    }

    private KsqlParser.PreparedStatement<?> parseStatement(KsqlRequest ksqlRequest) {
        try {
            return this.statementParser.parseSingleStatement(ksqlRequest.getKsql());
        } catch (Exception e) {
            throw new IllegalArgumentException("Error parsing query: " + e.getMessage(), e);
        }
    }

    private void handleQuery(RequestContext requestContext, Query query, long j) {
        Map configOverrides = requestContext.request.getConfigOverrides();
        WebSocketSubscriber<StreamedRow> webSocketSubscriber = new WebSocketSubscriber<>(requestContext.websocket);
        this.subscriber = webSocketSubscriber;
        ConfiguredStatement<Query> of = ConfiguredStatement.of(KsqlParser.PreparedStatement.of(requestContext.request.getKsql(), query), SessionConfig.of(this.ksqlConfig, configOverrides));
        if (query.isPullQuery()) {
            this.pullQueryPublisher.start(this.ksqlEngine, requestContext.securityContext.getServiceContext(), this.exec, of, webSocketSubscriber, this.pullQueryExecutor, this.pullQueryMetrics, j);
        } else {
            this.pushQueryPublisher.start(this.ksqlEngine, requestContext.securityContext.getServiceContext(), this.exec, of, webSocketSubscriber);
        }
    }

    private void handlePrintTopic(RequestContext requestContext, PrintTopic printTopic) {
        String topic = printTopic.getTopic();
        if (!requestContext.securityContext.getServiceContext().getTopicClient().isTopicExists(topic)) {
            throw new IllegalArgumentException("Topic does not exist, or KSQL does not have permission to list the topic: " + topic);
        }
        WebSocketSubscriber<String> webSocketSubscriber = new WebSocketSubscriber<>(requestContext.websocket);
        this.subscriber = webSocketSubscriber;
        this.topicPublisher.start(this.exec, requestContext.securityContext.getServiceContext(), this.ksqlConfig.getKsqlStreamConfigProps(), printTopic, webSocketSubscriber);
    }

    private void handleUnsupportedStatement(RequestContext requestContext, Statement statement) {
        throw new IllegalArgumentException(String.format("Statement type `%s' not supported for this resource", statement.getClass().getName()));
    }

    private static void startPushQueryPublisher(KsqlEngine ksqlEngine, ServiceContext serviceContext, ListeningScheduledExecutorService listeningScheduledExecutorService, ConfiguredStatement<Query> configuredStatement, WebSocketSubscriber<StreamedRow> webSocketSubscriber) {
        new PushQueryPublisher(ksqlEngine, serviceContext, listeningScheduledExecutorService, configuredStatement).subscribe(webSocketSubscriber);
    }

    private static void startPullQueryPublisher(KsqlEngine ksqlEngine, ServiceContext serviceContext, ListeningScheduledExecutorService listeningScheduledExecutorService, ConfiguredStatement<Query> configuredStatement, WebSocketSubscriber<StreamedRow> webSocketSubscriber, PullQueryExecutor pullQueryExecutor, Optional<PullQueryExecutorMetrics> optional, long j) {
        new PullQueryPublisher(serviceContext, configuredStatement, pullQueryExecutor, optional, j).subscribe(webSocketSubscriber);
    }

    private static void startPrintPublisher(ListeningScheduledExecutorService listeningScheduledExecutorService, ServiceContext serviceContext, Map<String, Object> map, PrintTopic printTopic, WebSocketSubscriber<String> webSocketSubscriber) {
        new PrintPublisher(listeningScheduledExecutorService, serviceContext, map, printTopic).subscribe(webSocketSubscriber);
    }
}
