package io.confluent.ksql.engine;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.engine.QueryCleanupService;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.internal.KsqlEngineMetrics;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.MetaStoreImpl;
import io.confluent.ksql.metastore.MutableMetaStore;
import io.confluent.ksql.metrics.StreamsErrorCollector;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.tree.ExecutableDdlStatement;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.QueryContainer;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.physical.pull.HARouting;
import io.confluent.ksql.physical.pull.PullQueryResult;
import io.confluent.ksql.physical.scalablepush.PushRouting;
import io.confluent.ksql.physical.scalablepush.PushRoutingOptions;
import io.confluent.ksql.planner.QueryPlannerOptions;
import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.id.QueryIdGenerator;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.ScalablePushQueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import io.vertx.core.Context;
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/engine/KsqlEngine.class */
public class KsqlEngine implements KsqlExecutionContext, Closeable {
    private static final Logger log = LoggerFactory.getLogger(KsqlEngine.class);
    private final KsqlEngineMetrics engineMetrics;
    private final ScheduledExecutorService aggregateMetricsCollector;
    private final String serviceId;
    private final EngineContext primaryContext;
    private final QueryCleanupService cleanupService;
    private final OrphanedTransientQueryCleaner orphanedTransientQueryCleaner;

    /* loaded from: input_file:io/confluent/ksql/engine/KsqlEngine$CleanupListener.class */
    private static final class CleanupListener implements QueryEventListener {
        final QueryCleanupService cleanupService;
        final ServiceContext serviceContext;
        final KsqlConfig ksqlConfig;

        private CleanupListener(QueryCleanupService queryCleanupService, ServiceContext serviceContext, KsqlConfig ksqlConfig) {
            this.cleanupService = queryCleanupService;
            this.serviceContext = serviceContext;
            this.ksqlConfig = ksqlConfig;
        }

        @Override // io.confluent.ksql.engine.QueryEventListener
        public void onClose(QueryMetadata queryMetadata) {
            String queryApplicationId = queryMetadata.getQueryApplicationId();
            if (queryMetadata.hasEverBeenStarted()) {
                this.cleanupService.addCleanupTask(new QueryCleanupService.QueryCleanupTask(this.serviceContext, queryApplicationId, queryMetadata instanceof TransientQueryMetadata, this.ksqlConfig.getKsqlStreamConfigProps().getOrDefault("state.dir", StreamsConfig.configDef().defaultValues().get("state.dir")).toString()));
            }
            StreamsErrorCollector.notifyApplicationClose(queryApplicationId);
        }
    }

    public KsqlEngine(ServiceContext serviceContext, ProcessingLogContext processingLogContext, FunctionRegistry functionRegistry, ServiceInfo serviceInfo, QueryIdGenerator queryIdGenerator, KsqlConfig ksqlConfig, List<QueryEventListener> list) {
        this(serviceContext, processingLogContext, serviceInfo.serviceId(), new MetaStoreImpl(functionRegistry), ksqlEngine -> {
            return new KsqlEngineMetrics(serviceInfo.metricsPrefix(), ksqlEngine, serviceInfo.customMetricsTags(), serviceInfo.metricsExtension());
        }, queryIdGenerator, ksqlConfig, list);
    }

    public KsqlEngine(ServiceContext serviceContext, ProcessingLogContext processingLogContext, String str, MutableMetaStore mutableMetaStore, Function<KsqlEngine, KsqlEngineMetrics> function, QueryIdGenerator queryIdGenerator, KsqlConfig ksqlConfig, List<QueryEventListener> list) {
        this.cleanupService = new QueryCleanupService();
        this.orphanedTransientQueryCleaner = new OrphanedTransientQueryCleaner(this.cleanupService, ksqlConfig);
        this.serviceId = (String) Objects.requireNonNull(str, "serviceId");
        this.engineMetrics = function.apply(this);
        this.primaryContext = EngineContext.create(serviceContext, processingLogContext, mutableMetaStore, queryIdGenerator, this.cleanupService, ksqlConfig, ImmutableList.builder().addAll(list).add(this.engineMetrics.getQueryEventListener()).add(new CleanupListener(this.cleanupService, serviceContext, ksqlConfig)).build());
        this.aggregateMetricsCollector = Executors.newSingleThreadScheduledExecutor();
        this.aggregateMetricsCollector.scheduleAtFixedRate(() -> {
            try {
                this.engineMetrics.updateMetrics();
            } catch (Exception e) {
                log.info("Error updating engine metrics", e);
            }
        }, 1000L, 1000L, TimeUnit.MILLISECONDS);
        this.cleanupService.startAsync();
    }

    public int numberOfLiveQueries() {
        return this.primaryContext.getQueryRegistry().getAllLiveQueries().size();
    }

    @Override // io.confluent.ksql.KsqlExecutionContext
    public Optional<PersistentQueryMetadata> getPersistentQuery(QueryId queryId) {
        return this.primaryContext.getQueryRegistry().getPersistentQuery(queryId);
    }

    @Override // io.confluent.ksql.KsqlExecutionContext
    public List<PersistentQueryMetadata> getPersistentQueries() {
        return ImmutableList.copyOf(this.primaryContext.getQueryRegistry().getPersistentQueries().values());
    }

    @Override // io.confluent.ksql.KsqlExecutionContext
    public Set<QueryId> getQueriesWithSink(SourceName sourceName) {
        return this.primaryContext.getQueryRegistry().getQueriesWithSink(sourceName);
    }

    @Override // io.confluent.ksql.KsqlExecutionContext
    public List<QueryMetadata> getAllLiveQueries() {
        return this.primaryContext.getQueryRegistry().getAllLiveQueries();
    }

    public boolean hasActiveQueries() {
        return !this.primaryContext.getQueryRegistry().getPersistentQueries().isEmpty();
    }

    @Override // io.confluent.ksql.KsqlExecutionContext
    public MetaStore getMetaStore() {
        return this.primaryContext.getMetaStore();
    }

    @Override // io.confluent.ksql.KsqlExecutionContext
    public ServiceContext getServiceContext() {
        return this.primaryContext.getServiceContext();
    }

    @Override // io.confluent.ksql.KsqlExecutionContext
    public ProcessingLogContext getProcessingLogContext() {
        return this.primaryContext.getProcessingLogContext();
    }

    public String getServiceId() {
        return this.serviceId;
    }

    @VisibleForTesting
    QueryCleanupService getCleanupService() {
        return this.cleanupService;
    }

    @Override // io.confluent.ksql.KsqlExecutionContext
    public KsqlExecutionContext createSandbox(ServiceContext serviceContext) {
        return new SandboxedExecutionContext(this.primaryContext, serviceContext);
    }

    @Override // io.confluent.ksql.KsqlExecutionContext
    public List<KsqlParser.ParsedStatement> parse(String str) {
        return this.primaryContext.parse(str);
    }

    @Override // io.confluent.ksql.KsqlExecutionContext
    public KsqlParser.PreparedStatement<?> prepare(KsqlParser.ParsedStatement parsedStatement, Map<String, String> map) {
        return this.primaryContext.prepare(parsedStatement, map);
    }

    @Override // io.confluent.ksql.KsqlExecutionContext
    public KsqlPlan plan(ServiceContext serviceContext, ConfiguredStatement<?> configuredStatement) {
        return EngineExecutor.create(this.primaryContext, serviceContext, configuredStatement.getSessionConfig()).plan(configuredStatement);
    }

    @Override // io.confluent.ksql.KsqlExecutionContext
    public KsqlExecutionContext.ExecuteResult execute(ServiceContext serviceContext, ConfiguredKsqlPlan configuredKsqlPlan) {
        try {
            return EngineExecutor.create(this.primaryContext, serviceContext, configuredKsqlPlan.getConfig()).execute(configuredKsqlPlan.getPlan());
        } catch (KsqlStatementException e) {
            throw e;
        } catch (KsqlException e2) {
            throw new KsqlStatementException(e2.getMessage(), configuredKsqlPlan.getPlan().getStatementText(), e2.getCause());
        }
    }

    @Override // io.confluent.ksql.KsqlExecutionContext
    public KsqlExecutionContext.ExecuteResult execute(ServiceContext serviceContext, ConfiguredStatement<?> configuredStatement) {
        return execute(serviceContext, ConfiguredKsqlPlan.of(plan(serviceContext, configuredStatement), configuredStatement.getSessionConfig()));
    }

    @Override // io.confluent.ksql.KsqlExecutionContext
    public TransientQueryMetadata executeQuery(ServiceContext serviceContext, ConfiguredStatement<Query> configuredStatement, boolean z) {
        try {
            return EngineExecutor.create(this.primaryContext, serviceContext, configuredStatement.getSessionConfig()).executeQuery(configuredStatement, z);
        } catch (KsqlException e) {
            throw new KsqlStatementException(e.getMessage(), configuredStatement.getStatementText(), e.getCause());
        } catch (KsqlStatementException e2) {
            throw e2;
        }
    }

    @Override // io.confluent.ksql.KsqlExecutionContext
    public ScalablePushQueryMetadata executeScalablePushQuery(ServiceContext serviceContext, ConfiguredStatement<Query> configuredStatement, PushRouting pushRouting, PushRoutingOptions pushRoutingOptions, QueryPlannerOptions queryPlannerOptions, Context context) {
        return EngineExecutor.create(this.primaryContext, serviceContext, configuredStatement.getSessionConfig()).executeScalablePushQuery(configuredStatement, pushRouting, pushRoutingOptions, queryPlannerOptions, context);
    }

    @Override // io.confluent.ksql.KsqlExecutionContext
    public PullQueryResult executePullQuery(ServiceContext serviceContext, ConfiguredStatement<Query> configuredStatement, HARouting hARouting, RoutingOptions routingOptions, QueryPlannerOptions queryPlannerOptions, Optional<PullQueryExecutorMetrics> optional, boolean z) {
        return EngineExecutor.create(this.primaryContext, serviceContext, configuredStatement.getSessionConfig()).executePullQuery(configuredStatement, hARouting, routingOptions, queryPlannerOptions, optional, z);
    }

    public void close(boolean z) {
        this.primaryContext.getQueryRegistry().close(z);
        try {
            this.cleanupService.stopAsync().awaitTerminated(30L, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            log.warn("Timed out while closing cleanup service. External resources for the following applications may be orphaned: {}", this.cleanupService.pendingApplicationIds());
        }
        this.engineMetrics.close();
        this.aggregateMetricsCollector.shutdown();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        close(false);
    }

    public void cleanupOrphanedInternalTopics(ServiceContext serviceContext, Set<String> set) {
        this.orphanedTransientQueryCleaner.cleanupOrphanedInternalTopics(serviceContext, set);
    }

    public static boolean isExecutableStatement(Statement statement) {
        return (statement instanceof ExecutableDdlStatement) || (statement instanceof QueryContainer) || (statement instanceof Query);
    }
}
