package io.confluent.ksql.api.impl;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.api.server.PushQueryHandle;
import io.confluent.ksql.api.spi.QueryPublisher;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.rest.entity.TableRows;
import io.confluent.ksql.rest.server.execution.PullQueryExecutor;
import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics;
import io.confluent.ksql.rest.server.execution.PullQueryResult;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.utils.FormatOptions;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.TransientQueryMetadata;
import io.confluent.ksql.util.VertxUtils;
import io.vertx.core.Context;
import io.vertx.core.WorkerExecutor;
import io.vertx.core.json.JsonObject;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.kafka.common.utils.Time;

/* loaded from: input_file:io/confluent/ksql/api/impl/QueryEndpoint.class */
/**
 * Builds {@link QueryPublisher} instances for query statements.
 *
 * <p>A statement is parsed and prepared through the {@link KsqlEngine}; pull queries are executed
 * through the {@link PullQueryExecutor}, all other queries are executed through the engine and
 * exposed via a {@link KsqlQueryHandle}.
 */
public class QueryEndpoint {
    private final KsqlEngine ksqlEngine;
    private final KsqlConfig ksqlConfig;
    private final PullQueryExecutor pullQueryExecutor;
    private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;

    /**
     * {@link PushQueryHandle} implementation that delegates every operation to a
     * {@link TransientQueryMetadata}.
     */
    public static class KsqlQueryHandle implements PushQueryHandle {
        private final TransientQueryMetadata queryMetadata;

        /**
         * @param transientQueryMetadata the query to wrap; must not be {@code null}
         * @throws NullPointerException if {@code transientQueryMetadata} is {@code null}
         */
        KsqlQueryHandle(TransientQueryMetadata transientQueryMetadata) {
            this.queryMetadata = Objects.requireNonNull(transientQueryMetadata, "queryMetadata");
        }

        /** Returns the names of the value columns of the query's logical schema. */
        @Override // io.confluent.ksql.api.server.PushQueryHandle
        public List<String> getColumnNames() {
            return QueryEndpoint.colNamesFromSchema(this.queryMetadata.getLogicalSchema().value());
        }

        /** Returns the SQL types, as strings, of the value columns of the query's logical schema. */
        @Override // io.confluent.ksql.api.server.PushQueryHandle
        public List<String> getColumnTypes() {
            return QueryEndpoint.colTypesFromSchema(this.queryMetadata.getLogicalSchema().value());
        }

        /** Starts the wrapped query. */
        @Override // io.confluent.ksql.api.server.PushQueryHandle
        public void start() {
            this.queryMetadata.start();
        }

        /** Stops the wrapped query by closing it. */
        @Override // io.confluent.ksql.api.server.PushQueryHandle
        public void stop() {
            this.queryMetadata.close();
        }

        /** Returns the row queue of the wrapped query. */
        @Override // io.confluent.ksql.api.server.PushQueryHandle
        public BlockingRowQueue getQueue() {
            return this.queryMetadata.getRowQueue();
        }
    }

    /**
     * @param ksqlEngine engine used to parse, prepare and execute statements
     * @param ksqlConfig base configuration combined with per-request properties
     * @param pullQueryExecutor executor used for pull queries
     * @param optional optional metrics sink for pull queries
     */
    public QueryEndpoint(KsqlEngine ksqlEngine, KsqlConfig ksqlConfig, PullQueryExecutor pullQueryExecutor, Optional<PullQueryExecutorMetrics> optional) {
        this.ksqlEngine = ksqlEngine;
        this.ksqlConfig = ksqlConfig;
        this.pullQueryExecutor = pullQueryExecutor;
        this.pullQueryMetrics = optional;
    }

    /**
     * Parses the given statement text and returns a publisher for its results.
     *
     * @param str the statement text; must contain exactly one statement, which must be a query
     * @param jsonObject request properties; its map view is used as session overrides
     * @param context Vert.x context handed to the created publisher
     * @param workerExecutor worker executor handed to the push-query publisher
     * @param serviceContext service context used to execute the query
     * @return a pull-query publisher if the statement is a pull query, otherwise a push-query
     *     publisher
     * @throws KsqlStatementException if the text is not exactly one statement or is not a query
     */
    public QueryPublisher createQueryPublisher(String str, JsonObject jsonObject, Context context, WorkerExecutor workerExecutor, ServiceContext serviceContext) {
        // Taken before any other work so the value handed to the pull-query metrics
        // predates the worker check and statement parsing.
        final long startTimeNanos = Time.SYSTEM.nanoseconds();
        VertxUtils.checkIsWorker();
        final ConfiguredStatement<Query> statement = createStatement(str, jsonObject.getMap());
        if (statement.getStatement().isPullQuery()) {
            return createPullQueryPublisher(context, serviceContext, statement, this.pullQueryMetrics, startTimeNanos);
        }
        return createPushQueryPublisher(context, serviceContext, statement, workerExecutor);
    }

    /** Executes the query on the engine and attaches its handle to a new blocking publisher. */
    private QueryPublisher createPushQueryPublisher(Context context, ServiceContext serviceContext, ConfiguredStatement<Query> statement, WorkerExecutor workerExecutor) {
        final BlockingQueryPublisher publisher = new BlockingQueryPublisher(context, workerExecutor);
        final TransientQueryMetadata queryMetadata = this.ksqlEngine.executeQuery(serviceContext, statement);
        publisher.setQueryHandle(new KsqlQueryHandle(queryMetadata));
        return publisher;
    }

    /**
     * Runs the pull query, reports the start time to the metrics (if present) and wraps the
     * resulting rows in a publisher.
     */
    private QueryPublisher createPullQueryPublisher(Context context, ServiceContext serviceContext, ConfiguredStatement<Query> statement, Optional<PullQueryExecutorMetrics> metrics, long startTimeNanos) {
        // NOTE(review): Optional.of(false) presumably flags the request as not internal/forwarded;
        // confirm against PullQueryExecutor.execute.
        final PullQueryResult result = this.pullQueryExecutor.execute(
                statement, ImmutableMap.of(), serviceContext, Optional.of(false), metrics);
        // Only reached when execute() returned normally. recordLatency receives the start
        // timestamp; presumably it derives the elapsed time itself (TODO confirm).
        metrics.ifPresent(pullMetrics -> pullMetrics.recordLatency(startTimeNanos));
        final TableRows rows = result.getTableRows();
        final List<Column> columns = rows.getSchema().columns();
        return new PullQueryPublisher(context, rows, colNamesFromSchema(columns), colTypesFromSchema(columns));
    }

    /**
     * Parses and prepares {@code sql}, verifying it is a single query statement.
     *
     * @param sql the statement text
     * @param properties session property overrides
     * @return the prepared query combined with the session configuration
     * @throws KsqlStatementException if {@code sql} does not contain exactly one statement, or
     *     the statement is not a {@link Query}
     */
    private ConfiguredStatement<Query> createStatement(String sql, Map<String, Object> properties) {
        final List<KsqlParser.ParsedStatement> statements = this.ksqlEngine.parse(sql);
        if (statements.size() != 1) {
            throw new KsqlStatementException(String.format("Expected exactly one KSQL statement; found %d instead", statements.size()), sql);
        }
        final KsqlParser.PreparedStatement<?> prepared = this.ksqlEngine.prepare(statements.get(0));
        if (!(prepared.getStatement() instanceof Query)) {
            throw new KsqlStatementException("Not a query", sql);
        }
        // Safe: the instanceof check above established that the statement is a Query.
        @SuppressWarnings("unchecked")
        final KsqlParser.PreparedStatement<Query> preparedQuery = (KsqlParser.PreparedStatement<Query>) prepared;
        return ConfiguredStatement.of(preparedQuery, SessionConfig.of(this.ksqlConfig, properties));
    }

    /**
     * Maps each column to the string form of its SQL type, formatted with
     * {@link FormatOptions#none()}.
     *
     * @param list the columns to describe
     * @return the column types, in the same order as {@code list}
     */
    public static List<String> colTypesFromSchema(List<Column> list) {
        return list.stream()
                .map(column -> column.type().toString(FormatOptions.none()))
                .collect(Collectors.toList());
    }

    /**
     * Maps each column to the text of its name.
     *
     * @param list the columns to describe
     * @return the column names, in the same order as {@code list}
     */
    public static List<String> colNamesFromSchema(List<Column> list) {
        return list.stream()
                .map(column -> column.name().text())
                .collect(Collectors.toList());
    }
}
