package io.confluent.ksql.engine;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.execution.ddl.commands.CreateStreamCommand;
import io.confluent.ksql.execution.ddl.commands.CreateTableCommand;
import io.confluent.ksql.execution.ddl.commands.DdlCommand;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.tree.CreateStreamAsSelect;
import io.confluent.ksql.parser.tree.CreateTableAsSelect;
import io.confluent.ksql.parser.tree.ExecutableDdlStatement;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.QueryContainer;
import io.confluent.ksql.parser.tree.Sink;
import io.confluent.ksql.physical.PhysicalPlan;
import io.confluent.ksql.planner.LogicalPlanNode;
import io.confluent.ksql.planner.PlanSourceExtractorVisitor;
import io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode;
import io.confluent.ksql.planner.plan.OutputNode;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.AvroUtil;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.PlanSummary;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/confluent/ksql/engine/EngineExecutor.class */
public final class EngineExecutor {
    private final EngineContext engineContext;
    private final ServiceContext serviceContext;
    private final KsqlConfig ksqlConfig;
    private final Map<String, Object> overriddenProperties;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/engine/EngineExecutor$ExecutorPlans.class */
    /**
     * Immutable holder pairing a logical plan with a physical plan.
     *
     * <p>Both components are mandatory; construction fails fast on {@code null}.
     */
    public static final class ExecutorPlans {
        private final LogicalPlanNode logicalPlan;
        private final PhysicalPlan physicalPlan;

        /**
         * @param logicalPlan the logical plan; must not be {@code null}
         * @param physicalPlan the physical plan; must not be {@code null}
         * @throws NullPointerException if either argument is {@code null}
         */
        private ExecutorPlans(final LogicalPlanNode logicalPlan, final PhysicalPlan physicalPlan) {
            this.logicalPlan = Objects.requireNonNull(logicalPlan, "logicalPlan");
            // The "physicalPlanNode" label is kept as-is so the NPE message does not change.
            this.physicalPlan = Objects.requireNonNull(physicalPlan, "physicalPlanNode");
        }
    }

    /**
     * Creates an executor bound to the given contexts and configuration.
     *
     * @param engineContext the engine context; must not be {@code null}
     * @param serviceContext the service context; must not be {@code null}
     * @param ksqlConfig the base configuration; must not be {@code null}
     * @param overriddenProperties property overrides; must not be {@code null}
     * @throws NullPointerException if any argument is {@code null}
     */
    private EngineExecutor(
            final EngineContext engineContext,
            final ServiceContext serviceContext,
            final KsqlConfig ksqlConfig,
            final Map<String, Object> overriddenProperties) {
        this.engineContext = Objects.requireNonNull(engineContext, "engineContext");
        this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
        this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig");
        this.overriddenProperties = Objects.requireNonNull(overriddenProperties, "overriddenProperties");
        // Checked only after the null checks above, delegating to KsqlEngineProps.
        KsqlEngineProps.throwOnImmutableOverride(overriddenProperties);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Static factory for {@link EngineExecutor}.
     *
     * @param engineContext the engine context; must not be {@code null}
     * @param serviceContext the service context; must not be {@code null}
     * @param ksqlConfig the base configuration; must not be {@code null}
     * @param map property overrides; must not be {@code null}
     * @return a new executor
     */
    public static EngineExecutor create(
            final EngineContext engineContext,
            final ServiceContext serviceContext,
            final KsqlConfig ksqlConfig,
            final Map<String, Object> map) {
        final EngineExecutor executor =
                new EngineExecutor(engineContext, serviceContext, ksqlConfig, map);
        return executor;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Executes a previously built plan.
     *
     * <p>If the plan carries a DDL command it is executed first. If it carries a query plan the
     * persistent query is then built and registered, and the result wraps the query metadata.
     * Otherwise the result wraps the text returned by the DDL execution.
     *
     * @param ksqlPlan the plan to execute
     * @return the result of the persistent query if one was planned, else the DDL result
     * @throws java.util.NoSuchElementException if the plan carries neither a DDL command nor a
     *     query plan
     */
    public KsqlExecutionContext.ExecuteResult execute(final KsqlPlan ksqlPlan) {
        // DDL runs eagerly, before any query is started.
        final Optional<String> ddlResult = ksqlPlan.getDdlCommand()
                .map(ddlCommand -> executeDdl(
                        ddlCommand,
                        ksqlPlan.getStatementText(),
                        ksqlPlan.getQueryPlan().isPresent()));

        final Optional<PersistentQueryMetadata> queryMetadata = ksqlPlan.getQueryPlan()
                .map(queryPlan -> executePersistentQuery(queryPlan, ksqlPlan.getStatementText()));

        return queryMetadata
                .map(metadata -> KsqlExecutionContext.ExecuteResult.of(metadata))
                // Without a query plan the DDL result is the only thing left to report.
                .orElseGet(() -> KsqlExecutionContext.ExecuteResult.of(ddlResult.get()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Plans the given query (with no sink) and builds a transient query from the plans.
     *
     * @param configuredStatement the configured query statement
     * @return the metadata of the transient query that was built
     */
    public TransientQueryMetadata executeQuery(final ConfiguredStatement<Query> configuredStatement) {
        final ExecutorPlans plans =
                planQuery(configuredStatement, configuredStatement.getStatement(), Optional.empty());
        final OutputNode outputNode = plans.logicalPlan.getNode().get();
        final PhysicalPlan physical = plans.physicalPlan;

        return this.engineContext
                .createQueryExecutor(this.ksqlConfig, this.overriddenProperties, this.serviceContext)
                .buildTransientQuery(
                        configuredStatement.getStatementText(),
                        physical.getQueryId(),
                        getSourceNames(outputNode),
                        physical.getPhysicalPlan(),
                        buildPlanSummary(physical.getQueryId(), physical.getPhysicalPlan()),
                        outputNode.getSchema(),
                        outputNode.getLimit());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Plans the given query (with no sink) and builds a transient query, handing the supplied
     * consumer to the query builder.
     *
     * @param configuredStatement the configured query statement
     * @param consumer the row consumer passed through to the query builder
     * @return the metadata of the query that was built
     */
    public QueryMetadata executeQuery(
            final ConfiguredStatement<Query> configuredStatement,
            final Consumer<GenericRow> consumer) {
        final ExecutorPlans plans =
                planQuery(configuredStatement, configuredStatement.getStatement(), Optional.empty());
        final OutputNode outputNode = plans.logicalPlan.getNode().get();
        final PhysicalPlan physical = plans.physicalPlan;

        return this.engineContext
                .createQueryExecutor(this.ksqlConfig, this.overriddenProperties, this.serviceContext)
                .buildTransientQuery(
                        configuredStatement.getStatementText(),
                        physical.getQueryId(),
                        getSourceNames(outputNode),
                        physical.getPhysicalPlan(),
                        buildPlanSummary(physical.getQueryId(), physical.getPhysicalPlan()),
                        outputNode.getSchema(),
                        consumer);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds an execution plan for the given statement.
     *
     * <p>Executable DDL statements yield a DDL-only plan. Every other executable statement is
     * treated as a {@link QueryContainer}: its query is planned against its sink, an optional
     * sink-creating DDL command is derived, and the statement kind is validated against the
     * output type of the query.
     *
     * @param configuredStatement the statement to plan
     * @return the plan for the statement
     * @throws KsqlStatementException if the statement is not executable or planning fails; any
     *     other exception is wrapped together with the statement text
     */
    public KsqlPlan plan(final ConfiguredStatement<?> configuredStatement) {
        try {
            throwOnNonExecutableStatement(configuredStatement);

            if (configuredStatement.getStatement() instanceof ExecutableDdlStatement) {
                return KsqlPlan.ddlPlanCurrent(
                        configuredStatement.getStatementText(),
                        this.engineContext.createDdlCommand(
                                configuredStatement.getStatementText(),
                                (ExecutableDdlStatement) configuredStatement.getStatement(),
                                this.ksqlConfig,
                                this.overriddenProperties));
            }

            final QueryContainer queryContainer = (QueryContainer) configuredStatement.getStatement();
            final ExecutorPlans plans = planQuery(
                    configuredStatement,
                    queryContainer.getQuery(),
                    Optional.of(queryContainer.getSink()));
            final KsqlStructuredDataOutputNode outputNode =
                    (KsqlStructuredDataOutputNode) plans.logicalPlan.getNode().get();

            // Sink handling runs before the statement-kind validation, as in the original flow.
            final Optional<DdlCommand> sinkDdl = maybeCreateSinkDdl(
                    configuredStatement.getStatementText(),
                    outputNode,
                    plans.physicalPlan.getKeyField().get());
            validateQuery(outputNode.getNodeOutputType(), configuredStatement);

            return KsqlPlan.queryPlanCurrent(
                    configuredStatement.getStatementText(),
                    sinkDdl,
                    new QueryPlan(
                            getSourceNames(outputNode),
                            outputNode.getIntoSourceName(),
                            plans.physicalPlan.getPhysicalPlan(),
                            plans.physicalPlan.getQueryId()));
        } catch (KsqlStatementException e) {
            // Already carries the statement text; must be caught before the broader clause so it
            // is rethrown untouched instead of being wrapped a second time.
            throw e;
        } catch (Exception e) {
            throw new KsqlStatementException(e.getMessage(), configuredStatement.getStatementText(), e);
        }
    }

    /**
     * Builds the logical plan for a query and the physical plan derived from it.
     *
     * <p>The logical plan is built with a copy of the configuration that has the property
     * overrides applied; the physical plan builder receives the base configuration and the
     * overrides separately.
     *
     * @param configuredStatement the statement the query belongs to (source of the statement text)
     * @param query the query to plan
     * @param optional the sink to write to, if any
     * @return the logical and physical plans
     */
    private ExecutorPlans planQuery(
            final ConfiguredStatement<?> configuredStatement,
            final Query query,
            final Optional<Sink> optional) {
        final QueryEngine queryEngine = this.engineContext.createQueryEngine(this.serviceContext);
        final String statementText = configuredStatement.getStatementText();

        final LogicalPlanNode logicalPlan = new LogicalPlanNode(
                statementText,
                Optional.of(QueryEngine.buildQueryLogicalPlan(
                        query,
                        optional,
                        this.engineContext.getMetaStore(),
                        this.ksqlConfig.cloneWithPropertyOverwrite(this.overriddenProperties))));

        final PhysicalPlan physicalPlan = queryEngine.buildPhysicalPlan(
                logicalPlan,
                this.ksqlConfig,
                this.overriddenProperties,
                this.engineContext.getMetaStore());

        return new ExecutorPlans(logicalPlan, physicalPlan);
    }

    /**
     * Produces the DDL command that creates the sink of a query, when the output node asks for
     * the sink to be created.
     *
     * <p>If the sink is not to be created, the existing sink is validated against the query
     * result instead and no command is returned. Otherwise a create-stream or create-table
     * command is built according to the node's output type and checked for schema-evolution
     * compatibility before being returned.
     *
     * @param str the statement text, used by the schema-evolution check
     * @param ksqlStructuredDataOutputNode the output node describing the sink
     * @param keyField the key field of the query result
     * @return the sink-creating command, or empty when the sink already exists
     * @throws KsqlException if an existing sink is incompatible with the query result
     */
    private Optional<DdlCommand> maybeCreateSinkDdl(
            final String str,
            final KsqlStructuredDataOutputNode ksqlStructuredDataOutputNode,
            final KeyField keyField) {
        if (!ksqlStructuredDataOutputNode.isDoCreateInto()) {
            validateExistingSink(ksqlStructuredDataOutputNode, keyField);
            return Optional.empty();
        }

        final Formats formats = Formats.of(
                ksqlStructuredDataOutputNode.getKsqlTopic().getKeyFormat(),
                ksqlStructuredDataOutputNode.getKsqlTopic().getValueFormat(),
                ksqlStructuredDataOutputNode.getSerdeOptions());

        // The two command types are handled in separate branches: a single local typed as
        // CreateStreamCommand cannot hold a CreateTableCommand.
        if (ksqlStructuredDataOutputNode.getNodeOutputType() == DataSource.DataSourceType.KSTREAM) {
            final CreateStreamCommand createStream = new CreateStreamCommand(
                    ksqlStructuredDataOutputNode.getIntoSourceName(),
                    ksqlStructuredDataOutputNode.getSchema(),
                    keyField.ref(),
                    ksqlStructuredDataOutputNode.getTimestampColumn(),
                    ksqlStructuredDataOutputNode.getKsqlTopic().getKafkaTopicName(),
                    formats,
                    ksqlStructuredDataOutputNode.getKsqlTopic().getKeyFormat().getWindowInfo());
            AvroUtil.throwOnInvalidSchemaEvolution(
                    str, createStream, this.serviceContext.getSchemaRegistryClient(), this.ksqlConfig);
            return Optional.of(createStream);
        }

        final CreateTableCommand createTable = new CreateTableCommand(
                ksqlStructuredDataOutputNode.getIntoSourceName(),
                ksqlStructuredDataOutputNode.getSchema(),
                keyField.ref(),
                ksqlStructuredDataOutputNode.getTimestampColumn(),
                ksqlStructuredDataOutputNode.getKsqlTopic().getKafkaTopicName(),
                formats,
                ksqlStructuredDataOutputNode.getKsqlTopic().getKeyFormat().getWindowInfo());
        AvroUtil.throwOnInvalidSchemaEvolution(
                str, createTable, this.serviceContext.getSchemaRegistryClient(), this.ksqlConfig);
        return Optional.of(createTable);
    }

    /**
     * Verifies that an already-registered sink is compatible with the result of a query.
     *
     * <p>The sink must exist in the metastore, have the same data source type as the query
     * output, have an equal schema, and have an equivalent key field.
     *
     * @param ksqlStructuredDataOutputNode the output node describing the sink and query result
     * @param keyField the key field of the query result
     * @throws KsqlException if the sink is missing or incompatible in type, schema or key
     */
    private void validateExistingSink(
            final KsqlStructuredDataOutputNode ksqlStructuredDataOutputNode,
            final KeyField keyField) {
        final SourceName intoSourceName = ksqlStructuredDataOutputNode.getIntoSourceName();
        final DataSource source = this.engineContext.getMetaStore().getSource(intoSourceName);
        if (source == null) {
            // Report the sink by name, matching the type-mismatch message below, rather than
            // formatting the whole output node into the message.
            throw new KsqlException(String.format("%s does not exist.", intoSourceName.name()));
        }

        if (source.getDataSourceType() != ksqlStructuredDataOutputNode.getNodeOutputType()) {
            throw new KsqlException(String.format("Incompatible data sink and query result. Data sink (%s) type is %s but select query result is %s.", intoSourceName.name(), source.getDataSourceType(), ksqlStructuredDataOutputNode.getNodeOutputType()));
        }

        final LogicalSchema resultSchema = ksqlStructuredDataOutputNode.getSchema();
        final LogicalSchema sinkSchema = source.getSchema();
        if (!resultSchema.equals(sinkSchema)) {
            throw new KsqlException("Incompatible schema between results and sink. Result schema is " + resultSchema + ", but the sink schema is " + sinkSchema + ".");
        }

        // Each key field is resolved against its own schema before the two are compared.
        enforceKeyEquivalence(source.getKeyField().resolve(sinkSchema), keyField.resolve(resultSchema));
    }

    /**
     * Ensures the sink key column and the result key column are equivalent.
     *
     * <p>They are equivalent when both are absent, or when both are present with equal names and
     * equal types. Any other combination is rejected.
     *
     * @param optional the key column of the sink, if any
     * @param optional2 the key column of the query result, if any
     * @throws KsqlException if the key columns are not equivalent
     */
    private static void enforceKeyEquivalence(final Optional<Column> optional, final Optional<Column> optional2) {
        final boolean sinkHasKey = optional.isPresent();
        final boolean resultHasKey = optional2.isPresent();

        if (!sinkHasKey && !resultHasKey) {
            // Neither side has a key: nothing to compare.
            return;
        }

        final boolean equivalent = sinkHasKey
                && resultHasKey
                && optional.get().name().equals(optional2.get().name())
                && Objects.equals(optional.get().type(), optional2.get().type());

        if (!equivalent) {
            throwIncompatibleKeysException(optional, optional2);
        }
    }

    /**
     * Always throws a {@link KsqlException} describing the mismatching key columns.
     *
     * <p>An absent column is rendered as {@code null} for both its name and its type.
     *
     * @param optional the key column of the sink, if any
     * @param optional2 the key column of the query result, if any
     * @throws KsqlException always
     */
    private static void throwIncompatibleKeysException(final Optional<Column> optional, final Optional<Column> optional2) {
        throw new KsqlException(String.format(
                "Incompatible key fields for sink and results. Sink key field is %s (type: %s) while result key field is %s (type: %s)",
                optional.map(sinkColumn -> sinkColumn.name().name()).orElse(null),
                optional.map(Column::type).orElse(null),
                optional2.map(resultColumn -> resultColumn.name().name()).orElse(null),
                optional2.map(Column::type).orElse(null)));
    }

    /**
     * Rejects statements whose kind does not match the type of data the query produces.
     *
     * <p>A {@code CREATE STREAM AS SELECT} must not produce a table, and a
     * {@code CREATE TABLE AS SELECT} must not produce a stream.
     *
     * @param dataSourceType the type of data the query produces
     * @param configuredStatement the statement being validated
     * @throws KsqlStatementException if the statement kind and result type disagree
     */
    private static void validateQuery(final DataSource.DataSourceType dataSourceType, final ConfiguredStatement<?> configuredStatement) {
        final Object statement = configuredStatement.getStatement();

        final boolean streamStatementYieldsTable =
                statement instanceof CreateStreamAsSelect
                        && dataSourceType == DataSource.DataSourceType.KTABLE;
        if (streamStatementYieldsTable) {
            throw new KsqlStatementException("Invalid result type. Your SELECT query produces a TABLE. Please use CREATE TABLE AS SELECT statement instead.", configuredStatement.getStatementText());
        }

        final boolean tableStatementYieldsStream =
                statement instanceof CreateTableAsSelect
                        && dataSourceType == DataSource.DataSourceType.KSTREAM;
        if (tableStatementYieldsStream) {
            throw new KsqlStatementException("Invalid result type. Your SELECT query produces a STREAM. Please use CREATE STREAM AS SELECT statement instead.", configuredStatement.getStatementText());
        }
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [io.confluent.ksql.parser.tree.Statement] */
    /**
     * Fails if the engine does not consider the statement executable.
     *
     * @param configuredStatement the statement to check
     * @throws KsqlStatementException if {@code KsqlEngine.isExecutableStatement} rejects it
     */
    private static void throwOnNonExecutableStatement(final ConfiguredStatement<?> configuredStatement) {
        if (KsqlEngine.isExecutableStatement(configuredStatement.getStatement())) {
            return;
        }
        throw new KsqlStatementException("Statement not executable", configuredStatement.getStatementText());
    }

    /**
     * Collects the source names reported by a {@link PlanSourceExtractorVisitor} run over the
     * given plan node.
     *
     * @param planNode the plan node to process
     * @return the source names gathered by the visitor
     */
    private static Set<SourceName> getSourceNames(final PlanNode planNode) {
        final PlanSourceExtractorVisitor extractor = new PlanSourceExtractorVisitor();
        // The visitor is invoked with a null context argument.
        extractor.process(planNode, null);
        return extractor.getSourceNames();
    }

    /**
     * Executes a DDL command through the engine context.
     *
     * @param ddlCommand the command to execute
     * @param str the text of the statement the command came from
     * @param z flag forwarded unchanged to the engine context; the caller in this class passes
     *     whether the plan also contains a query plan
     * @return the text returned by the engine context
     * @throws KsqlStatementException if execution fails; any other exception is wrapped together
     *     with the statement text
     */
    private String executeDdl(final DdlCommand ddlCommand, final String str, final boolean z) {
        try {
            return this.engineContext.executeDdl(str, ddlCommand, z);
        } catch (KsqlStatementException e) {
            // Must precede the broader clause: rethrow untouched rather than wrapping again.
            throw e;
        } catch (Exception e) {
            throw new KsqlStatementException(e.getMessage(), str, e);
        }
    }

    /**
     * Builds a persistent query from a query plan and registers it with the engine context.
     *
     * @param queryPlan the plan describing the query
     * @param str the text of the statement the plan came from
     * @return the metadata of the registered query
     */
    private PersistentQueryMetadata executePersistentQuery(final QueryPlan queryPlan, final String str) {
        final PersistentQueryMetadata query = this.engineContext
                .createQueryExecutor(this.ksqlConfig, this.overriddenProperties, this.serviceContext)
                .buildQuery(
                        str,
                        queryPlan.getQueryId(),
                        // The sink is looked up in the metastore by the name stored in the plan.
                        this.engineContext.getMetaStore().getSource(queryPlan.getSink()),
                        queryPlan.getSources(),
                        queryPlan.getPhysicalPlan(),
                        buildPlanSummary(queryPlan.getQueryId(), queryPlan.getPhysicalPlan()));

        this.engineContext.registerQuery(query);
        return query;
    }

    /**
     * Summarizes an execution step using a {@link PlanSummary} built from the query id, the base
     * configuration and the current metastore.
     *
     * @param queryId the id of the query the step belongs to
     * @param executionStep the step to summarize
     * @return the summary text
     */
    private String buildPlanSummary(final QueryId queryId, final ExecutionStep<?> executionStep) {
        final PlanSummary summary =
                new PlanSummary(queryId, this.ksqlConfig, this.engineContext.getMetaStore());
        return summary.summarize(executionStep);
    }
}
