package io.confluent.ksql.rest.server.execution;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BoundType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.analyzer.Analysis;
import io.confluent.ksql.analyzer.ImmutableAnalysis;
import io.confluent.ksql.analyzer.PullQueryValidator;
import io.confluent.ksql.analyzer.QueryAnalyzer;
import io.confluent.ksql.analyzer.RewrittenAnalysis;
import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.context.QueryLoggerUtil;
import io.confluent.ksql.execution.expression.tree.ComparisonExpression;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.IntegerLiteral;
import io.confluent.ksql.execution.expression.tree.LogicalBinaryExpression;
import io.confluent.ksql.execution.expression.tree.LongLiteral;
import io.confluent.ksql.execution.expression.tree.QualifiedColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.StringLiteral;
import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.VisitParentExpressionVisitor;
import io.confluent.ksql.execution.plan.SelectExpression;
import io.confluent.ksql.execution.streams.RoutingFilter;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.execution.streams.materialization.Locator;
import io.confluent.ksql.execution.streams.materialization.Materialization;
import io.confluent.ksql.execution.streams.materialization.MaterializationException;
import io.confluent.ksql.execution.streams.materialization.PullProcessingContext;
import io.confluent.ksql.execution.streams.materialization.TableRow;
import io.confluent.ksql.execution.transform.KsqlTransformer;
import io.confluent.ksql.execution.transform.select.SelectValueMapperFactory;
import io.confluent.ksql.execution.util.ExpressionTypeManager;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.model.WindowType;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.tree.AllColumns;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Select;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.rest.entity.TableRowsEntity;
import io.confluent.ksql.rest.entity.TableRowsEntityFactory;
import io.confluent.ksql.rest.server.HeartbeatAgent;
import io.confluent.ksql.rest.server.resources.KsqlRestException;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.DefaultSqlValueCoercer;
import io.confluent.ksql.schema.ksql.FormatOptions;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.SchemaUtil;
import io.confluent.ksql.util.timestamp.PartialStringToTimestampParser;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/rest/server/execution/PullQueryExecutor.class */
public final class PullQueryExecutor {
    // Class-wide SLF4J logger; used for debug-level routing messages.
    private static final Logger LOG = LoggerFactory.getLogger(PullQueryExecutor.class);
    // Comparison operators accepted for WINDOWSTART bounds; bounds of any other type are rejected.
    private static final Set<ComparisonExpression.Type> VALID_WINDOW_BOUNDS_TYPES = ImmutableSet.of(ComparisonExpression.Type.EQUAL, ComparisonExpression.Type.GREATER_THAN, ComparisonExpression.Type.GREATER_THAN_OR_EQUAL, ComparisonExpression.Type.LESS_THAN, ComparisonExpression.Type.LESS_THAN_OR_EQUAL);
    // String rendering of VALID_WINDOW_BOUNDS_TYPES, computed once at class load.
    private static final String VALID_WINDOW_BOUNDS_TYPES_STRING = VALID_WINDOW_BOUNDS_TYPES.toString();
    // Engine context used to analyze statements and to look up the materializing query.
    private final KsqlExecutionContext executionContext;
    // Optional heartbeat agent supplied at construction time.
    private final Optional<HeartbeatAgent> heartbeatAgent;
    // Handed to the materialization locator when locating candidate nodes for a key.
    private final RoutingFilter.RoutingFilterFactory routingFilterFactory;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.confluent.ksql.rest.server.execution.PullQueryExecutor$1, reason: invalid class name */
    /* loaded from: input_file:io/confluent/ksql/rest/server/execution/PullQueryExecutor$1.class */
    // Decompiler rendering of a compiler-generated switch map: an int table indexed by
    // ComparisonExpression.Type ordinal that assigns case numbers 1..4 to the four ordering
    // operators. Entries for every other constant keep the array default of 0.
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$confluent$ksql$execution$expression$tree$ComparisonExpression$Type = new int[ComparisonExpression.Type.values().length];

        static {
            // Each assignment is guarded individually: a NoSuchFieldError for one constant is
            // ignored so the remaining entries are still populated.
            try {
                // LESS_THAN -> case 1
                $SwitchMap$io$confluent$ksql$execution$expression$tree$ComparisonExpression$Type[ComparisonExpression.Type.LESS_THAN.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                // LESS_THAN_OR_EQUAL -> case 2
                $SwitchMap$io$confluent$ksql$execution$expression$tree$ComparisonExpression$Type[ComparisonExpression.Type.LESS_THAN_OR_EQUAL.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                // GREATER_THAN -> case 3
                $SwitchMap$io$confluent$ksql$execution$expression$tree$ComparisonExpression$Type[ComparisonExpression.Type.GREATER_THAN.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                // GREATER_THAN_OR_EQUAL -> case 4
                $SwitchMap$io$confluent$ksql$execution$expression$tree$ComparisonExpression$Type[ComparisonExpression.Type.GREATER_THAN_OR_EQUAL.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* loaded from: input_file:io/confluent/ksql/rest/server/execution/PullQueryExecutor$ColumnReferenceRewriter.class */
    /**
     * Expression visitor that replaces every qualified column reference with an unqualified
     * reference built from the same underlying reference. Any other node yields the default
     * {@code Optional.empty()} passed to the parent visitor.
     */
    private static final class ColumnReferenceRewriter extends VisitParentExpressionVisitor<Optional<Expression>, ExpressionTreeRewriter.Context<Void>> {
        /** Synthetic accessor; the marker argument is ignored and only selects this overload. */
        /* synthetic */ ColumnReferenceRewriter(AnonymousClass1 anonymousClass1) {
            this();
        }

        private ColumnReferenceRewriter() {
            super(Optional.empty());
        }

        /**
         * Builds the unqualified equivalent of the given qualified column reference.
         *
         * @return a non-empty optional holding the unqualified reference
         */
        public Optional<Expression> visitQualifiedColumnReference(QualifiedColumnReferenceExp qualifiedColumnReferenceExp, ExpressionTreeRewriter.Context<Void> context) {
            final UnqualifiedColumnReferenceExp unqualified = new UnqualifiedColumnReferenceExp(qualifiedColumnReferenceExp.getReference());
            return Optional.of(unqualified);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/rest/server/execution/PullQueryExecutor$ComparisonTarget.class */
    /**
     * Columns that a comparison in a pull-query WHERE clause may target. Constants are resolved
     * from the upper-cased column name via {@code valueOf}, so the constant names must match the
     * column names exactly.
     */
    public enum ComparisonTarget {
        /** Comparison against the ROWKEY column. */
        ROWKEY,
        /** Comparison against the WINDOWSTART column. */
        WINDOWSTART
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/rest/server/execution/PullQueryExecutor$ConfigRoutingOptions.class */
    /**
     * {@link RoutingOptions} backed by the server configuration, with per-request overrides
     * taking precedence when they contain the requested key.
     */
    public static final class ConfigRoutingOptions implements RoutingOptions {
        private final KsqlConfig ksqlConfig;
        private final Map<String, ?> overrides;

        ConfigRoutingOptions(KsqlConfig ksqlConfig, Map<String, ?> map) {
            this.ksqlConfig = ksqlConfig;
            this.overrides = map;
        }

        /**
         * Resolves a long-valued setting, preferring the request override.
         *
         * <p>Overrides are untyped, so the value may arrive as any {@link Number} (for example an
         * {@code Integer}) or as a numeric string; all of these are accepted. A key that is absent
         * or mapped to {@code null} falls back to the server configuration.
         *
         * @param str the configuration key
         * @return the resolved value
         * @throws IllegalArgumentException if the override is neither a number nor a numeric string
         */
        private long getLong(String str) {
            final Object override = this.overrides.get(str);
            if (override == null) {
                return this.ksqlConfig.getLong(str).longValue();
            }
            if (override instanceof Number) {
                return ((Number) override).longValue();
            }
            try {
                return Long.parseLong(override.toString().trim());
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid value for '" + str + "': '" + override + "' is not a whole number", e);
            }
        }

        /** Returns the value of {@code ksql.query.pull.max.allowed.offset.lag}. */
        public long getOffsetLagAllowed() {
            return getLong("ksql.query.pull.max.allowed.offset.lag");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/rest/server/execution/PullQueryExecutor$PullQueryContext.class */
    /**
     * Immutable bundle of everything needed to serve one pull query: the key struct to look up,
     * the materialization to read from, the analysis, the extracted WHERE information, and the
     * query id / context stacker. All components are required to be non-null.
     */
    public static final class PullQueryContext {
        private final Struct rowKey;
        private final Materialization mat;
        private final ImmutableAnalysis analysis;
        private final WhereInfo whereInfo;
        private final QueryId queryId;
        private final QueryContext.Stacker contextStacker;

        /** Synthetic accessor; the trailing marker argument is ignored. */
        /* synthetic */ PullQueryContext(Struct struct, Materialization materialization, ImmutableAnalysis immutableAnalysis, WhereInfo whereInfo, QueryId queryId, QueryContext.Stacker stacker, AnonymousClass1 anonymousClass1) {
            this(struct, materialization, immutableAnalysis, whereInfo, queryId, stacker);
        }

        private PullQueryContext(Struct struct, Materialization materialization, ImmutableAnalysis immutableAnalysis, WhereInfo whereInfo, QueryId queryId, QueryContext.Stacker stacker) {
            // Null checks run in declaration order; the label names the offending component.
            rowKey = Objects.requireNonNull(struct, "rowkey");
            mat = Objects.requireNonNull(materialization, "materialization");
            analysis = Objects.requireNonNull(immutableAnalysis, "analysis");
            this.whereInfo = Objects.requireNonNull(whereInfo, "whereInfo");
            this.queryId = Objects.requireNonNull(queryId, "queryId");
            contextStacker = Objects.requireNonNull(stacker, "contextStacker");
        }

        public Struct getRowKey() {
            return rowKey;
        }

        public Materialization getMat() {
            return mat;
        }

        public ImmutableAnalysis getAnalysis() {
            return analysis;
        }

        public WhereInfo getWhereInfo() {
            return whereInfo;
        }

        public QueryId getQueryId() {
            return queryId;
        }

        public QueryContext.Stacker getContextStacker() {
            return contextStacker;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/rest/server/execution/PullQueryExecutor$Result.class */
    /** Rows read from a materialization together with the schema they conform to. */
    public static final class Result {
        private final LogicalSchema schema;
        private final List<? extends TableRow> rows;

        /** Synthetic accessor; the trailing marker argument is ignored. */
        /* synthetic */ Result(LogicalSchema logicalSchema, List list, AnonymousClass1 anonymousClass1) {
            this(logicalSchema, list);
        }

        private Result(LogicalSchema logicalSchema, List<? extends TableRow> list) {
            // Both components are mandatory; the schema is checked first.
            schema = Objects.requireNonNull(logicalSchema, "schema");
            rows = Objects.requireNonNull(list, "rows");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/rest/server/execution/PullQueryExecutor$WhereInfo.class */
    /**
     * Information extracted from a pull query's WHERE clause: the coerced row key and, when
     * present, the bounds on the window start.
     */
    public static final class WhereInfo {
        private final Object rowkey;
        private final Optional<Range<Instant>> windowStartBounds;

        /** Synthetic accessor; the trailing marker argument is ignored. */
        /* synthetic */ WhereInfo(Object obj, Optional optional, AnonymousClass1 anonymousClass1) {
            this(obj, optional);
        }

        // No validation is performed here; values are stored as given.
        private WhereInfo(Object rowkey, Optional<Range<Instant>> windowStartBounds) {
            this.windowStartBounds = windowStartBounds;
            this.rowkey = rowkey;
        }
    }

    /**
     * Creates an executor.
     *
     * @param ksqlExecutionContext engine context; must not be null
     * @param optional optional heartbeat agent; the {@code Optional} itself must not be null
     * @param routingFilterFactory factory handed to the locator when routing; must not be null
     * @throws NullPointerException if any argument is null
     */
    public PullQueryExecutor(KsqlExecutionContext ksqlExecutionContext, Optional<HeartbeatAgent> optional, RoutingFilter.RoutingFilterFactory routingFilterFactory) {
        executionContext = Objects.requireNonNull(ksqlExecutionContext, "executionContext");
        heartbeatAgent = Objects.requireNonNull(optional, "heartbeatAgent");
        this.routingFilterFactory = Objects.requireNonNull(routingFilterFactory, "routingFilterFactory");
    }

    /**
     * Always rejects the statement: unconditionally throws a {@link KsqlRestException} built
     * from {@code Errors.queryEndpoint} for the statement text. The remaining arguments are
     * not read.
     *
     * @throws KsqlRestException always
     */
    public static void validate(ConfiguredStatement<Query> configuredStatement, Map<String, ?> map, KsqlExecutionContext ksqlExecutionContext, ServiceContext serviceContext) {
        final String statementText = configuredStatement.getStatementText();
        throw new KsqlRestException(Errors.queryEndpoint(statementText));
    }

    /**
     * Executes a pull query and returns its rows.
     *
     * <p>The statement is analyzed, qualified column references are rewritten to unqualified
     * ones, the single query materializing the source is located, the WHERE clause is reduced
     * to a row key (plus optional window bounds), and the lookup is routed to a node.
     *
     * @param configuredStatement the pull query with its config and overrides
     * @param serviceContext service context used when the query is forwarded to another node
     * @return the rows produced by the query
     * @throws IllegalArgumentException if the statement is not a pull query
     * @throws KsqlException if pull queries are disabled by configuration
     * @throws KsqlStatementException wrapping any failure raised while planning or executing
     */
    public TableRowsEntity execute(ConfiguredStatement<Query> configuredStatement, ServiceContext serviceContext) {
        if (!configuredStatement.getStatement().isPullQuery()) {
            throw new IllegalArgumentException("Executor can only handle pull queries");
        }
        if (!configuredStatement.getConfig().getBoolean("ksql.pull.queries.enable").booleanValue()) {
            throw new KsqlException("Pull queries are disabled. Refer to https://cnfl.io/queries for info on query types. If you intended to issue a push query, resubmit with the EMIT CHANGES clause" + System.lineSeparator() + "Please set ksql.pull.queries.enable=true to enable this feature.");
        }
        try {
            final ImmutableAnalysis rawAnalysis = analyze(configuredStatement, this.executionContext);
            // The rewriter's process method is the rewrite callback applied by RewrittenAnalysis.
            final ColumnReferenceRewriter rewriter = new ColumnReferenceRewriter(null);
            final RewrittenAnalysis analysis = new RewrittenAnalysis(rawAnalysis, rewriter::process);
            final PersistentQueryMetadata materializingQuery = findMaterializingQuery(this.executionContext, analysis);
            final WhereInfo whereInfo = extractWhereInfo(analysis, materializingQuery);
            final QueryId queryId = uniqueQueryId();
            final QueryContext.Stacker stacker = new QueryContext.Stacker();
            // The key struct is built before the materialization is requested.
            final Struct rowKey = asKeyStruct(whereInfo.rowkey, materializingQuery.getPhysicalSchema());
            final Materialization materialization = (Materialization) materializingQuery.getMaterialization(queryId, stacker)
                    .orElseThrow(() -> notMaterializedException(getSourceName(analysis)));
            final PullQueryContext context = new PullQueryContext(rowKey, materialization, analysis, whereInfo, queryId, stacker, null);
            return handlePullQuery(configuredStatement, this.executionContext, serviceContext, context);
        } catch (Exception e) {
            // Boundary catch: every failure is reported against the statement text, cause preserved.
            throw new KsqlStatementException(e.getMessage() == null ? "Server Error" : e.getMessage(), configuredStatement.getStatementText(), e);
        }
    }

    /**
     * Locates the nodes able to serve the row key and tries them in the order returned by the
     * locator, returning the first successful result.
     *
     * @throws MaterializationException if the locator returns no nodes, or if every node fails
     */
    private TableRowsEntity handlePullQuery(ConfiguredStatement<Query> configuredStatement, KsqlExecutionContext ksqlExecutionContext, ServiceContext serviceContext, PullQueryContext pullQueryContext) {
        final RoutingOptions routingOptions = new ConfigRoutingOptions(configuredStatement.getConfig(), configuredStatement.getOverrides());
        final List<Locator.KsqlNode> candidates = pullQueryContext.mat.locator().locate(pullQueryContext.rowKey, routingOptions, this.routingFilterFactory);
        if (candidates.isEmpty()) {
            throw new MaterializationException("All nodes are dead or exceed max allowed lag.");
        }
        for (final Locator.KsqlNode ksqlNode : candidates) {
            try {
                return routeQuery(ksqlNode, configuredStatement, ksqlExecutionContext, serviceContext, pullQueryContext);
            } catch (Exception e) {
                // Best effort: fall through to the next candidate. The exception is passed as the
                // trailing argument so the reason this node failed is not lost from the log.
                LOG.debug("Error routing query {} to host {} at timestamp {}", configuredStatement.getStatementText(), ksqlNode, System.currentTimeMillis(), e);
            }
        }
        throw new MaterializationException(String.format("Unable to execute pull query: %s", configuredStatement.getStatementText()));
    }

    /**
     * Runs the query against one node: forwarded over the KSQL client when the node is remote,
     * served from the local materialization otherwise. A debug message records which path was
     * taken.
     */
    private TableRowsEntity routeQuery(Locator.KsqlNode ksqlNode, ConfiguredStatement<Query> configuredStatement, KsqlExecutionContext ksqlExecutionContext, ServiceContext serviceContext, PullQueryContext pullQueryContext) {
        if (!ksqlNode.isLocal()) {
            LOG.debug("Query {} routed to host {} at timestamp {}.", configuredStatement.getStatementText(), ksqlNode.location(), System.currentTimeMillis());
            return forwardTo(ksqlNode, configuredStatement, serviceContext);
        }
        LOG.debug("Query {} executed locally at host {} at timestamp {}.", configuredStatement.getStatementText(), ksqlNode.location(), System.currentTimeMillis());
        return queryRowsLocally(configuredStatement, ksqlExecutionContext, pullQueryContext);
    }

    /**
     * Serves the query from the local materialization.
     *
     * <p>When window bounds are present the windowed store is read for the key and bounds;
     * otherwise the non-windowed store is read, yielding zero or one row. A {@code SELECT *}
     * returns the rows as-is via {@link TableRowsEntityFactory}; any other projection is
     * evaluated through {@code selectOutputSchema} and {@code handleSelects}.
     */
    @VisibleForTesting
    TableRowsEntity queryRowsLocally(ConfiguredStatement<Query> configuredStatement, KsqlExecutionContext ksqlExecutionContext, PullQueryContext pullQueryContext) {
        final Materialization materialization = pullQueryContext.mat;
        final Struct key = pullQueryContext.rowKey;
        final Optional<Range<Instant>> bounds = pullQueryContext.whereInfo.windowStartBounds;

        final Result result;
        if (!bounds.isPresent()) {
            // Non-windowed lookup: wrap the optional single row in a list.
            result = new Result(materialization.schema(), (List) materialization.nonWindowed().get(key).map(row -> ImmutableList.of(row)).orElse(ImmutableList.of()), null);
        } else {
            result = new Result(materialization.schema(), materialization.windowed().get(key, (Range) bounds.get()), null);
        }

        final LogicalSchema outputSchema;
        final List<List<?>> outputRows;
        if (!isSelectStar(configuredStatement.getStatement().getSelect())) {
            outputSchema = selectOutputSchema(result, ksqlExecutionContext, pullQueryContext.analysis, materialization.windowType());
            outputRows = handleSelects(result, configuredStatement, ksqlExecutionContext, pullQueryContext.analysis, outputSchema, materialization.windowType(), pullQueryContext.queryId, pullQueryContext.contextStacker);
        } else {
            outputSchema = TableRowsEntityFactory.buildSchema(result.schema, materialization.windowType().isPresent());
            outputRows = TableRowsEntityFactory.createRows(result.rows);
        }
        return new TableRowsEntity(configuredStatement.getStatementText(), pullQueryContext.queryId, outputSchema, outputRows);
    }

    /**
     * Creates a query id of the form {@code query_<epoch millis>}.
     *
     * <p>The id is derived solely from the wall clock, so two calls within the same millisecond
     * produce the same id.
     */
    private QueryId uniqueQueryId() {
        final long nowMillis = System.currentTimeMillis();
        return new QueryId("query_" + nowMillis);
    }

    /**
     * Analyzes the statement against the context's metastore, using an empty output-topic
     * prefix, no serde options and no sink.
     */
    private ImmutableAnalysis analyze(ConfiguredStatement<Query> configuredStatement, KsqlExecutionContext ksqlExecutionContext) {
        final MetaStore metaStore = ksqlExecutionContext.getMetaStore();
        final QueryAnalyzer analyzer = new QueryAnalyzer(metaStore, "", SerdeOption.none());
        return analyzer.analyze(configuredStatement.getStatement(), Optional.empty());
    }

    /**
     * Reduces the WHERE clause to a row key and, for windowed sources, window-start bounds.
     *
     * <p>A WHERE clause with a ROWKEY comparison is mandatory. For non-windowed sources any
     * comparison target other than ROWKEY is rejected; for windowed sources the WINDOWSTART
     * comparisons (if any) are converted to a range.
     */
    private WhereInfo extractWhereInfo(ImmutableAnalysis immutableAnalysis, PersistentQueryMetadata persistentQueryMetadata) {
        final boolean windowed = persistentQueryMetadata.getResultTopic().getKeyFormat().isWindowed();
        final Expression where = (Expression) immutableAnalysis.getWhereExpression()
                .orElseThrow(() -> invalidWhereClauseException("Missing WHERE clause", windowed));
        final Map<ComparisonTarget, List<ComparisonExpression>> byTarget = extractComparisons(where);

        final List<ComparisonExpression> rowKeyComparisons = byTarget.get(ComparisonTarget.ROWKEY);
        if (rowKeyComparisons == null) {
            throw invalidWhereClauseException("WHERE clause missing ROWKEY", windowed);
        }
        final Object rowKey = extractRowKeyWhereClause(rowKeyComparisons, windowed, persistentQueryMetadata.getLogicalSchema());

        if (!windowed) {
            // Only ROWKEY may be constrained on a non-windowed source.
            if (byTarget.size() > 1) {
                throw invalidWhereClauseException("Unsupported WHERE clause", false);
            }
            return new WhereInfo(rowKey, Optional.empty(), null);
        }
        final Optional<List<ComparisonExpression>> windowComparisons = Optional.ofNullable(byTarget.get(ComparisonTarget.WINDOWSTART));
        final Range<Instant> windowBounds = extractWhereClauseWindowBounds(windowComparisons);
        return new WhereInfo(rowKey, Optional.of(windowBounds), null);
    }

    /**
     * Extracts the row key from the ROWKEY comparisons: exactly one comparison is allowed, it
     * must be an equality, and its non-column side is coerced to the key column's type.
     */
    private Object extractRowKeyWhereClause(List<ComparisonExpression> list, boolean z, LogicalSchema logicalSchema) {
        if (list.size() != 1) {
            throw invalidWhereClauseException("Multiple bounds on ROWKEY", z);
        }
        final ComparisonExpression comparison = list.get(0);
        final boolean isEquality = comparison.getType() == ComparisonExpression.Type.EQUAL;
        if (!isEquality) {
            throw invalidWhereClauseException("ROWKEY bound must currently be '='", z);
        }
        final Object literalValue = getNonColumnRefSide(comparison).getValue();
        return coerceRowKey(logicalSchema, literalValue, z);
    }

    /**
     * Coerces the supplied value to the type of the schema's single key column.
     *
     * @throws KsqlException if the value cannot be coerced to the key column's type
     */
    private Object coerceRowKey(LogicalSchema logicalSchema, Object obj, boolean z) {
        final boolean singleKeyColumn = logicalSchema.key().size() == 1;
        if (!singleKeyColumn) {
            throw invalidWhereClauseException("Only single KEY column supported", z);
        }
        final SqlType keyType = ((Column) logicalSchema.key().get(0)).type();
        return DefaultSqlValueCoercer.INSTANCE.coerce(obj, keyType)
                .orElseThrow(() -> new KsqlException("'" + obj + "' can not be converted to the type of column ROWKEY: " + keyType));
    }

    /**
     * Converts the WINDOWSTART comparisons of a WHERE clause into a range of instants.
     *
     * <p>Comparisons are grouped by their simplified bound type (EQUAL, GREATER_THAN or
     * LESS_THAN after normalization). Unsupported types and more than one bound of the same
     * simplified type are rejected. An equality bound yields a singleton range and may not be
     * combined with other bounds; otherwise the lower and upper bounds form the range.
     *
     * @param optional the WINDOWSTART comparisons, or empty if the clause has none
     * @return the window-start range; unbounded when no comparisons are supplied
     */
    private Range<Instant> extractWhereClauseWindowBounds(Optional<List<ComparisonExpression>> optional) {
        if (!optional.isPresent()) {
            return Range.all();
        }
        final Map<ComparisonExpression.Type, List<ComparisonExpression>> boundsByType = optional.get().stream()
                .collect(Collectors.groupingBy(this::getSimplifiedBoundType));

        final Set<ComparisonExpression.Type> unsupported = Sets.difference(boundsByType.keySet(), VALID_WINDOW_BOUNDS_TYPES);
        if (!unsupported.isEmpty()) {
            throw invalidWhereClauseException("Unsupported " + ComparisonTarget.WINDOWSTART + " bounds: " + unsupported, true);
        }

        // Report every simplified type that has more than one comparison, one per line.
        final String duplicates = boundsByType.entrySet().stream()
                .filter(entry -> entry.getValue().size() > 1)
                .map(entry -> entry.getKey() + ": " + entry.getValue())
                .collect(Collectors.joining(System.lineSeparator()));
        if (!duplicates.isEmpty()) {
            throw invalidWhereClauseException("Duplicate bounds on " + ComparisonTarget.WINDOWSTART + ": " + duplicates, true);
        }

        // Each list now holds exactly one comparison; flatten to a single value per type.
        final Map<ComparisonExpression.Type, ComparisonExpression> boundByType = boundsByType.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().get(0)));

        final ComparisonExpression equalsBound = boundByType.get(ComparisonExpression.Type.EQUAL);
        if (equalsBound == null) {
            return extractWindowBound(
                    Optional.ofNullable(boundByType.get(ComparisonExpression.Type.GREATER_THAN)),
                    Optional.ofNullable(boundByType.get(ComparisonExpression.Type.LESS_THAN)));
        }
        if (boundsByType.size() > 1) {
            throw invalidWhereClauseException("`" + equalsBound + "` cannot be combined with other bounds on " + ComparisonTarget.WINDOWSTART, true);
        }
        return Range.singleton(asInstant(getNonColumnRefSide(equalsBound)));
    }

    /**
     * Normalizes a comparison's operator to the kind of bound it places on the column.
     *
     * <p>Strict and inclusive variants collapse to LESS_THAN / GREATER_THAN, and the direction
     * is flipped when the column reference is on the right-hand side. Any other operator is
     * returned unchanged.
     */
    private ComparisonExpression.Type getSimplifiedBoundType(ComparisonExpression comparisonExpression) {
        final ComparisonExpression.Type operator = comparisonExpression.getType();
        final boolean columnOnRight = comparisonExpression.getRight() instanceof UnqualifiedColumnReferenceExp;
        switch (operator) {
            case LESS_THAN:
            case LESS_THAN_OR_EQUAL:
                if (columnOnRight) {
                    return ComparisonExpression.Type.GREATER_THAN;
                }
                return ComparisonExpression.Type.LESS_THAN;
            case GREATER_THAN:
            case GREATER_THAN_OR_EQUAL:
                if (columnOnRight) {
                    return ComparisonExpression.Type.LESS_THAN;
                }
                return ComparisonExpression.Type.GREATER_THAN;
            default:
                return operator;
        }
    }

    /**
     * Builds a range from an optional lower-bound comparison and an optional upper-bound
     * comparison. Whether each end is open or closed follows the comparison's own operator.
     *
     * @param optional the lower-bound comparison, if any
     * @param optional2 the upper-bound comparison, if any
     * @return the resulting range; unbounded when neither comparison is present
     */
    private Range<Instant> extractWindowBound(Optional<ComparisonExpression> optional, Optional<ComparisonExpression> optional2) {
        final boolean hasLower = optional.isPresent();
        final boolean hasUpper = optional2.isPresent();
        if (!hasLower && !hasUpper) {
            return Range.all();
        }
        if (!hasLower) {
            final ComparisonExpression upper = optional2.get();
            return Range.upTo(asInstant(getNonColumnRefSide(upper)), getRangeBoundType(upper));
        }
        final ComparisonExpression lower = optional.get();
        if (!hasUpper) {
            return Range.downTo(asInstant(getNonColumnRefSide(lower)), getRangeBoundType(lower));
        }
        final ComparisonExpression upper = optional2.get();
        return Range.range(
                asInstant(getNonColumnRefSide(lower)), getRangeBoundType(lower),
                asInstant(getNonColumnRefSide(upper)), getRangeBoundType(upper));
    }

    /** Returns OPEN for the strict operators LESS_THAN / GREATER_THAN and CLOSED otherwise. */
    private BoundType getRangeBoundType(ComparisonExpression comparisonExpression) {
        final ComparisonExpression.Type operator = comparisonExpression.getType();
        if (operator == ComparisonExpression.Type.LESS_THAN || operator == ComparisonExpression.Type.GREATER_THAN) {
            return BoundType.OPEN;
        }
        return BoundType.CLOSED;
    }

    /**
     * Returns the operand opposite the column reference: the left operand when the right one
     * is an unqualified column reference, the right operand in every other case.
     */
    private Expression getNonColumnRefSide(ComparisonExpression comparisonExpression) {
        if (comparisonExpression.getRight() instanceof UnqualifiedColumnReferenceExp) {
            return comparisonExpression.getLeft();
        }
        return comparisonExpression.getRight();
    }

    /**
     * Converts a window-bound literal to an instant. Integer and long literals are read as
     * epoch milliseconds; string literals are parsed with {@link PartialStringToTimestampParser}.
     * Any other expression type, or an unparseable string, is rejected as an invalid WHERE clause.
     */
    private Instant asInstant(Expression expression) {
        if (expression instanceof IntegerLiteral) {
            final long epochMillis = ((IntegerLiteral) expression).getValue().intValue();
            return Instant.ofEpochMilli(epochMillis);
        }
        if (expression instanceof LongLiteral) {
            final long epochMillis = ((LongLiteral) expression).getValue().longValue();
            return Instant.ofEpochMilli(epochMillis);
        }
        if (expression instanceof StringLiteral) {
            final String text = ((StringLiteral) expression).getValue();
            try {
                return Instant.ofEpochMilli(new PartialStringToTimestampParser().parse(text));
            } catch (Exception e) {
                throw invalidWhereClauseException("Failed to parse datetime: " + text, true);
            }
        }
        throw invalidWhereClauseException(ComparisonTarget.WINDOWSTART + " bounds must be BIGINT", true);
    }

    /**
     * Flattens a WHERE expression into its comparisons, grouped by the column they target.
     *
     * <p>Only single comparisons and AND-combinations of them are supported. Comparisons on the
     * same target found on both sides of an AND are concatenated, left side first.
     *
     * @param expression the WHERE expression, or a sub-expression of it
     * @return comparisons keyed by target
     */
    private Map<ComparisonTarget, List<ComparisonExpression>> extractComparisons(Expression expression) {
        if (expression instanceof ComparisonExpression) {
            final ComparisonExpression comparison = (ComparisonExpression) expression;
            return ImmutableMap.of(extractWhereClauseTarget(comparison), ImmutableList.of(comparison));
        }
        if (!(expression instanceof LogicalBinaryExpression)) {
            throw invalidWhereClauseException("Unsupported expression: " + expression, false);
        }
        final LogicalBinaryExpression binary = (LogicalBinaryExpression) expression;
        if (binary.getType() != LogicalBinaryExpression.Type.AND) {
            throw invalidWhereClauseException("Only AND expressions are supported: " + expression, false);
        }
        final Map<ComparisonTarget, List<ComparisonExpression>> left = extractComparisons(binary.getLeft());
        final Map<ComparisonTarget, List<ComparisonExpression>> right = extractComparisons(binary.getRight());
        // The explicit element type keeps the merged list typed, so no raw cast is needed.
        return Stream.concat(left.entrySet().stream(), right.entrySet().stream())
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        Map.Entry::getValue,
                        (first, second) -> ImmutableList.<ComparisonExpression>builder().addAll(first).addAll(second).build()));
    }

    /**
     * Determines which supported column a comparison targets.
     *
     * <p>The column reference may be on either side of the comparison; the right-hand side is
     * checked first. The column name is upper-cased with {@code Locale.ROOT} before it is
     * matched against {@link ComparisonTarget}, so the match does not depend on the JVM's
     * default locale.
     *
     * @throws RuntimeException as produced by {@code invalidWhereClauseException} when neither
     *     side is a column reference or the column is not a supported target
     */
    private ComparisonTarget extractWhereClauseTarget(ComparisonExpression comparisonExpression) {
        final UnqualifiedColumnReferenceExp column;
        if (comparisonExpression.getRight() instanceof UnqualifiedColumnReferenceExp) {
            column = (UnqualifiedColumnReferenceExp) comparisonExpression.getRight();
        } else if (comparisonExpression.getLeft() instanceof UnqualifiedColumnReferenceExp) {
            column = (UnqualifiedColumnReferenceExp) comparisonExpression.getLeft();
        } else {
            throw invalidWhereClauseException("Invalid WHERE clause: " + comparisonExpression, false);
        }
        final String columnName = column.getReference().toString(FormatOptions.noEscape());
        try {
            return ComparisonTarget.valueOf(columnName.toUpperCase(java.util.Locale.ROOT));
        } catch (Exception e) {
            // valueOf rejects unknown names; surface that as an invalid WHERE clause.
            throw invalidWhereClauseException("WHERE clause on unsupported field: " + columnName, false);
        }
    }

    /** Returns true when the select list consists of exactly one item and it is {@code *}. */
    private boolean isSelectStar(Select select) {
        final List<?> items = select.getSelectItems();
        if (items.size() != 1) {
            return false;
        }
        return items.get(0) instanceof AllColumns;
    }

    /**
     * Evaluates the select expressions against every result row.
     *
     * <p>When no selected column is a system column the row value is projected as-is. Otherwise
     * the row time, the key fields and (for windowed rows) the window start and end are
     * appended to the row's value before projection; this modifies the row's
     * {@link GenericRow} in place.
     *
     * @param logicalSchema the expected output schema, used to validate each projected row
     * @return the projected values, one list per input row, in input order
     * @throws IllegalStateException if a projected row's column count differs from the schema
     */
    private List<List<?>> handleSelects(Result result, ConfiguredStatement<Query> configuredStatement, KsqlExecutionContext ksqlExecutionContext, ImmutableAnalysis immutableAnalysis, LogicalSchema logicalSchema, Optional<WindowType> optional, QueryId queryId, QueryContext.Stacker stacker) {
        final LogicalSchema inputSchema;
        final Function<TableRow, GenericRow> toProjectionInput;
        if (immutableAnalysis.getSelectColumnRefs().stream().noneMatch(SchemaUtil::isSystemColumn)) {
            inputSchema = result.schema;
            toProjectionInput = TableRow::value;
        } else {
            inputSchema = result.schema.withMetaAndKeyColsInValue(optional.isPresent());
            toProjectionInput = row -> {
                final Struct key = row.key();
                final GenericRow value = row.value();
                final List<Object> keyValues = key.schema().fields().stream()
                        .map(key::get)
                        .collect(Collectors.toList());
                // Room for: row time + key fields + (window start, window end) when windowed.
                value.ensureAdditionalCapacity(1 + keyValues.size() + row.window().map(window -> 2).orElse(0));
                value.append(row.rowTime());
                value.appendAll(keyValues);
                row.window().ifPresent(window -> {
                    value.append(window.start().toEpochMilli());
                    value.append(window.end().toEpochMilli());
                });
                return value;
            };
        }

        // NOTE(review): the transformer is left raw because its type parameters are not visible
        // from this file; parameterize it once they are confirmed.
        final KsqlTransformer transformer = SelectValueMapperFactory.create(immutableAnalysis.getSelectExpressions(), inputSchema, configuredStatement.getConfig().cloneWithPropertyOverwrite(configuredStatement.getOverrides()), ksqlExecutionContext.getMetaStore()).getTransformer(ksqlExecutionContext.getProcessingLogContext().getLoggerFactory().getLogger(QueryLoggerUtil.queryLoggerName(queryId, stacker.push(new String[]{"PROJECT"}).getQueryContext())));

        final ImmutableList.Builder<List<?>> projectedRows = ImmutableList.builder();
        for (final TableRow row : result.rows) {
            final GenericRow projected = (GenericRow) transformer.transform(row.key(), toProjectionInput.apply(row), new PullProcessingContext(row.rowTime()));
            validateProjection(projected, logicalSchema);
            projectedRows.add(projected.values());
        }
        return projectedRows.build();
    }

    /**
     * Checks that a projected row has exactly as many values as the schema has columns.
     *
     * @throws IllegalStateException if the counts differ
     */
    private void validateProjection(GenericRow genericRow, LogicalSchema logicalSchema) {
        final int actual = genericRow.size();
        final int expected = logicalSchema.columns().size();
        if (actual == expected) {
            return;
        }
        throw new IllegalStateException("Row column count mismatch. expected:" + expected + ", got:" + actual);
    }

    /**
     * Builds the output schema for an explicit select list.
     *
     * <p>Each select expression contributes one column named after its alias and typed by the
     * expression's SQL type. Aliases that name a key column of the result schema, or equal the
     * window-start / window-end names, become key columns; all others become value columns.
     */
    private LogicalSchema selectOutputSchema(Result result, KsqlExecutionContext ksqlExecutionContext, ImmutableAnalysis immutableAnalysis, Optional<WindowType> optional) {
        final LogicalSchema.Builder schemaBuilder = LogicalSchema.builder().noImplicitColumns();
        final LogicalSchema typingSchema = result.schema.withMetaAndKeyColsInValue(optional.isPresent());
        final ExpressionTypeManager typeManager = new ExpressionTypeManager(typingSchema, ksqlExecutionContext.getMetaStore());
        for (final SelectExpression select : immutableAnalysis.getSelectExpressions()) {
            final SqlType columnType = typeManager.getExpressionSqlType(select.getExpression());
            final boolean keyColumn = result.schema.isKeyColumn(select.getAlias())
                    || select.getAlias().equals(SchemaUtil.WINDOWSTART_NAME)
                    || select.getAlias().equals(SchemaUtil.WINDOWEND_NAME);
            if (keyColumn) {
                schemaBuilder.keyColumn(select.getAlias(), columnType);
            } else {
                schemaBuilder.valueColumn(select.getAlias(), columnType);
            }
        }
        return schemaBuilder.build();
    }

    /**
     * Locates the single persistent query that writes to the source of the analysed query.
     *
     * @param ksqlExecutionContext supplies the metastore and the running persistent queries
     * @param immutableAnalysis the analysed query whose source is looked up
     * @return the metadata of the one query that has the source as its sink
     * @throws KsqlException if no query, or more than one query, has the source as its
     *     sink, or if the query is no longer known to the execution context
     */
    private PersistentQueryMetadata findMaterializingQuery(KsqlExecutionContext ksqlExecutionContext, ImmutableAnalysis immutableAnalysis) {
        final MetaStore metaStore = ksqlExecutionContext.getMetaStore();
        final SourceName source = getSourceName(immutableAnalysis);
        final Set<String> sinkQueryIds = metaStore.getQueriesWithSink(source);

        final int candidates = sinkQueryIds.size();
        if (candidates == 0) {
            throw notMaterializedException(source);
        }
        if (candidates != 1) {
            throw new KsqlException("Multiple queries currently materialize '" + source + "'. KSQL currently only supports pull queries when the table has only been materialized once.");
        }

        // Exactly one id remains at this point.
        final QueryId onlyQueryId = new QueryId(sinkQueryIds.iterator().next());
        return ksqlExecutionContext.getPersistentQuery(onlyQueryId)
                .orElseThrow(() -> new KsqlException("Materializing query has been stopped"));
    }

    /**
     * Returns the name of the first data source listed in the FROM clause of the analysis.
     *
     * @param immutableAnalysis the analysed query
     * @return the name of the data source at index 0 of the FROM sources
     */
    private SourceName getSourceName(ImmutableAnalysis immutableAnalysis) {
        final Analysis.AliasedDataSource firstSource = immutableAnalysis.getFromDataSources().get(0);
        return firstSource.getDataSource().getName();
    }

    /**
     * Forwards the statement to another node and converts the streamed response into a
     * {@link TableRowsEntity}.
     *
     * <p>The statement text and its property overrides are sent to the node's location via
     * the KSQL client of the service context. The first element of the response must carry
     * the header (query id and schema); every following element must carry a data row.
     *
     * @param ksqlNode the node to send the query to
     * @param configuredStatement the statement being executed
     * @param serviceContext supplies the client used for the remote call
     * @return the rows returned by the remote node, with the header's query id and schema
     * @throws KsqlServerException if the call fails, the response is empty, the first
     *     element has no header, or a later element has no row
     * @throws KsqlStatementException if a later element carries an error message
     */
    @VisibleForTesting
    TableRowsEntity forwardTo(Locator.KsqlNode ksqlNode, ConfiguredStatement<Query> configuredStatement, ServiceContext serviceContext) {
        final RestResponse<List<StreamedRow>> response = serviceContext.getKsqlClient()
                .makeQueryRequest(ksqlNode.location(), configuredStatement.getStatementText(), configuredStatement.getOverrides());
        if (response.isErroneous()) {
            throw new KsqlServerException("Proxy attempt failed: " + response.getErrorMessage());
        }

        final List<StreamedRow> streamedRows = response.getResponse();
        if (streamedRows.isEmpty()) {
            throw new KsqlServerException("Invalid empty response from proxy call");
        }

        final StreamedRow.Header header = streamedRows.get(0).getHeader()
                .orElseThrow(() -> new KsqlServerException("Expected header in first row"));

        final ImmutableList.Builder<List<?>> rows = ImmutableList.builder();
        // Element 0 is the header; everything after it is expected to be a data row.
        for (final StreamedRow streamedRow : streamedRows.subList(1, streamedRows.size())) {
            if (streamedRow.getErrorMessage().isPresent()) {
                throw new KsqlStatementException(streamedRow.getErrorMessage().get().getMessage(), configuredStatement.getStatementText());
            }
            final GenericRow row = streamedRow.getRow()
                    .orElseThrow(() -> new KsqlServerException("Unexpected proxy response"));
            rows.add(row.values());
        }

        return new TableRowsEntity(configuredStatement.getStatementText(), header.getQueryId(), header.getSchema(), rows.build());
    }

    /**
     * Builds the exception reported when a pull query targets a source that is not materialized.
     *
     * @param sourceName the source the query was issued against; rendered without escaping
     * @return the exception carrying the explanatory message; it is returned, not thrown
     */
    private KsqlException notMaterializedException(SourceName sourceName) {
        final String quotedName = "'" + sourceName.toString(FormatOptions.noEscape()) + "'";
        final String newLine = System.lineSeparator();

        final String notMaterialized = quotedName + " is not materialized. Refer to https://cnfl.io/queries for info on query types. If you intended to issue a push query, resubmit with the EMIT CHANGES clause";
        final String supportedTables = " KSQL currently only supports pull queries on materialized aggregate tables. i.e. those created by a 'CREATE TABLE AS SELECT <fields>, <aggregate_functions> FROM <sources> GROUP BY <key>' style statement.";

        return new KsqlException(notMaterialized + newLine + supportedTables + newLine + PullQueryValidator.NEW_QUERY_SYNTAX_ADDITIONAL_HELP);
    }

    /**
     * Builds the exception reported for a WHERE clause that a pull query cannot serve.
     *
     * <p>The message starts with the supplied reason, followed by general guidance on the
     * required ROWKEY restriction. When {@code z} is true, guidance on restricting
     * WINDOWSTART is appended as well.
     *
     * @param str the specific reason, used as the start of the message
     * @param z whether to append the WINDOWSTART guidance
     * @return the exception carrying the assembled message; it is returned, not thrown
     */
    private KsqlException invalidWhereClauseException(String str, boolean z) {
        final String newLine = System.lineSeparator();

        // append(str) rather than new StringBuilder(str): a null reason must render as "null".
        final StringBuilder message = new StringBuilder()
                .append(str)
                .append(". Refer to https://cnfl.io/queries for info on query types. If you intended to issue a push query, resubmit with the EMIT CHANGES clause")
                .append(newLine)
                .append("Pull queries require a WHERE clause that:")
                .append(newLine)
                .append(" - limits the query to a single ROWKEY, e.g. `SELECT * FROM X WHERE ROWKEY=Y;`.");

        if (z) {
            // NOTE(review): the last sentence reads "currently comparison" (likely missing
            // "supports") and the range example lacks a closing backtick. The text is kept
            // as-is because callers or tests may match on it - confirm before rewording.
            message.append(newLine)
                    .append(" - limits the time bounds of the windowed table. This can be: ")
                    .append(newLine)
                    .append("    + a single window lower bound, e.g. `WHERE WINDOWSTART = z`, or")
                    .append(newLine)
                    .append("    + a range, e.g. `WHERE a <= WINDOWSTART AND WINDOWSTART < b")
                    .append(newLine)
                    .append("WINDOWSTART currently supports operators: ")
                    .append(VALID_WINDOW_BOUNDS_TYPES_STRING)
                    .append(newLine)
                    .append("WINDOWSTART currently comparison with epoch milliseconds or a datetime string in the form: yyyy-MM-dd'T'HH:mm:ss.SSS with an optional numeric 4-digit timezone, e.g. '+0100'");
        }

        return new KsqlException(message.toString());
    }

    /**
     * Wraps a key value in a {@link Struct} built on the key schema of the physical schema.
     *
     * @param obj the value stored under the ROWKEY field
     * @param physicalSchema the physical schema supplying the key schema
     * @return a new struct with its ROWKEY field set to {@code obj}
     */
    private Struct asKeyStruct(Object obj, PhysicalSchema physicalSchema) {
        // put(...) returns the struct it was called on, so the result can be returned directly.
        return new Struct(physicalSchema.keySchema().ksqlSchema())
                .put(SchemaUtil.ROWKEY_NAME.name(), obj);
    }
}
