package io.confluent.ksql.rest.server.query;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.analyzer.ImmutableAnalysis;
import io.confluent.ksql.analyzer.PullQueryValidator;
import io.confluent.ksql.api.server.MetricsCallbackHolder;
import io.confluent.ksql.api.server.SlidingWindowRateLimiter;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.execution.pull.HARouting;
import io.confluent.ksql.execution.pull.PullQueryResult;
import io.confluent.ksql.execution.scalablepush.PushRouting;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.internal.ScalablePushQueryMetrics;
import io.confluent.ksql.logging.query.QueryLogger;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.LocalCommands;
import io.confluent.ksql.rest.server.resources.streaming.PullQueryConfigPlannerOptions;
import io.confluent.ksql.rest.server.resources.streaming.PullQueryConfigRoutingOptions;
import io.confluent.ksql.rest.server.resources.streaming.PushQueryConfigPlannerOptions;
import io.confluent.ksql.rest.server.resources.streaming.PushQueryConfigRoutingOptions;
import io.confluent.ksql.rest.util.ConcurrencyLimiter;
import io.confluent.ksql.rest.util.QueryCapacityUtil;
import io.confluent.ksql.rest.util.QueryMetricsUtil;
import io.confluent.ksql.rest.util.RateLimiter;
import io.confluent.ksql.rest.util.ScalablePushUtil;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.ConsistencyOffsetVector;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.ScalablePushQueryMetadata;
import io.confluent.ksql.util.StreamPullQueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import io.vertx.core.Context;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Dispatches a prepared {@link Query} statement to the matching execution path and wraps the
 * resulting query metadata in a {@link QueryMetadataHolder}.
 *
 * <p>Four paths are handled: table pull queries, stream pull queries, scalable push queries and
 * plain (transient) push queries. Statements that are not a {@link Query} are reported as
 * {@link QueryMetadataHolder#unhandled()}.
 */
public class QueryExecutor {
    private static final Logger log = LogManager.getLogger(QueryExecutor.class);
    private final KsqlExecutionContext ksqlEngine;
    private final KsqlRestConfig ksqlRestConfig;
    private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
    private final Optional<ScalablePushQueryMetrics> scalablePushQueryMetrics;
    private final RateLimiter rateLimiter;
    private final ConcurrencyLimiter concurrencyLimiter;
    private final SlidingWindowRateLimiter pullBandRateLimiter;
    private final SlidingWindowRateLimiter scalablePushBandRateLimiter;
    private final HARouting routing;
    private final PushRouting pushRouting;
    private final Optional<LocalCommands> localCommands;

    /**
     * Creates an executor that runs queries against the given engine.
     *
     * @param ksqlEngine engine used to analyze and execute queries
     * @param ksqlRestConfig REST configuration, consulted for the push-query capacity check
     * @param ksqlConfig not used by this class; kept so existing callers keep compiling
     * @param optional pull query metrics, if enabled
     * @param optional2 scalable push query metrics, if enabled
     * @param rateLimiter request rate limiter checked before pull queries
     * @param concurrencyLimiter limiter whose slot is held while a pull query runs
     * @param slidingWindowRateLimiter sliding-window limiter applied to pull queries
     * @param slidingWindowRateLimiter2 sliding-window limiter applied to scalable push queries
     * @param hARouting routing used for table pull queries
     * @param pushRouting routing used for scalable push queries
     * @param optional3 local commands to which transient queries are written, if present
     */
    @SuppressFBWarnings({"EI_EXPOSE_REP2"})
    public QueryExecutor(KsqlEngine ksqlEngine, KsqlRestConfig ksqlRestConfig, KsqlConfig ksqlConfig, Optional<PullQueryExecutorMetrics> optional, Optional<ScalablePushQueryMetrics> optional2, RateLimiter rateLimiter, ConcurrencyLimiter concurrencyLimiter, SlidingWindowRateLimiter slidingWindowRateLimiter, SlidingWindowRateLimiter slidingWindowRateLimiter2, HARouting hARouting, PushRouting pushRouting, Optional<LocalCommands> optional3) {
        this.ksqlEngine = ksqlEngine;
        this.ksqlRestConfig = ksqlRestConfig;
        this.pullQueryMetrics = optional;
        this.scalablePushQueryMetrics = optional2;
        this.rateLimiter = rateLimiter;
        this.concurrencyLimiter = concurrencyLimiter;
        this.pullBandRateLimiter = slidingWindowRateLimiter;
        this.scalablePushBandRateLimiter = slidingWindowRateLimiter2;
        this.routing = hARouting;
        this.pushRouting = pushRouting;
        this.localCommands = optional3;
    }

    /**
     * Executes the statement if it is a {@link Query}; otherwise reports it as unhandled.
     *
     * @param serviceContext service context handed to the engine
     * @param map config overrides; combined with the engine config into the session config
     * @param map2 request-scoped properties handed to the routing options
     * @param preparedStatement the statement to execute
     * @param optional used together with the routing options' skip-forward flag to decide
     *     whether table pull queries bypass the rate/concurrency limits; an empty value is
     *     treated as {@code true} (presumably "is internal request" - confirm with callers)
     * @param metricsCallbackHolder receives the metrics callback for the chosen query type
     * @param context Vert.x context handed to scalable push query execution
     * @param z flag forwarded unchanged to the engine's transient query execution
     *     (presumably an exclude-tombstones switch - confirm against the engine API)
     * @return holder of the started query, or {@link QueryMetadataHolder#unhandled()} when the
     *     statement is not a query
     */
    public QueryMetadataHolder handleStatement(ServiceContext serviceContext, Map<String, Object> map, Map<String, Object> map2, KsqlParser.PreparedStatement<?> preparedStatement, Optional<Boolean> optional, MetricsCallbackHolder metricsCallbackHolder, Context context, boolean z) {
        if (!(preparedStatement.getStatement() instanceof Query)) {
            return QueryMetadataHolder.unhandled();
        }
        // Safe: the instanceof check above established that the wrapped statement is a Query.
        @SuppressWarnings("unchecked")
        final KsqlParser.PreparedStatement<Query> queryStatement =
                (KsqlParser.PreparedStatement<Query>) preparedStatement;
        return handleQuery(serviceContext, queryStatement, optional, metricsCallbackHolder, map, map2, context, z);
    }

    /** Routes a query to the pull path or to one of the two push paths. */
    private QueryMetadataHolder handleQuery(ServiceContext serviceContext, KsqlParser.PreparedStatement<Query> statement, Optional<Boolean> isInternalRequest, MetricsCallbackHolder metricsCallbackHolder, Map<String, Object> configOverrides, Map<String, Object> requestProperties, Context context, boolean excludeTombstones) {
        if (statement.getStatement().isPullQuery()) {
            return handlePullQuery(serviceContext, statement, isInternalRequest, metricsCallbackHolder, configOverrides, requestProperties);
        }

        final boolean scalablePush = ScalablePushUtil.isScalablePushQuery(
                statement.getStatement(), this.ksqlEngine, this.ksqlEngine.getKsqlConfig(), configOverrides);
        if (!scalablePush) {
            QueryLogger.info("Transient query created", statement.getMaskedStatementText());
            return handlePushQuery(serviceContext, statement, configOverrides, excludeTombstones);
        }

        // The callback is registered before analysis; the reference it reads is filled in later,
        // once the scalable push query has been created.
        final AtomicReference<ScalablePushQueryMetadata> resultForMetrics = new AtomicReference<>(null);
        metricsCallbackHolder.setCallback(QueryMetricsUtil.initializeScalablePushMetricsCallback(
                this.scalablePushQueryMetrics, this.scalablePushBandRateLimiter, resultForMetrics));
        final ImmutableAnalysis analysis = this.ksqlEngine.analyzeQueryWithNoOutputTopic(
                statement.getStatement(), statement.getMaskedStatementText(), configOverrides);
        QueryLogger.info("Scalable push query created", statement.getMaskedStatementText());
        return handleScalablePushQuery(analysis, serviceContext, statement, configOverrides,
                requestProperties, context, this.scalablePushBandRateLimiter, resultForMetrics);
    }

    /**
     * Analyzes a pull query and dispatches on the type of its source (table or stream).
     *
     * @throws KsqlStatementException if pull queries are disabled or the source type is neither
     *     a table nor a stream
     */
    private QueryMetadataHolder handlePullQuery(ServiceContext serviceContext, KsqlParser.PreparedStatement<Query> statement, Optional<Boolean> isInternalRequest, MetricsCallbackHolder metricsCallbackHolder, Map<String, Object> configOverrides, Map<String, Object> requestProperties) {
        final ImmutableAnalysis analysis = this.ksqlEngine.analyzeQueryWithNoOutputTopic(
                statement.getStatement(), statement.getMaskedStatementText(), configOverrides);
        final DataSource.DataSourceType sourceType =
                analysis.getFrom().getDataSource().getDataSourceType();

        if (!this.ksqlEngine.getKsqlConfig().getBoolean("ksql.pull.queries.enable").booleanValue()) {
            throw new KsqlStatementException("Pull queries are disabled." + PullQueryValidator.PULL_QUERY_SYNTAX_HELP + System.lineSeparator() + "Please set ksql.pull.queries.enable=true to enable this feature." + System.lineSeparator(), statement.getMaskedStatementText());
        }

        switch (sourceType) {
            case KTABLE: {
                final AtomicReference<PullQueryResult> resultForMetrics = new AtomicReference<>(null);
                metricsCallbackHolder.setCallback(QueryMetricsUtil.initializePullTableMetricsCallback(
                        this.pullQueryMetrics, this.pullBandRateLimiter, resultForMetrics));
                final ConfiguredStatement<Query> configured = ConfiguredStatement.of(
                        statement, SessionConfig.of(this.ksqlEngine.getKsqlConfig(), configOverrides));
                return handleTablePullQuery(analysis, serviceContext, configured, requestProperties,
                        isInternalRequest, this.pullBandRateLimiter, resultForMetrics, Optional.empty());
            }
            case KSTREAM: {
                final AtomicReference<StreamPullQueryMetadata> resultForMetrics = new AtomicReference<>(null);
                final AtomicReference<ConcurrencyLimiter.Decrementer> refDecrementer = new AtomicReference<>(null);
                metricsCallbackHolder.setCallback(QueryMetricsUtil.initializePullStreamMetricsCallback(
                        this.pullQueryMetrics, this.pullBandRateLimiter, analysis, resultForMetrics, refDecrementer));
                final ConfiguredStatement<Query> configured = ConfiguredStatement.of(
                        statement, SessionConfig.of(this.ksqlEngine.getKsqlConfig(), configOverrides));
                return handleStreamPullQuery(analysis, serviceContext, configured, resultForMetrics, refDecrementer);
            }
            default:
                throw new KsqlStatementException("Unexpected data source type for pull query: " + String.valueOf(sourceType), statement.getMaskedStatementText());
        }
    }

    /**
     * Runs a pull query against a table.
     *
     * <p>The request rate check and the concurrency slot are skipped when the routing options
     * say the request must not be forwarded again and {@code isInternalRequest} is true or
     * absent. Any slot that was taken is released when the result completes, or immediately if
     * starting the query fails.
     */
    private QueryMetadataHolder handleTablePullQuery(ImmutableAnalysis analysis, ServiceContext serviceContext, ConfiguredStatement<Query> configuredStatement, Map<String, Object> requestProperties, Optional<Boolean> isInternalRequest, SlidingWindowRateLimiter pullBandLimiter, AtomicReference<PullQueryResult> resultForMetrics, Optional<ConsistencyOffsetVector> consistencyOffsetVector) {
        final PullQueryConfigRoutingOptions routingOptions = new PullQueryConfigRoutingOptions(
                configuredStatement.getSessionConfig().getConfig(false),
                configuredStatement.getSessionConfig().getOverrides(),
                requestProperties);
        final PullQueryConfigPlannerOptions plannerOptions = new PullQueryConfigPlannerOptions(
                configuredStatement.getSessionConfig().getConfig(false),
                configuredStatement.getSessionConfig().getOverrides());

        // An absent isInternalRequest is treated as true, i.e. the skip-forward flag is trusted.
        final boolean alreadyForwarded =
                routingOptions.getIsSkipForwardRequest() && isInternalRequest.orElse(true).booleanValue();

        ConcurrencyLimiter.Decrementer decrementer = null;
        try {
            if (!alreadyForwarded) {
                this.rateLimiter.checkLimit();
                decrementer = this.concurrencyLimiter.increment();
            }
            pullBandLimiter.allow(KsqlConstants.KsqlQueryType.PULL);

            final Optional<ConcurrencyLimiter.Decrementer> slot = Optional.ofNullable(decrementer);
            final PullQueryResult result = this.ksqlEngine.executeTablePullQuery(
                    analysis, serviceContext, configuredStatement, this.routing, routingOptions,
                    plannerOptions, this.pullQueryMetrics, false, consistencyOffsetVector);
            resultForMetrics.set(result);
            result.onCompletionOrException(
                    (ignoredValue, ignoredError) ->
                            slot.ifPresent(ConcurrencyLimiter.Decrementer::decrementAtMostOnce));
            return QueryMetadataHolder.of(result);
        } catch (Throwable t) {
            // The try spans every step after the slot is acquired, so a failure in the bandwidth
            // check or in query execution cannot leave the slot permanently occupied.
            if (decrementer != null) {
                decrementer.decrementAtMostOnce();
            }
            throw t;
        }
    }

    /** Creates, prepares and returns a scalable push query after applying the bandwidth limit. */
    private QueryMetadataHolder handleScalablePushQuery(ImmutableAnalysis analysis, ServiceContext serviceContext, KsqlParser.PreparedStatement<Query> statement, Map<String, Object> configOverrides, Map<String, Object> requestProperties, Context context, SlidingWindowRateLimiter pushBandLimiter, AtomicReference<ScalablePushQueryMetadata> resultForMetrics) {
        final ConfiguredStatement<Query> configured = ConfiguredStatement.of(
                statement, SessionConfig.of(this.ksqlEngine.getKsqlConfig(), configOverrides));
        final PushQueryConfigRoutingOptions routingOptions = new PushQueryConfigRoutingOptions(
                this.ksqlEngine.getKsqlConfig(), configOverrides, requestProperties);
        final PushQueryConfigPlannerOptions plannerOptions = new PushQueryConfigPlannerOptions(
                this.ksqlEngine.getKsqlConfig(), configOverrides);

        pushBandLimiter.allow(KsqlConstants.KsqlQueryType.PUSH);

        final ScalablePushQueryMetadata query = this.ksqlEngine.executeScalablePushQuery(
                analysis, serviceContext, configured, this.pushRouting, routingOptions,
                plannerOptions, context, this.scalablePushQueryMetrics);
        query.prepare();
        resultForMetrics.set(query);
        QueryLogger.info("Streaming scalable push query", statement.getMaskedStatementText());
        return QueryMetadataHolder.of(query);
    }

    /**
     * Runs a pull query against a stream, applying the rate, bandwidth and concurrency limits.
     *
     * <p>The acquired concurrency slot is published through {@code refDecrementer}; this method
     * does not release it itself.
     */
    private QueryMetadataHolder handleStreamPullQuery(ImmutableAnalysis analysis, ServiceContext serviceContext, ConfiguredStatement<Query> configuredStatement, AtomicReference<StreamPullQueryMetadata> resultForMetrics, AtomicReference<ConcurrencyLimiter.Decrementer> refDecrementer) {
        this.rateLimiter.checkLimit();
        this.pullBandRateLimiter.allow(KsqlConstants.KsqlQueryType.PULL);
        refDecrementer.set(this.concurrencyLimiter.increment());

        final StreamPullQueryMetadata query =
                this.ksqlEngine.createStreamPullQuery(serviceContext, analysis, configuredStatement, false);
        resultForMetrics.set(query);
        this.localCommands.ifPresent(commands -> commands.write(query.getTransientQueryMetadata()));
        return QueryMetadataHolder.of(query);
    }

    /** Starts a transient push query, after the push-query capacity check. */
    private QueryMetadataHolder handlePushQuery(ServiceContext serviceContext, KsqlParser.PreparedStatement<Query> statement, Map<String, Object> configOverrides, boolean excludeTombstones) {
        final ConfiguredStatement<Query> configured = ConfiguredStatement.of(
                statement, SessionConfig.of(this.ksqlEngine.getKsqlConfig(), configOverrides));

        if (QueryCapacityUtil.exceedsPushQueryCapacity(this.ksqlEngine, this.ksqlRestConfig)) {
            QueryCapacityUtil.throwTooManyActivePushQueriesException(
                    this.ksqlEngine, this.ksqlRestConfig, statement.getMaskedStatementText());
        }

        final TransientQueryMetadata query =
                this.ksqlEngine.executeTransientQuery(serviceContext, configured, excludeTombstones);
        this.localCommands.ifPresent(commands -> commands.write(query));
        QueryLogger.info("Streaming query", statement.getMaskedStatementText());
        return QueryMetadataHolder.of(query);
    }
}
