package io.confluent.ksql.rest.server.resources.streaming;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.tree.PrintTopic;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.properties.DenyListPropertyValidator;
import io.confluent.ksql.rest.ApiJsonMapper;
import io.confluent.ksql.rest.EndpointResponse;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.KsqlHostInfoEntity;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.rest.entity.TableRows;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.StatementParser;
import io.confluent.ksql.rest.server.computation.CommandQueue;
import io.confluent.ksql.rest.server.execution.PullQueryExecutor;
import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics;
import io.confluent.ksql.rest.server.execution.PullQueryResult;
import io.confluent.ksql.rest.server.resources.KsqlConfigurable;
import io.confluent.ksql.rest.server.resources.KsqlRestException;
import io.confluent.ksql.rest.util.CommandStoreUtil;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
import io.confluent.ksql.security.KsqlSecurityContext;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.version.metrics.ActivenessRegistrar;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.class */
public class StreamedQueryResource implements KsqlConfigurable {
    private static final Logger log = LoggerFactory.getLogger(StreamedQueryResource.class);
    private static final ObjectMapper OBJECT_MAPPER = ApiJsonMapper.INSTANCE.get();
    private final KsqlEngine ksqlEngine;
    private final StatementParser statementParser;
    private final CommandQueue commandQueue;
    private final Duration disconnectCheckInterval;
    private final Duration commandQueueCatchupTimeout;
    private final ActivenessRegistrar activenessRegistrar;
    private final Optional<KsqlAuthorizationValidator> authorizationValidator;
    private final Errors errorHandler;
    private KsqlConfig ksqlConfig;
    private final PullQueryExecutor pullQueryExecutor;
    private final DenyListPropertyValidator denyListPropertyValidator;
    private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;

    public StreamedQueryResource(KsqlEngine ksqlEngine, CommandQueue commandQueue, Duration duration, Duration duration2, ActivenessRegistrar activenessRegistrar, Optional<KsqlAuthorizationValidator> optional, Errors errors, PullQueryExecutor pullQueryExecutor, DenyListPropertyValidator denyListPropertyValidator, Optional<PullQueryExecutorMetrics> optional2) {
        this(ksqlEngine, new StatementParser(ksqlEngine), commandQueue, duration, duration2, activenessRegistrar, optional, errors, pullQueryExecutor, denyListPropertyValidator, optional2);
    }

    @VisibleForTesting
    StreamedQueryResource(KsqlEngine ksqlEngine, StatementParser statementParser, CommandQueue commandQueue, Duration duration, Duration duration2, ActivenessRegistrar activenessRegistrar, Optional<KsqlAuthorizationValidator> optional, Errors errors, PullQueryExecutor pullQueryExecutor, DenyListPropertyValidator denyListPropertyValidator, Optional<PullQueryExecutorMetrics> optional2) {
        this.ksqlEngine = (KsqlEngine) Objects.requireNonNull(ksqlEngine, "ksqlEngine");
        this.statementParser = (StatementParser) Objects.requireNonNull(statementParser, "statementParser");
        this.commandQueue = (CommandQueue) Objects.requireNonNull(commandQueue, "commandQueue");
        this.disconnectCheckInterval = (Duration) Objects.requireNonNull(duration, "disconnectCheckInterval");
        this.commandQueueCatchupTimeout = (Duration) Objects.requireNonNull(duration2, "commandQueueCatchupTimeout");
        this.activenessRegistrar = (ActivenessRegistrar) Objects.requireNonNull(activenessRegistrar, "activenessRegistrar");
        this.authorizationValidator = optional;
        this.errorHandler = (Errors) Objects.requireNonNull(errors, "errorHandler");
        this.pullQueryExecutor = (PullQueryExecutor) Objects.requireNonNull(pullQueryExecutor, "pullQueryExecutor");
        this.denyListPropertyValidator = (DenyListPropertyValidator) Objects.requireNonNull(denyListPropertyValidator, "denyListPropertyValidator");
        this.pullQueryMetrics = (Optional) Objects.requireNonNull(optional2, "pullQueryMetrics");
    }

    @Override // io.confluent.ksql.rest.server.resources.KsqlConfigurable
    public void configure(KsqlConfig ksqlConfig) {
        if (!ksqlConfig.getKsqlStreamConfigProps().containsKey("application.server")) {
            throw new IllegalArgumentException("Need KS application server set");
        }
        this.ksqlConfig = ksqlConfig;
    }

    public EndpointResponse streamQuery(KsqlSecurityContext ksqlSecurityContext, KsqlRequest ksqlRequest, CompletableFuture<Void> completableFuture, Optional<Boolean> optional) {
        throwIfNotConfigured();
        this.activenessRegistrar.updateLastRequestTime();
        KsqlParser.PreparedStatement<?> parseStatement = parseStatement(ksqlRequest);
        CommandStoreUtil.httpWaitForCommandSequenceNumber(this.commandQueue, ksqlRequest, this.commandQueueCatchupTimeout);
        return handleStatement(ksqlSecurityContext, ksqlRequest, parseStatement, completableFuture, optional);
    }

    private void throwIfNotConfigured() {
        if (this.ksqlConfig == null) {
            throw new KsqlRestException(Errors.notReady());
        }
    }

    private KsqlParser.PreparedStatement<?> parseStatement(KsqlRequest ksqlRequest) {
        String ksql = ksqlRequest.getKsql();
        if (ksql.trim().isEmpty()) {
            throw new KsqlRestException(Errors.badRequest("\"ksql\" field must be populated"));
        }
        try {
            return this.statementParser.parseSingleStatement(ksql);
        } catch (IllegalArgumentException | KsqlException e) {
            throw new KsqlRestException(Errors.badStatement(e, ksql));
        }
    }

    private EndpointResponse handleStatement(KsqlSecurityContext ksqlSecurityContext, KsqlRequest ksqlRequest, KsqlParser.PreparedStatement<?> preparedStatement, CompletableFuture<Void> completableFuture, Optional<Boolean> optional) {
        try {
            this.authorizationValidator.ifPresent(ksqlAuthorizationValidator -> {
                ksqlAuthorizationValidator.checkAuthorization(ksqlSecurityContext, this.ksqlEngine.getMetaStore(), preparedStatement.getStatement());
            });
            Map<String, Object> configOverrides = ksqlRequest.getConfigOverrides();
            this.denyListPropertyValidator.validateAll(configOverrides);
            return preparedStatement.getStatement() instanceof Query ? preparedStatement.getStatement().isPullQuery() ? handlePullQuery(ksqlSecurityContext.getServiceContext(), preparedStatement, configOverrides, ksqlRequest.getRequestProperties(), optional, this.pullQueryMetrics) : handlePushQuery(ksqlSecurityContext.getServiceContext(), preparedStatement, configOverrides, completableFuture) : preparedStatement.getStatement() instanceof PrintTopic ? handlePrintTopic(ksqlSecurityContext.getServiceContext(), configOverrides, preparedStatement, completableFuture) : Errors.badRequest(String.format("Statement type `%s' not supported for this resource", preparedStatement.getClass().getName()));
        } catch (KsqlException e) {
            return this.errorHandler.generateResponse(e, Errors.badRequest(e));
        } catch (KsqlStatementException e2) {
            return Errors.badStatement(e2.getRawMessage(), e2.getSqlStatement());
        } catch (TopicAuthorizationException e3) {
            return this.errorHandler.accessDeniedFromKafkaResponse(e3);
        }
    }

    private EndpointResponse handlePullQuery(ServiceContext serviceContext, KsqlParser.PreparedStatement<Query> preparedStatement, Map<String, Object> map, Map<String, Object> map2, Optional<Boolean> optional, Optional<PullQueryExecutorMetrics> optional2) {
        PullQueryResult execute = this.pullQueryExecutor.execute(ConfiguredStatement.of(preparedStatement, SessionConfig.of(this.ksqlConfig, map)), map2, serviceContext, optional, optional2);
        TableRows tableRows = execute.getTableRows();
        Optional<U> map3 = execute.getSourceNodes().map(list -> {
            return (List) list.stream().map((v0) -> {
                return v0.location();
            }).map(uri -> {
                return new KsqlHostInfoEntity(uri.getHost(), uri.getPort());
            }).collect(Collectors.toList());
        });
        StreamedRow header = StreamedRow.header(tableRows.getQueryId(), tableRows.getSchema());
        map3.ifPresent(list2 -> {
            Preconditions.checkState(list2.size() == tableRows.getRows().size());
        });
        List list3 = (List) IntStream.range(0, tableRows.getRows().size()).mapToObj(i -> {
            return Pair.of(toGenericRow(tableRows.getRows().get(i)), map3.map(list4 -> {
                return (KsqlHostInfoEntity) list4.get(i);
            }));
        }).map(pair -> {
            return StreamedRow.row((GenericRow) pair.getLeft(), (Optional) pair.getRight());
        }).collect(Collectors.toList());
        list3.add(0, header);
        return EndpointResponse.ok((String) list3.stream().map((v1) -> {
            return writeValueAsString(v1);
        }).collect(Collectors.joining("," + System.lineSeparator(), "[", "]")));
    }

    private EndpointResponse handlePushQuery(ServiceContext serviceContext, KsqlParser.PreparedStatement<Query> preparedStatement, Map<String, Object> map, CompletableFuture<Void> completableFuture) {
        QueryStreamWriter queryStreamWriter = new QueryStreamWriter(this.ksqlEngine.executeQuery(serviceContext, ConfiguredStatement.of(preparedStatement, SessionConfig.of(this.ksqlConfig, map))), this.disconnectCheckInterval.toMillis(), OBJECT_MAPPER, completableFuture);
        log.info("Streaming query '{}'", preparedStatement.getStatementText());
        return EndpointResponse.ok(queryStreamWriter);
    }

    private String writeValueAsString(Object obj) {
        try {
            return OBJECT_MAPPER.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    private EndpointResponse handlePrintTopic(ServiceContext serviceContext, Map<String, Object> map, KsqlParser.PreparedStatement<PrintTopic> preparedStatement, CompletableFuture<Void> completableFuture) {
        PrintTopic statement = preparedStatement.getStatement();
        String topic = statement.getTopic();
        if (!serviceContext.getTopicClient().isTopicExists(topic)) {
            Collection<String> findPossibleTopicMatches = findPossibleTopicMatches(topic, serviceContext);
            throw new KsqlRestException(Errors.badRequest("Could not find topic '" + topic + "', or the KSQL user does not have permissions to list the topic. Topic names are case-sensitive." + (findPossibleTopicMatches.isEmpty() ? KsqlRestConfig.AUTHENTICATION_SKIP_PATHS_DEFAULT : (String) findPossibleTopicMatches.stream().map(str -> {
                return "\tprint " + str + ";";
            }).collect(Collectors.joining(System.lineSeparator(), System.lineSeparator() + "Did you mean:" + System.lineSeparator(), KsqlRestConfig.AUTHENTICATION_SKIP_PATHS_DEFAULT)))));
        }
        HashMap hashMap = new HashMap(this.ksqlConfig.getKsqlStreamConfigProps());
        hashMap.putAll(map);
        TopicStreamWriter create = TopicStreamWriter.create(serviceContext, hashMap, statement, this.disconnectCheckInterval, completableFuture);
        log.info("Printing topic '{}'", topic);
        return EndpointResponse.ok(create);
    }

    private static Collection<String> findPossibleTopicMatches(String str, ServiceContext serviceContext) {
        return (Collection) serviceContext.getTopicClient().listTopicNames().stream().filter(str2 -> {
            return str2.equalsIgnoreCase(str);
        }).collect(Collectors.toSet());
    }

    private static GenericRow toGenericRow(List<?> list) {
        return new GenericRow().appendAll(list);
    }
}
