package io.confluent.ksql.engine;

import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.analyzer.Analysis;
import io.confluent.ksql.analyzer.ImmutableAnalysis;
import io.confluent.ksql.analyzer.QueryAnalyzer;
import io.confluent.ksql.analyzer.RewrittenAnalysis;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.PullQueryExecutionUtil;
import io.confluent.ksql.execution.ddl.commands.DdlCommand;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.CreateStreamAsSelect;
import io.confluent.ksql.parser.tree.CreateTableAsSelect;
import io.confluent.ksql.parser.tree.ExecutableDdlStatement;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.QueryContainer;
import io.confluent.ksql.parser.tree.Sink;
import io.confluent.ksql.physical.PhysicalPlan;
import io.confluent.ksql.physical.pull.HARouting;
import io.confluent.ksql.physical.pull.PullPhysicalPlan;
import io.confluent.ksql.physical.pull.PullPhysicalPlanBuilder;
import io.confluent.ksql.physical.pull.PullQueryResult;
import io.confluent.ksql.planner.LogicalPlanNode;
import io.confluent.ksql.planner.LogicalPlanner;
import io.confluent.ksql.planner.PullPlannerOptions;
import io.confluent.ksql.planner.plan.KsqlBareOutputNode;
import io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode;
import io.confluent.ksql.planner.plan.OutputNode;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.query.PullQueryQueue;
import io.confluent.ksql.query.QueryExecutor;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.PlanSummary;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/confluent/ksql/engine/EngineExecutor.class */
public final class EngineExecutor {
    private final EngineContext engineContext;
    private final ServiceContext serviceContext;
    private final SessionConfig config;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/engine/EngineExecutor$ExecutorPlans.class */
    public static final class ExecutorPlans {
        private final LogicalPlanNode logicalPlan;
        private final PhysicalPlan physicalPlan;

        private ExecutorPlans(LogicalPlanNode logicalPlanNode, PhysicalPlan physicalPlan) {
            this.logicalPlan = (LogicalPlanNode) Objects.requireNonNull(logicalPlanNode, "logicalPlan");
            this.physicalPlan = (PhysicalPlan) Objects.requireNonNull(physicalPlan, "physicalPlanNode");
        }
    }

    private EngineExecutor(EngineContext engineContext, ServiceContext serviceContext, SessionConfig sessionConfig) {
        this.engineContext = (EngineContext) Objects.requireNonNull(engineContext, "engineContext");
        this.serviceContext = (ServiceContext) Objects.requireNonNull(serviceContext, "serviceContext");
        this.config = (SessionConfig) Objects.requireNonNull(sessionConfig, "config");
        KsqlEngineProps.throwOnImmutableOverride(sessionConfig.getOverrides());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static EngineExecutor create(EngineContext engineContext, ServiceContext serviceContext, SessionConfig sessionConfig) {
        return new EngineExecutor(engineContext, serviceContext, sessionConfig);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KsqlExecutionContext.ExecuteResult execute(KsqlPlan ksqlPlan) {
        Optional<QueryPlan> queryPlan = ksqlPlan.getQueryPlan();
        if (!queryPlan.isPresent()) {
            return KsqlExecutionContext.ExecuteResult.of((String) ksqlPlan.getDdlCommand().map(ddlCommand -> {
                return executeDdl(ddlCommand, ksqlPlan.getStatementText(), false, Collections.emptySet());
            }).orElseThrow(() -> {
                return new IllegalStateException("DdlResult should be present if there is no physical plan.");
            }));
        }
        Optional<U> map = ksqlPlan.getDdlCommand().map(ddlCommand2 -> {
            return executeDdl(ddlCommand2, ksqlPlan.getStatementText(), true, ((QueryPlan) queryPlan.get()).getSources());
        });
        return (map.isPresent() && ((String) map.get()).contains("already exists")) ? KsqlExecutionContext.ExecuteResult.of((String) map.get()) : KsqlExecutionContext.ExecuteResult.of(executePersistentQuery(queryPlan.get(), ksqlPlan.getStatementText(), ksqlPlan.getDdlCommand().isPresent()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PullQueryResult executePullQuery(ConfiguredStatement<Query> configuredStatement, HARouting hARouting, RoutingOptions routingOptions, PullPlannerOptions pullPlannerOptions, Optional<PullQueryExecutorMetrics> optional, boolean z) {
        if (!configuredStatement.getStatement().isPullQuery()) {
            throw new IllegalArgumentException("Executor can only handle pull queries");
        }
        SessionConfig sessionConfig = configuredStatement.getSessionConfig();
        try {
            Analysis analyze = new QueryAnalyzer(this.engineContext.getMetaStore(), "").analyze(configuredStatement.getStatement(), Optional.empty());
            PullQueryExecutionUtil.ColumnReferenceRewriter columnReferenceRewriter = new PullQueryExecutionUtil.ColumnReferenceRewriter();
            columnReferenceRewriter.getClass();
            RewrittenAnalysis rewrittenAnalysis = new RewrittenAnalysis(analyze, (v1, v2) -> {
                return r3.process(v1, v2);
            });
            PullPhysicalPlan buildPullPhysicalPlan = buildPullPhysicalPlan(buildAndValidateLogicalPlan(configuredStatement, rewrittenAnalysis, sessionConfig.getConfig(false), pullPlannerOptions), rewrittenAnalysis);
            PullQueryQueue pullQueryQueue = new PullQueryQueue();
            PullQueryResult pullQueryResult = new PullQueryResult(buildPullPhysicalPlan.getOutputSchema(), () -> {
                return hARouting.handlePullQuery(this.serviceContext, buildPullPhysicalPlan, configuredStatement, routingOptions, buildPullPhysicalPlan.getOutputSchema(), buildPullPhysicalPlan.getQueryId(), pullQueryQueue);
            }, buildPullPhysicalPlan.getQueryId(), pullQueryQueue, optional);
            if (z) {
                pullQueryResult.start();
            }
            return pullQueryResult;
        } catch (Exception e) {
            optional.ifPresent(pullQueryExecutorMetrics -> {
                pullQueryExecutorMetrics.recordErrorRate(1.0d);
            });
            throw new KsqlStatementException(e.getMessage() == null ? "Server Error" + Arrays.toString(e.getStackTrace()) : e.getMessage(), configuredStatement.getStatementText(), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public TransientQueryMetadata executeQuery(ConfiguredStatement<Query> configuredStatement, boolean z) {
        ExecutorPlans planQuery = planQuery(configuredStatement, configuredStatement.getStatement(), Optional.empty(), Optional.empty());
        KsqlBareOutputNode ksqlBareOutputNode = (KsqlBareOutputNode) planQuery.logicalPlan.getNode().get();
        QueryExecutor createQueryExecutor = this.engineContext.createQueryExecutor(this.config, this.serviceContext);
        this.engineContext.createQueryValidator().validateQuery(this.config, planQuery.physicalPlan, this.engineContext.getAllLiveQueries());
        return createQueryExecutor.buildTransientQuery(configuredStatement.getStatementText(), planQuery.physicalPlan.getQueryId(), getSourceNames(ksqlBareOutputNode), planQuery.physicalPlan.getPhysicalPlan(), buildPlanSummary(planQuery.physicalPlan.getQueryId(), planQuery.physicalPlan.getPhysicalPlan()), ksqlBareOutputNode.getSchema(), ksqlBareOutputNode.getLimit(), ksqlBareOutputNode.getWindowInfo(), z);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KsqlPlan plan(ConfiguredStatement<?> configuredStatement) {
        try {
            throwOnNonExecutableStatement(configuredStatement);
            if (configuredStatement.getStatement() instanceof ExecutableDdlStatement) {
                return KsqlPlan.ddlPlanCurrent(configuredStatement.getStatementText(), this.engineContext.createDdlCommand(configuredStatement.getStatementText(), (ExecutableDdlStatement) configuredStatement.getStatement(), this.config));
            }
            QueryContainer queryContainer = (QueryContainer) configuredStatement.getStatement();
            ExecutorPlans planQuery = planQuery(configuredStatement, queryContainer.getQuery(), Optional.of(queryContainer.getSink()), queryContainer.getQueryId());
            KsqlStructuredDataOutputNode ksqlStructuredDataOutputNode = (KsqlStructuredDataOutputNode) planQuery.logicalPlan.getNode().get();
            Optional<DdlCommand> maybeCreateSinkDdl = maybeCreateSinkDdl(configuredStatement, ksqlStructuredDataOutputNode);
            validateQuery(ksqlStructuredDataOutputNode.getNodeOutputType(), configuredStatement);
            QueryPlan queryPlan = new QueryPlan(getSourceNames(ksqlStructuredDataOutputNode), ksqlStructuredDataOutputNode.getIntoSourceName(), planQuery.physicalPlan.getPhysicalPlan(), planQuery.physicalPlan.getQueryId());
            this.engineContext.createQueryValidator().validateQuery(this.config, planQuery.physicalPlan, this.engineContext.getAllLiveQueries());
            return KsqlPlan.queryPlanCurrent(configuredStatement.getStatementText(), maybeCreateSinkDdl, queryPlan);
        } catch (KsqlStatementException e) {
            throw e;
        } catch (Exception e2) {
            throw new KsqlStatementException(e2.getMessage(), configuredStatement.getStatementText(), e2);
        }
    }

    private ExecutorPlans planQuery(ConfiguredStatement<?> configuredStatement, Query query, Optional<Sink> optional, Optional<String> optional2) {
        QueryEngine createQueryEngine = this.engineContext.createQueryEngine(this.serviceContext);
        KsqlConfig config = this.config.getConfig(true);
        OutputNode buildQueryLogicalPlan = QueryEngine.buildQueryLogicalPlan(query, optional, this.engineContext.getMetaStore(), config);
        LogicalPlanNode logicalPlanNode = new LogicalPlanNode(configuredStatement.getStatementText(), Optional.of(buildQueryLogicalPlan));
        QueryId buildId = QueryIdUtil.buildId(this.engineContext, this.engineContext.idGenerator(), buildQueryLogicalPlan, config.getBoolean("ksql.create.or.replace.enabled").booleanValue(), optional2);
        if (optional2.isPresent() && this.engineContext.getPersistentQuery(buildId).isPresent()) {
            throw new KsqlException(String.format("Query ID '%s' already exists.", buildId));
        }
        return new ExecutorPlans(logicalPlanNode, createQueryEngine.buildPhysicalPlan(logicalPlanNode, this.config, this.engineContext.getMetaStore(), buildId));
    }

    private LogicalPlanNode buildAndValidateLogicalPlan(ConfiguredStatement<?> configuredStatement, ImmutableAnalysis immutableAnalysis, KsqlConfig ksqlConfig, PullPlannerOptions pullPlannerOptions) {
        return new LogicalPlanNode(configuredStatement.getStatementText(), Optional.of(new LogicalPlanner(ksqlConfig, immutableAnalysis, this.engineContext.getMetaStore()).buildPullLogicalPlan(pullPlannerOptions)));
    }

    private PullPhysicalPlan buildPullPhysicalPlan(LogicalPlanNode logicalPlanNode, ImmutableAnalysis immutableAnalysis) {
        return new PullPhysicalPlanBuilder(this.engineContext.getProcessingLogContext(), PullQueryExecutionUtil.findMaterializingQuery(this.engineContext, immutableAnalysis), immutableAnalysis).buildPullPhysicalPlan(logicalPlanNode);
    }

    private Optional<DdlCommand> maybeCreateSinkDdl(ConfiguredStatement<?> configuredStatement, KsqlStructuredDataOutputNode ksqlStructuredDataOutputNode) {
        if (!ksqlStructuredDataOutputNode.createInto()) {
            validateExistingSink(ksqlStructuredDataOutputNode);
            return Optional.empty();
        }
        Object statement = configuredStatement.getStatement();
        SourceName intoSourceName = ksqlStructuredDataOutputNode.getIntoSourceName();
        boolean z = (statement instanceof CreateAsSelect) && ((CreateAsSelect) statement).isOrReplace();
        boolean z2 = (statement instanceof CreateAsSelect) && ((CreateAsSelect) statement).isNotExists();
        DataSource source = this.engineContext.getMetaStore().getSource(intoSourceName);
        if (source == null || z2 || z) {
            return Optional.of(this.engineContext.createDdlCommand(ksqlStructuredDataOutputNode));
        }
        throw new KsqlException(String.format("Cannot add %s '%s': A %s with the same name already exists", ksqlStructuredDataOutputNode.getNodeOutputType().getKsqlType().toLowerCase(), intoSourceName.text(), source.getDataSourceType().getKsqlType().toLowerCase()));
    }

    private void validateExistingSink(KsqlStructuredDataOutputNode ksqlStructuredDataOutputNode) {
        SourceName intoSourceName = ksqlStructuredDataOutputNode.getIntoSourceName();
        DataSource source = this.engineContext.getMetaStore().getSource(intoSourceName);
        if (source == null) {
            throw new KsqlException(String.format("%s does not exist.", ksqlStructuredDataOutputNode));
        }
        if (source.getDataSourceType() != ksqlStructuredDataOutputNode.getNodeOutputType()) {
            throw new KsqlException(String.format("Incompatible data sink and query result. Data sink (%s) type is %s but select query result is %s.", intoSourceName.text(), source.getDataSourceType(), ksqlStructuredDataOutputNode.getNodeOutputType()));
        }
        LogicalSchema schema = ksqlStructuredDataOutputNode.getSchema();
        LogicalSchema schema2 = source.getSchema();
        if (!schema.compatibleSchema(schema2)) {
            throw new KsqlException("Incompatible schema between results and sink." + System.lineSeparator() + "Result schema is " + schema + System.lineSeparator() + "Sink schema is " + schema2);
        }
    }

    private static void validateQuery(DataSource.DataSourceType dataSourceType, ConfiguredStatement<?> configuredStatement) {
        if ((configuredStatement.getStatement() instanceof CreateStreamAsSelect) && dataSourceType == DataSource.DataSourceType.KTABLE) {
            throw new KsqlStatementException("Invalid result type. Your SELECT query produces a TABLE. Please use CREATE TABLE AS SELECT statement instead.", configuredStatement.getStatementText());
        }
        if ((configuredStatement.getStatement() instanceof CreateTableAsSelect) && dataSourceType == DataSource.DataSourceType.KSTREAM) {
            throw new KsqlStatementException("Invalid result type. Your SELECT query produces a STREAM. Please use CREATE STREAM AS SELECT statement instead.", configuredStatement.getStatementText());
        }
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [io.confluent.ksql.parser.tree.Statement] */
    private static void throwOnNonExecutableStatement(ConfiguredStatement<?> configuredStatement) {
        if (!KsqlEngine.isExecutableStatement(configuredStatement.getStatement())) {
            throw new KsqlStatementException("Statement not executable", configuredStatement.getStatementText());
        }
    }

    private static Set<SourceName> getSourceNames(PlanNode planNode) {
        return (Set) planNode.getSourceNodes().map((v0) -> {
            return v0.getDataSource();
        }).map((v0) -> {
            return v0.getName();
        }).collect(Collectors.toSet());
    }

    private String executeDdl(DdlCommand ddlCommand, String str, boolean z, Set<SourceName> set) {
        try {
            return this.engineContext.executeDdl(str, ddlCommand, z, set);
        } catch (KsqlStatementException e) {
            throw e;
        } catch (Exception e2) {
            throw new KsqlStatementException(e2.getMessage(), str, e2);
        }
    }

    private PersistentQueryMetadata executePersistentQuery(QueryPlan queryPlan, String str, boolean z) {
        PersistentQueryMetadata buildPersistentQuery = this.engineContext.createQueryExecutor(this.config, this.serviceContext).buildPersistentQuery(str, queryPlan.getQueryId(), this.engineContext.getMetaStore().getSource(queryPlan.getSink()), queryPlan.getSources(), queryPlan.getPhysicalPlan(), buildPlanSummary(queryPlan.getQueryId(), queryPlan.getPhysicalPlan()));
        this.engineContext.registerQuery(buildPersistentQuery, z);
        return buildPersistentQuery;
    }

    private String buildPlanSummary(QueryId queryId, ExecutionStep<?> executionStep) {
        return new PlanSummary(queryId, this.config.getConfig(true), this.engineContext.getMetaStore()).summarize(executionStep);
    }
}
