package io.confluent.ksql.rest.server.resources.streaming;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.tree.PrintTopic;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.rest.server.StatementParser;
import io.confluent.ksql.rest.server.computation.CommandQueue;
import io.confluent.ksql.rest.server.execution.PullQueryExecutor;
import io.confluent.ksql.rest.server.services.RestServiceContextFactory;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.CommandStoreUtil;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
import io.confluent.ksql.security.KsqlSecurityContext;
import io.confluent.ksql.security.KsqlSecurityExtension;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.HandlerMaps;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.version.metrics.ActivenessRegistrar;
import java.security.Principal;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import javax.websocket.CloseReason;
import javax.websocket.EndpointConfig;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import javax.ws.rs.core.Response;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ServerEndpoint("/query")
/* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.class */
public class WSQueryEndpoint {
    /** Logger shared by all endpoint instances. */
    private static final Logger log = LoggerFactory.getLogger(WSQueryEndpoint.class);

    /**
     * Dispatch table keyed by statement class. {@link Query} statements are routed to
     * {@code handleQuery} and {@link PrintTopic} statements to {@code handlePrintTopic};
     * each handler receives the endpoint instance, the request context and the statement.
     */
    private static final HandlerMaps.ClassHandlerMap2<Statement, WSQueryEndpoint, RequestContext> HANDLER_MAP =
            HandlerMaps.forClass(Statement.class)
                    .withArgTypes(WSQueryEndpoint.class, RequestContext.class)
                    .put(Query.class, WSQueryEndpoint::handleQuery)
                    .put(PrintTopic.class, WSQueryEndpoint::handlePrintTopic)
                    .build();

    // Collaborators supplied through the constructor; all are null-checked there.
    private final KsqlConfig ksqlConfig;
    private final ObjectMapper mapper;
    private final StatementParser statementParser;
    private final KsqlEngine ksqlEngine;
    private final CommandQueue commandQueue;
    private final ListeningScheduledExecutorService exec;
    private final ActivenessRegistrar activenessRegistrar;

    // Pluggable publisher starters, one per supported statement flavour.
    private final QueryPublisher pushQueryPublisher;
    private final IPullQueryPublisher pullQueryPublisher;
    private final PrintTopicPublisher topicPublisher;

    // Upper bound passed to CommandStoreUtil.waitForCommandSequenceNumber in onOpen.
    private final Duration commandQueueCatchupTimeout;

    // Security-related collaborators.
    private final Optional<KsqlAuthorizationValidator> authorizationValidator;
    private final KsqlSecurityExtension securityExtension;
    private final RestServiceContextFactory.UserServiceContextFactory serviceContextFactory;
    private final RestServiceContextFactory.DefaultServiceContextFactory defaultServiceContextFactory;

    private final ServerState serverState;
    private final Errors errorHandler;
    private final Supplier<SchemaRegistryClient> schemaRegistryClientFactory;
    private final PullQueryExecutor pullQueryExecutor;

    // Per-session state: assigned while handling onOpen, released in onClose.
    // Both stay null when the session is rejected before they are created.
    private WebSocketSubscriber<?> subscriber;
    private KsqlSecurityContext securityContext;

    /* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint$IPullQueryPublisher.class */
    /**
     * Hook used by {@code handleQuery} to start streaming the result of a pull query
     * (a query for which {@code Query.isPullQuery()} returns true) to a subscriber.
     * The public constructor binds this to {@code startPullQueryPublisher}.
     */
    interface IPullQueryPublisher {
        /**
         * Starts publishing the result of {@code configuredStatement} to
         * {@code webSocketSubscriber}.
         */
        void start(
                KsqlEngine ksqlEngine,
                ServiceContext serviceContext,
                ListeningScheduledExecutorService listeningScheduledExecutorService,
                ConfiguredStatement<Query> configuredStatement,
                WebSocketSubscriber<StreamedRow> webSocketSubscriber,
                PullQueryExecutor pullQueryExecutor);
    }

    /* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint$PrintTopicPublisher.class */
    /**
     * Hook used by {@code handlePrintTopic} to start streaming the contents of a topic
     * to a subscriber. The public constructor binds this to {@code startPrintPublisher}.
     */
    interface PrintTopicPublisher {
        /**
         * Starts publishing records for {@code printTopic} to {@code webSocketSubscriber}.
         * {@code map} carries the consumer properties handed in by the caller.
         */
        void start(
                ListeningScheduledExecutorService listeningScheduledExecutorService,
                ServiceContext serviceContext,
                Map<String, Object> map,
                PrintTopic printTopic,
                WebSocketSubscriber<String> webSocketSubscriber);
    }

    /* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint$QueryPublisher.class */
    /**
     * Hook used by {@code handleQuery} to start streaming the result of a non-pull
     * (push) query to a subscriber. The public constructor binds this to
     * {@code startPushQueryPublisher}.
     */
    interface QueryPublisher {
        /**
         * Starts publishing the result of {@code configuredStatement} to
         * {@code webSocketSubscriber}.
         */
        void start(
                KsqlEngine ksqlEngine,
                ServiceContext serviceContext,
                ListeningScheduledExecutorService listeningScheduledExecutorService,
                ConfiguredStatement<Query> configuredStatement,
                WebSocketSubscriber<StreamedRow> webSocketSubscriber);
    }

    /* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint$RequestContext.class */
    /**
     * Immutable holder for the values passed to a statement handler: the websocket
     * session, the parsed request and the security context created for the session.
     */
    private static final class RequestContext {
        private final Session session;
        private final KsqlRequest request;
        private final KsqlSecurityContext securityContext;

        /** Stores the three values as given; no validation is performed. */
        private RequestContext(
                Session session,
                KsqlRequest ksqlRequest,
                KsqlSecurityContext ksqlSecurityContext) {
            this.securityContext = ksqlSecurityContext;
            this.request = ksqlRequest;
            this.session = session;
        }
    }

    /**
     * Creates an endpoint wired with the default publishers and service-context
     * factories.
     *
     * <p>Delegates to the package-private constructor, supplying
     * {@code startPushQueryPublisher}, {@code startPullQueryPublisher} and
     * {@code startPrintPublisher} as the publisher hooks and
     * {@code RestServiceContextFactory::create} for both service-context factories.
     *
     * @throws NullPointerException if any argument is null
     */
    public WSQueryEndpoint(KsqlConfig ksqlConfig, ObjectMapper objectMapper, StatementParser statementParser, KsqlEngine ksqlEngine, CommandQueue commandQueue, ListeningScheduledExecutorService listeningScheduledExecutorService, ActivenessRegistrar activenessRegistrar, Duration duration, Optional<KsqlAuthorizationValidator> optional, Errors errors, KsqlSecurityExtension ksqlSecurityExtension, ServerState serverState, Supplier<SchemaRegistryClient> supplier, PullQueryExecutor pullQueryExecutor) {
        this(
                ksqlConfig,
                objectMapper,
                statementParser,
                ksqlEngine,
                commandQueue,
                listeningScheduledExecutorService,
                WSQueryEndpoint::startPushQueryPublisher,
                WSQueryEndpoint::startPullQueryPublisher,
                WSQueryEndpoint::startPrintPublisher,
                activenessRegistrar,
                duration,
                optional,
                errors,
                ksqlSecurityExtension,
                RestServiceContextFactory::create,
                RestServiceContextFactory::create,
                serverState,
                supplier,
                pullQueryExecutor);
    }

    /**
     * Full constructor that lets the publisher hooks and service-context factories be
     * supplied explicitly (package-private).
     *
     * <p>Every argument is passed through {@link Objects#requireNonNull(Object, String)};
     * the exception message names the field the argument would have populated.
     *
     * @param ksqlConfig stored as {@code ksqlConfig}
     * @param objectMapper stored as {@code mapper}
     * @param statementParser stored as {@code statementParser}
     * @param ksqlEngine stored as {@code ksqlEngine}
     * @param commandQueue stored as {@code commandQueue}
     * @param listeningScheduledExecutorService stored as {@code exec}
     * @param queryPublisher stored as {@code pushQueryPublisher}
     * @param iPullQueryPublisher stored as {@code pullQueryPublisher}
     * @param printTopicPublisher stored as {@code topicPublisher}
     * @param activenessRegistrar stored as {@code activenessRegistrar}
     * @param duration stored as {@code commandQueueCatchupTimeout}
     * @param optional stored as {@code authorizationValidator}
     * @param errors stored as {@code errorHandler}
     * @param ksqlSecurityExtension stored as {@code securityExtension}
     * @param userServiceContextFactory stored as {@code serviceContextFactory}
     * @param defaultServiceContextFactory stored as {@code defaultServiceContextFactory}
     * @param serverState stored as {@code serverState}
     * @param supplier stored as {@code schemaRegistryClientFactory}
     * @param pullQueryExecutor stored as {@code pullQueryExecutor}
     * @throws NullPointerException if any argument is null
     */
    WSQueryEndpoint(KsqlConfig ksqlConfig, ObjectMapper objectMapper, StatementParser statementParser, KsqlEngine ksqlEngine, CommandQueue commandQueue, ListeningScheduledExecutorService listeningScheduledExecutorService, QueryPublisher queryPublisher, IPullQueryPublisher iPullQueryPublisher, PrintTopicPublisher printTopicPublisher, ActivenessRegistrar activenessRegistrar, Duration duration, Optional<KsqlAuthorizationValidator> optional, Errors errors, KsqlSecurityExtension ksqlSecurityExtension, RestServiceContextFactory.UserServiceContextFactory userServiceContextFactory, RestServiceContextFactory.DefaultServiceContextFactory defaultServiceContextFactory, ServerState serverState, Supplier<SchemaRegistryClient> supplier, PullQueryExecutor pullQueryExecutor) {
        // The check order is kept as-is: it decides which message wins when several
        // arguments are null at once.
        this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig");
        this.mapper = Objects.requireNonNull(objectMapper, "mapper");
        this.statementParser = Objects.requireNonNull(statementParser, "statementParser");
        this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine");
        this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue");
        this.exec = Objects.requireNonNull(listeningScheduledExecutorService, "exec");
        this.pushQueryPublisher = Objects.requireNonNull(queryPublisher, "pushQueryPublisher");
        this.pullQueryPublisher = Objects.requireNonNull(iPullQueryPublisher, "pullQueryPublisher");
        this.topicPublisher = Objects.requireNonNull(printTopicPublisher, "topicPublisher");
        this.activenessRegistrar = Objects.requireNonNull(activenessRegistrar, "activenessRegistrar");
        this.commandQueueCatchupTimeout = Objects.requireNonNull(duration, "commandQueueCatchupTimeout");
        this.authorizationValidator = Objects.requireNonNull(optional, "authorizationValidator");
        this.securityExtension = Objects.requireNonNull(ksqlSecurityExtension, "securityExtension");
        this.serviceContextFactory = Objects.requireNonNull(userServiceContextFactory, "serviceContextFactory");
        this.defaultServiceContextFactory = Objects.requireNonNull(defaultServiceContextFactory, "defaultServiceContextFactory");
        this.serverState = Objects.requireNonNull(serverState, "serverState");
        this.errorHandler = Objects.requireNonNull(errors, "errorHandler");
        this.schemaRegistryClientFactory = Objects.requireNonNull(supplier, "schemaRegistryClientFactory");
        this.pullQueryExecutor = Objects.requireNonNull(pullQueryExecutor, "pullQueryExecutor");
    }

    /**
     * Handles a newly opened websocket session.
     *
     * <p>Steps, in order: endpoint authorization check, API version validation, server
     * readiness check, request parsing, waiting for the command queue to reach the
     * command sequence number carried by the request, statement parsing, security
     * context creation, Kafka authorization validation and finally dispatch through
     * {@code HANDLER_MAP} (unsupported statement types are rejected).
     *
     * <p>No exception escapes this method: every failure is logged at debug level and
     * the session is closed with a close code describing the failure.
     *
     * @param session the session being opened
     * @param endpointConfig not used by this method
     */
    @OnOpen
    public void onOpen(Session session, EndpointConfig endpointConfig) {
        log.debug("Opening websocket session {}", session.getId());
        try {
            checkAuthorization(session);
            validateVersion(session);

            // A present response means the server is not ready to serve requests yet.
            Optional<Response> notReady = this.serverState.checkReady();
            if (notReady.isPresent()) {
                String reason = ((KsqlErrorMessage) notReady.get().getEntity()).getMessage();
                SessionUtil.closeSilently(session, CloseReason.CloseCodes.TRY_AGAIN_LATER, reason);
                return;
            }

            KsqlRequest request = parseRequest(session);
            try {
                CommandStoreUtil.waitForCommandSequenceNumber(this.commandQueue, request, this.commandQueueCatchupTimeout);
                KsqlParser.PreparedStatement<?> prepared = parseStatement(request);

                // Kept on the instance so that onClose can release the service context.
                this.securityContext = createSecurityContext(session.getUserPrincipal());

                Statement statement = prepared.getStatement();
                Class<? extends Statement> statementType = statement.getClass();
                validateKafkaAuthorization(statement);

                HANDLER_MAP
                        .getOrDefault(statementType, WSQueryEndpoint::handleUnsupportedStatement)
                        .handle(this, new RequestContext(session, request, this.securityContext), statement);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the owner of this thread can still
                // observe the interruption after we have handled it here.
                Thread.currentThread().interrupt();
                log.debug("Interrupted while waiting for command queue to reach specified command sequence number", e);
                SessionUtil.closeSilently(session, CloseReason.CloseCodes.UNEXPECTED_CONDITION, e.getMessage());
            } catch (TimeoutException e) {
                log.debug("Timeout while processing request", e);
                SessionUtil.closeSilently(session, CloseReason.CloseCodes.TRY_AGAIN_LATER, e.getMessage());
            }
        } catch (TopicAuthorizationException e) {
            log.debug("Error processing request", e);
            SessionUtil.closeSilently(session, CloseReason.CloseCodes.CANNOT_ACCEPT, this.errorHandler.kafkaAuthorizationErrorMessage(e));
        } catch (Exception e) {
            log.debug("Error processing request", e);
            SessionUtil.closeSilently(session, CloseReason.CloseCodes.CANNOT_ACCEPT, e.getMessage());
        }
    }

    /**
     * Releases the per-session resources when the websocket session closes.
     *
     * <p>Closes the subscriber (if one was created) and the service context held by
     * the security context (if one was created). The service context is closed in a
     * {@code finally} block so it is released even when closing the subscriber throws.
     *
     * @param session the session that is closing
     * @param closeReason the close code and reason phrase, logged at debug level
     */
    @OnClose
    public void onClose(Session session, CloseReason closeReason) {
        try {
            if (this.subscriber != null) {
                this.subscriber.close();
            }
        } finally {
            if (this.securityContext != null) {
                this.securityContext.getServiceContext().close();
            }
        }
        log.debug(
                "Closing websocket session {} ({}): {}",
                session.getId(),
                closeReason.getCloseCode(),
                closeReason.getReasonPhrase());
    }

    /**
     * Logs a websocket error at error level and closes the session with
     * {@code UNEXPECTED_CONDITION}, using the throwable's message as the reason.
     *
     * @param session the session on which the error occurred
     * @param th the error reported for the session
     */
    @OnError
    public void onError(Session session, Throwable th) {
        final String sessionId = session.getId();
        log.error("websocket error in session {}", sessionId, th);

        final String reason = th.getMessage();
        SessionUtil.closeSilently(session, CloseReason.CloseCodes.UNEXPECTED_CONDITION, reason);
    }

    /**
     * Checks that the session's user may open this websocket endpoint.
     *
     * <p>If the security extension exposes an authorization provider, it is asked to
     * check {@code POST} access to the path declared in this class's
     * {@link ServerEndpoint} annotation. Without a provider the method does nothing.
     *
     * @param session the session whose user principal is checked; the principal may be
     *     null when the session is not authenticated
     * @throws KsqlException wrapping whatever the provider threw when access is denied
     */
    private void checkAuthorization(Session session) {
        String endpoint = getClass().getAnnotation(ServerEndpoint.class).value();
        Principal user = session.getUserPrincipal();
        this.securityExtension.getAuthorizationProvider().ifPresent(provider -> {
            try {
                provider.checkEndpointAccess(user, "POST", endpoint);
            } catch (Throwable th) {
                // The principal is null for unauthenticated sessions. Dereferencing it
                // here would raise a NullPointerException that hides the real denial.
                String userName = user == null ? null : user.getName();
                log.warn(String.format("User:%s is denied access to Websocket %s endpoint", userName, endpoint), th);
                throw new KsqlException(th);
            }
        });
    }

    /**
     * Builds the security context for a session.
     *
     * <p>When the security extension supplies a user context provider, the service
     * context is created from that provider's Kafka client supplier and schema registry
     * client factory for {@code principal}. Otherwise the default service context
     * factory is used with this endpoint's schema registry client factory.
     *
     * @param principal the session's user principal; may be null
     * @return a security context holding the optional principal and the service context
     */
    private KsqlSecurityContext createSecurityContext(Principal principal) {
        final ServiceContext serviceContext;
        if (this.securityExtension.getUserContextProvider().isPresent()) {
            serviceContext = this.securityExtension.getUserContextProvider()
                    .map(provider -> this.serviceContextFactory.create(
                            this.ksqlConfig,
                            Optional.empty(),
                            provider.getKafkaClientSupplier(principal),
                            provider.getSchemaRegistryClientFactory(principal)))
                    .get();
        } else {
            serviceContext = this.defaultServiceContextFactory.create(
                    this.ksqlConfig,
                    Optional.empty(),
                    this.schemaRegistryClientFactory);
        }
        return new KsqlSecurityContext(Optional.ofNullable(principal), serviceContext);
    }

    /**
     * Validates the {@code version} request parameter and records request activity.
     *
     * <p>A missing parameter defaults to {@code "1"}. A parameter that is present with
     * no values is accepted. Otherwise exactly one value, equal to {@code "1"}, is
     * required.
     *
     * @param session the session whose request parameters are inspected
     * @throws IllegalArgumentException if several versions are given or the single
     *     version is not {@code "1"}
     */
    private void validateVersion(Session session) {
        Map<String, List<String>> parameters = session.getRequestParameterMap();
        this.activenessRegistrar.updateLastRequestTime();

        List<String> versions = parameters.getOrDefault("version", Collections.singletonList("1"));
        if (versions.isEmpty()) {
            return;
        }
        if (versions.size() != 1) {
            throw new IllegalArgumentException("Received multiple api versions: " + versions);
        }
        // Constant-first comparison: a null element is reported as an invalid version
        // instead of raising a NullPointerException.
        if (!"1".equals(versions.get(0))) {
            throw new IllegalArgumentException("Received invalid api version: " + versions);
        }
    }

    /**
     * Extracts and deserializes the {@code request} query parameter of the session.
     *
     * <p>The last value of the parameter is read as JSON into a {@link KsqlRequest}.
     * Any failure, including the validation errors raised inside this method, is
     * rethrown as an {@link IllegalArgumentException} whose message is prefixed with
     * {@code "Error parsing request: "}.
     *
     * @param session the session carrying the request parameters
     * @return the deserialized request, with a non-empty {@code ksql} field
     * @throws IllegalArgumentException if the parameter is missing, empty, malformed,
     *     or its {@code ksql} field is empty
     */
    private KsqlRequest parseRequest(Session session) {
        try {
            final List<String> values =
                    session.getRequestParameterMap().getOrDefault("request", Collections.emptyList());
            if (values == null || values.isEmpty()) {
                throw new IllegalArgumentException("missing request parameter");
            }

            final String json = Iterables.getLast(values, "");
            if (json == null || json.isEmpty()) {
                throw new IllegalArgumentException("request parameter empty");
            }

            final KsqlRequest parsed = this.mapper.readValue(json, KsqlRequest.class);
            if (parsed.getKsql().isEmpty()) {
                throw new IllegalArgumentException("\"ksql\" field of \"request\" must be populated");
            }

            // Result deliberately discarded. NOTE(review): presumably called so that
            // invalid stream properties fail here rather than later — confirm against
            // KsqlRequest.
            parsed.getStreamsProperties();
            return parsed;
        } catch (Exception cause) {
            throw new IllegalArgumentException("Error parsing request: " + cause.getMessage(), cause);
        }
    }

    /**
     * Parses the request's KSQL text into a single prepared statement.
     *
     * @param ksqlRequest the request whose {@code ksql} text is parsed
     * @return the prepared statement produced by the statement parser
     * @throws IllegalArgumentException wrapping any failure, with a message prefixed by
     *     {@code "Error parsing query: "}
     */
    private KsqlParser.PreparedStatement<?> parseStatement(KsqlRequest ksqlRequest) {
        final KsqlParser.PreparedStatement<?> prepared;
        try {
            final String sql = ksqlRequest.getKsql();
            prepared = this.statementParser.parseSingleStatement(sql);
        } catch (Exception cause) {
            throw new IllegalArgumentException("Error parsing query: " + cause.getMessage(), cause);
        }
        return prepared;
    }

    /**
     * Runs the configured authorization validator, if any, for {@code statement},
     * using this endpoint's current security context and the engine's metastore.
     * Does nothing when no validator is configured.
     *
     * @param statement the statement about to be executed
     */
    private void validateKafkaAuthorization(Statement statement) {
        if (!this.authorizationValidator.isPresent()) {
            return;
        }
        this.authorizationValidator.get().checkAuthorization(
                this.securityContext,
                this.ksqlEngine.getMetaStore(),
                statement);
    }

    /**
     * Handles a {@link Query} statement: creates the row subscriber for the session,
     * remembers it for cleanup in {@code onClose}, and starts the pull or push
     * publisher depending on {@code query.isPullQuery()}.
     *
     * @param requestContext session, request and security context of the call
     * @param query the query statement to run
     */
    private void handleQuery(RequestContext requestContext, Query query) {
        final Map<String, Object> overrides = requestContext.request.getStreamsProperties();

        final WebSocketSubscriber<StreamedRow> rowSubscriber =
                new WebSocketSubscriber<>(requestContext.session, this.mapper);
        this.subscriber = rowSubscriber;

        final KsqlParser.PreparedStatement<Query> prepared =
                KsqlParser.PreparedStatement.of(requestContext.request.getKsql(), query);
        final ConfiguredStatement<Query> configured =
                ConfiguredStatement.of(prepared, overrides, this.ksqlConfig);

        if (!query.isPullQuery()) {
            this.pushQueryPublisher.start(
                    this.ksqlEngine,
                    requestContext.securityContext.getServiceContext(),
                    this.exec,
                    configured,
                    rowSubscriber);
            return;
        }

        this.pullQueryPublisher.start(
                this.ksqlEngine,
                requestContext.securityContext.getServiceContext(),
                this.exec,
                configured,
                rowSubscriber,
                this.pullQueryExecutor);
    }

    /**
     * Handles a {@link PrintTopic} statement: verifies through the session's topic
     * client that the topic exists, creates the string subscriber for the session,
     * remembers it for cleanup in {@code onClose}, and starts the topic publisher.
     *
     * @param requestContext session, request and security context of the call
     * @param printTopic the statement naming the topic to print
     * @throws IllegalArgumentException if the topic client reports that the topic does
     *     not exist
     */
    private void handlePrintTopic(RequestContext requestContext, PrintTopic printTopic) {
        final String topicName = printTopic.getTopic();
        final boolean topicExists = requestContext.securityContext
                .getServiceContext()
                .getTopicClient()
                .isTopicExists(topicName);
        if (!topicExists) {
            throw new IllegalArgumentException("Topic does not exist, or KSQL does not have permission to list the topic: " + topicName);
        }

        final WebSocketSubscriber<String> topicSubscriber =
                new WebSocketSubscriber<>(requestContext.session, this.mapper);
        this.subscriber = topicSubscriber;

        this.topicPublisher.start(
                this.exec,
                requestContext.securityContext.getServiceContext(),
                this.ksqlConfig.getKsqlStreamConfigProps(),
                printTopic,
                topicSubscriber);
    }

    /**
     * Fallback handler for statement types with no entry in {@code HANDLER_MAP}.
     * Always throws.
     *
     * @param requestContext not used
     * @param statement the unsupported statement; its class name appears in the message
     * @throws IllegalArgumentException always
     */
    private void handleUnsupportedStatement(RequestContext requestContext, Statement statement) {
        final String typeName = statement.getClass().getName();
        final String message =
                String.format("Statement type `%s' not supported for this resource", typeName);
        throw new IllegalArgumentException(message);
    }

    /**
     * Default {@link QueryPublisher}: creates a {@code PushQueryPublisher} from the
     * given collaborators and subscribes {@code webSocketSubscriber} to it.
     */
    private static void startPushQueryPublisher(KsqlEngine ksqlEngine, ServiceContext serviceContext, ListeningScheduledExecutorService listeningScheduledExecutorService, ConfiguredStatement<Query> configuredStatement, WebSocketSubscriber<StreamedRow> webSocketSubscriber) {
        final PushQueryPublisher publisher = new PushQueryPublisher(
                ksqlEngine,
                serviceContext,
                listeningScheduledExecutorService,
                configuredStatement);
        publisher.subscribe(webSocketSubscriber);
    }

    /**
     * Default {@link IPullQueryPublisher}: creates a {@code PullQueryPublisher} from
     * the service context, statement and pull query executor, then subscribes
     * {@code webSocketSubscriber} to it. The engine and executor service arguments are
     * not used here.
     */
    private static void startPullQueryPublisher(KsqlEngine ksqlEngine, ServiceContext serviceContext, ListeningScheduledExecutorService listeningScheduledExecutorService, ConfiguredStatement<Query> configuredStatement, WebSocketSubscriber<StreamedRow> webSocketSubscriber, PullQueryExecutor pullQueryExecutor) {
        final PullQueryPublisher publisher = new PullQueryPublisher(
                serviceContext,
                configuredStatement,
                pullQueryExecutor);
        publisher.subscribe(webSocketSubscriber);
    }

    /**
     * Default {@link PrintTopicPublisher}: creates a {@code PrintPublisher} from the
     * given collaborators and subscribes {@code webSocketSubscriber} to it.
     */
    private static void startPrintPublisher(ListeningScheduledExecutorService listeningScheduledExecutorService, ServiceContext serviceContext, Map<String, Object> map, PrintTopic printTopic, WebSocketSubscriber<String> webSocketSubscriber) {
        final PrintPublisher publisher = new PrintPublisher(
                listeningScheduledExecutorService,
                serviceContext,
                map,
                printTopic);
        publisher.subscribe(webSocketSubscriber);
    }
}
