package io.confluent.ksql.api.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.api.server.MetricsCallbackHolder;
import io.confluent.ksql.api.server.QueryHandle;
import io.confluent.ksql.api.spi.QueryPublisher;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.execution.pull.PullQueryResult;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.server.query.QueryExecutor;
import io.confluent.ksql.rest.server.query.QueryMetadataHolder;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.utils.FormatOptions;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.ConsistencyOffsetVector;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.PushQueryMetadata;
import io.confluent.ksql.util.VertxUtils;
import io.vertx.core.Context;
import io.vertx.core.WorkerExecutor;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/* loaded from: input_file:io/confluent/ksql/api/impl/QueryEndpoint.class */
public class QueryEndpoint {
    private final KsqlExecutionContext ksqlEngine;
    private final KsqlConfig ksqlConfig;
    private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
    private final QueryExecutor queryExecutor;

    /* loaded from: input_file:io/confluent/ksql/api/impl/QueryEndpoint$KsqlPullQueryHandle.class */
    private static class KsqlPullQueryHandle implements QueryHandle {
        private final PullQueryResult result;
        private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
        private final String maskedStatementText;
        private final CompletableFuture<Void> future = new CompletableFuture<>();

        KsqlPullQueryHandle(PullQueryResult pullQueryResult, Optional<PullQueryExecutorMetrics> optional, String str) {
            this.result = (PullQueryResult) Objects.requireNonNull(pullQueryResult);
            this.pullQueryMetrics = (Optional) Objects.requireNonNull(optional);
            this.maskedStatementText = str;
        }

        @Override // io.confluent.ksql.api.server.QueryHandle
        public List<String> getColumnNames() {
            return QueryEndpoint.colNamesFromSchema(this.result.getSchema().columns());
        }

        @Override // io.confluent.ksql.api.server.QueryHandle
        public List<String> getColumnTypes() {
            return QueryEndpoint.colTypesFromSchema(this.result.getSchema().columns());
        }

        @Override // io.confluent.ksql.api.server.QueryHandle
        public LogicalSchema getLogicalSchema() {
            return this.result.getSchema();
        }

        @Override // io.confluent.ksql.api.server.QueryHandle
        public void start() {
            try {
                PullQueryResult pullQueryResult = this.result;
                CompletableFuture<Void> completableFuture = this.future;
                completableFuture.getClass();
                pullQueryResult.onException(completableFuture::completeExceptionally);
                PullQueryResult pullQueryResult2 = this.result;
                CompletableFuture<Void> completableFuture2 = this.future;
                completableFuture2.getClass();
                pullQueryResult2.onCompletion((v1) -> {
                    r1.complete(v1);
                });
                this.result.start();
            } catch (Exception e) {
                this.pullQueryMetrics.ifPresent(pullQueryExecutorMetrics -> {
                    pullQueryExecutorMetrics.recordErrorRate(1.0d, this.result.getSourceType(), this.result.getPlanType(), this.result.getRoutingNodeType());
                });
                throw new KsqlStatementException("Error starting pull query: " + e.getMessage(), this.maskedStatementText, e);
            }
        }

        @Override // io.confluent.ksql.api.server.QueryHandle
        public void stop() {
            this.result.stop();
        }

        @Override // io.confluent.ksql.api.server.QueryHandle
        public BlockingRowQueue getQueue() {
            return this.result.getPullQueryQueue();
        }

        @Override // io.confluent.ksql.api.server.QueryHandle
        public void onException(Consumer<Throwable> consumer) {
            this.future.exceptionally(th -> {
                consumer.accept(th);
                return null;
            });
        }

        @Override // io.confluent.ksql.api.server.QueryHandle
        public QueryId getQueryId() {
            return this.result.getQueryId();
        }

        @Override // io.confluent.ksql.api.server.QueryHandle
        public Optional<ConsistencyOffsetVector> getConsistencyOffsetVector() {
            return this.result.getConsistencyOffsetVector();
        }

        @Override // io.confluent.ksql.api.server.QueryHandle
        public Optional<PushQueryMetadata.ResultType> getResultType() {
            return Optional.empty();
        }
    }

    /* loaded from: input_file:io/confluent/ksql/api/impl/QueryEndpoint$KsqlQueryHandle.class */
    private static class KsqlQueryHandle implements QueryHandle {
        private final PushQueryMetadata queryMetadata;

        KsqlQueryHandle(PushQueryMetadata pushQueryMetadata) {
            this.queryMetadata = (PushQueryMetadata) Objects.requireNonNull(pushQueryMetadata, "queryMetadata");
        }

        @Override // io.confluent.ksql.api.server.QueryHandle
        public List<String> getColumnNames() {
            return QueryEndpoint.colNamesFromSchema(this.queryMetadata.getLogicalSchema().value());
        }

        @Override // io.confluent.ksql.api.server.QueryHandle
        public List<String> getColumnTypes() {
            return QueryEndpoint.colTypesFromSchema(this.queryMetadata.getLogicalSchema().value());
        }

        @Override // io.confluent.ksql.api.server.QueryHandle
        public LogicalSchema getLogicalSchema() {
            return this.queryMetadata.getLogicalSchema();
        }

        @Override // io.confluent.ksql.api.server.QueryHandle
        public void start() {
            this.queryMetadata.start();
        }

        @Override // io.confluent.ksql.api.server.QueryHandle
        public void stop() {
            this.queryMetadata.close();
        }

        @Override // io.confluent.ksql.api.server.QueryHandle
        public BlockingRowQueue getQueue() {
            return this.queryMetadata.getRowQueue();
        }

        @Override // io.confluent.ksql.api.server.QueryHandle
        public void onException(Consumer<Throwable> consumer) {
            this.queryMetadata.setUncaughtExceptionHandler(th -> {
                consumer.accept(th);
                return null;
            });
        }

        @Override // io.confluent.ksql.api.server.QueryHandle
        public QueryId getQueryId() {
            return this.queryMetadata.getQueryId();
        }

        @Override // io.confluent.ksql.api.server.QueryHandle
        public Optional<ConsistencyOffsetVector> getConsistencyOffsetVector() {
            return Optional.empty();
        }

        @Override // io.confluent.ksql.api.server.QueryHandle
        public Optional<PushQueryMetadata.ResultType> getResultType() {
            return Optional.of(this.queryMetadata.getResultType());
        }
    }

    @SuppressFBWarnings({"EI_EXPOSE_REP2"})
    public QueryEndpoint(KsqlExecutionContext ksqlExecutionContext, KsqlConfig ksqlConfig, Optional<PullQueryExecutorMetrics> optional, QueryExecutor queryExecutor) {
        this.ksqlEngine = ksqlExecutionContext;
        this.ksqlConfig = ksqlConfig;
        this.pullQueryMetrics = optional;
        this.queryExecutor = queryExecutor;
    }

    public QueryPublisher createQueryPublisher(String str, Map<String, Object> map, Map<String, Object> map2, Map<String, Object> map3, Context context, WorkerExecutor workerExecutor, ServiceContext serviceContext, MetricsCallbackHolder metricsCallbackHolder, Optional<Boolean> optional) {
        VertxUtils.checkIsWorker();
        ConfiguredStatement<Query> createStatement = createStatement(str, map, map2);
        QueryMetadataHolder handleStatement = this.queryExecutor.handleStatement(serviceContext, map, map3, createStatement.getPreparedStatement(), optional, metricsCallbackHolder, context, false);
        if (handleStatement.getPullQueryResult().isPresent()) {
            PullQueryResult pullQueryResult = handleStatement.getPullQueryResult().get();
            BlockingQueryPublisher blockingQueryPublisher = new BlockingQueryPublisher(context, workerExecutor);
            blockingQueryPublisher.setQueryHandle(new KsqlPullQueryHandle(pullQueryResult, this.pullQueryMetrics, createStatement.getPreparedStatement().getMaskedStatementText()), true, false);
            blockingQueryPublisher.startFromWorkerThread();
            return blockingQueryPublisher;
        }
        if (!handleStatement.getPushQueryMetadata().isPresent()) {
            throw new KsqlStatementException("Unexpected metadata for query", createStatement.getMaskedStatementText());
        }
        PushQueryMetadata pushQueryMetadata = handleStatement.getPushQueryMetadata().get();
        BlockingQueryPublisher blockingQueryPublisher2 = new BlockingQueryPublisher(context, workerExecutor);
        blockingQueryPublisher2.setQueryHandle(new KsqlQueryHandle(pushQueryMetadata), false, handleStatement.getScalablePushQueryMetadata().isPresent());
        return blockingQueryPublisher2;
    }

    private ConfiguredStatement<Query> createStatement(String str, Map<String, Object> map, Map<String, Object> map2) {
        List parse = this.ksqlEngine.parse(str);
        if (parse.size() != 1) {
            throw new KsqlStatementException(String.format("Expected exactly one KSQL statement; found %d instead", Integer.valueOf(parse.size())), str);
        }
        KsqlParser.PreparedStatement prepare = this.ksqlEngine.prepare((KsqlParser.ParsedStatement) parse.get(0), (Map) map2.entrySet().stream().collect(Collectors.toMap(entry -> {
            return (String) entry.getKey();
        }, entry2 -> {
            return entry2.getValue().toString();
        })));
        if (prepare.getStatement() instanceof Query) {
            return ConfiguredStatement.of(prepare, SessionConfig.of(this.ksqlConfig, map));
        }
        throw new KsqlStatementException("Not a query", str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static List<String> colTypesFromSchema(List<Column> list) {
        return (List) list.stream().map((v0) -> {
            return v0.type();
        }).map(sqlType -> {
            return sqlType.toString(FormatOptions.none());
        }).collect(Collectors.toList());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static List<String> colNamesFromSchema(List<Column> list) {
        return (List) list.stream().map((v0) -> {
            return v0.name();
        }).map((v0) -> {
            return v0.text();
        }).collect(Collectors.toList());
    }
}
