package io.confluent.ksql.embedded;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.function.InternalFunctionRegistry;
import io.confluent.ksql.function.UserFunctionLoader;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.SetProperty;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.UnsetProperty;
import io.confluent.ksql.properties.PropertyOverrider;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.id.SequentialQueryIdGenerator;
import io.confluent.ksql.services.DisabledKsqlClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.services.ServiceContextFactory;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.statement.Injector;
import io.confluent.ksql.statement.Injectors;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/embedded/KsqlContext.class */
public class KsqlContext implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(KsqlContext.class);
    private final ServiceContext serviceContext;
    private final KsqlConfig ksqlConfig;
    private final KsqlEngine ksqlEngine;
    private final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory;

    /* JADX INFO: Access modifiers changed from: private */
    @FunctionalInterface
    /* loaded from: input_file:io/confluent/ksql/embedded/KsqlContext$CustomExecutor.class */
    public interface CustomExecutor {
        KsqlExecutionContext.ExecuteResult apply(KsqlExecutionContext ksqlExecutionContext, ConfiguredStatement<?> configuredStatement, Map<String, Object> map);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/embedded/KsqlContext$CustomExecutors.class */
    public enum CustomExecutors {
        SET_PROPERTY(SetProperty.class, (ksqlExecutionContext, configuredStatement, map) -> {
            PropertyOverrider.set(configuredStatement, map);
            return KsqlExecutionContext.ExecuteResult.of("Successfully executed " + configuredStatement.getStatement());
        }),
        UNSET_PROPERTY(UnsetProperty.class, (ksqlExecutionContext2, configuredStatement2, map2) -> {
            PropertyOverrider.unset(configuredStatement2, map2);
            return KsqlExecutionContext.ExecuteResult.of("Successfully executed " + configuredStatement2.getStatement());
        }),
        QUERY(Query.class, (ksqlExecutionContext3, configuredStatement3, map3) -> {
            return KsqlExecutionContext.ExecuteResult.of(ksqlExecutionContext3.executeTransientQuery(ksqlExecutionContext3.getServiceContext(), configuredStatement3.cast(), false));
        });

        public static final Map<Class<? extends Statement>, CustomExecutor> EXECUTOR_MAP = ImmutableMap.copyOf((Map) EnumSet.allOf(CustomExecutors.class).stream().collect(Collectors.toMap((v0) -> {
            return v0.getStatementClass();
        }, (v0) -> {
            return v0.getExecutor();
        })));
        private final Class<? extends Statement> statementClass;
        private final CustomExecutor executor;

        CustomExecutors(Class cls, CustomExecutor customExecutor) {
            this.statementClass = (Class) Objects.requireNonNull(cls, "statementClass");
            this.executor = (CustomExecutor) Objects.requireNonNull(customExecutor, "executor");
        }

        private Class<? extends Statement> getStatementClass() {
            return this.statementClass;
        }

        private CustomExecutor getExecutor() {
            return this::execute;
        }

        public KsqlExecutionContext.ExecuteResult execute(KsqlExecutionContext ksqlExecutionContext, ConfiguredStatement<?> configuredStatement, Map<String, Object> map) {
            return this.executor.apply(ksqlExecutionContext, configuredStatement, map);
        }
    }

    public static KsqlContext create(KsqlConfig ksqlConfig, ProcessingLogContext processingLogContext, MetricCollectors metricCollectors) {
        Objects.requireNonNull(ksqlConfig, "ksqlConfig cannot be null.");
        ServiceContext create = ServiceContextFactory.create(ksqlConfig, DisabledKsqlClient::instance);
        InternalFunctionRegistry internalFunctionRegistry = new InternalFunctionRegistry();
        UserFunctionLoader.newInstance(ksqlConfig, internalFunctionRegistry, ".", metricCollectors.getMetrics()).load();
        return new KsqlContext(create, ksqlConfig, new KsqlEngine(create, processingLogContext, internalFunctionRegistry, ServiceInfo.create(ksqlConfig), new SequentialQueryIdGenerator(), ksqlConfig, Collections.emptyList(), metricCollectors), Injectors.DEFAULT);
    }

    @VisibleForTesting
    KsqlContext(ServiceContext serviceContext, KsqlConfig ksqlConfig, KsqlEngine ksqlEngine, BiFunction<KsqlExecutionContext, ServiceContext, Injector> biFunction) {
        this.serviceContext = (ServiceContext) Objects.requireNonNull(serviceContext, "serviceContext");
        this.ksqlConfig = (KsqlConfig) Objects.requireNonNull(ksqlConfig, "ksqlConfig");
        this.ksqlEngine = (KsqlEngine) Objects.requireNonNull(ksqlEngine, "ksqlEngine");
        this.injectorFactory = (BiFunction) Objects.requireNonNull(biFunction, "injectorFactory");
    }

    public ServiceContext getServiceContext() {
        return this.serviceContext;
    }

    public MetaStore getMetaStore() {
        return this.ksqlEngine.getMetaStore();
    }

    public List<QueryMetadata> sql(String str) {
        return sql(str, Collections.emptyMap());
    }

    public List<QueryMetadata> sql(String str, Map<String, ?> map) {
        List<KsqlParser.ParsedStatement> parse = this.ksqlEngine.parse(str);
        KsqlExecutionContext createSandbox = this.ksqlEngine.createSandbox(this.ksqlEngine.getServiceContext());
        HashMap hashMap = new HashMap(map);
        Iterator<KsqlParser.ParsedStatement> it = parse.iterator();
        while (it.hasNext()) {
            execute(createSandbox, it.next(), this.ksqlConfig, hashMap, this.injectorFactory.apply(createSandbox, createSandbox.getServiceContext()));
        }
        ArrayList<QueryMetadata> arrayList = new ArrayList();
        Injector apply = this.injectorFactory.apply(this.ksqlEngine, this.serviceContext);
        HashMap hashMap2 = new HashMap(map);
        Iterator<KsqlParser.ParsedStatement> it2 = parse.iterator();
        while (it2.hasNext()) {
            Optional<QueryMetadata> query = execute(this.ksqlEngine, it2.next(), this.ksqlConfig, hashMap2, apply).getQuery();
            arrayList.getClass();
            query.ifPresent((v1) -> {
                r1.add(v1);
            });
        }
        for (QueryMetadata queryMetadata : arrayList) {
            if (queryMetadata instanceof PersistentQueryMetadata) {
                queryMetadata.start();
            } else {
                LOG.warn("Ignoring statemenst: {}", str);
                LOG.warn("Only CREATE statements can run in KSQL embedded mode.");
            }
        }
        return arrayList;
    }

    @Deprecated
    public Set<QueryMetadata> getRunningQueries() {
        return new HashSet(this.ksqlEngine.getPersistentQueries());
    }

    public List<PersistentQueryMetadata> getPersistentQueries() {
        return this.ksqlEngine.getPersistentQueries();
    }

    public Optional<PersistentQueryMetadata> getPersistentQuery(QueryId queryId) {
        return this.ksqlEngine.getPersistentQuery(queryId);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.ksqlEngine.close();
        this.serviceContext.close();
    }

    @VisibleForTesting
    public void terminateQuery(QueryId queryId) {
        this.ksqlEngine.getPersistentQuery(queryId).ifPresent(persistentQueryMetadata -> {
            persistentQueryMetadata.close();
            this.ksqlEngine.removeQueryFromAssignor(persistentQueryMetadata);
        });
    }

    @VisibleForTesting
    public void pauseQuery(QueryId queryId) {
        this.ksqlEngine.getPersistentQuery(queryId).ifPresent((v0) -> {
            v0.pause();
        });
    }

    @VisibleForTesting
    public void resumeQuery(QueryId queryId) {
        this.ksqlEngine.getPersistentQuery(queryId).ifPresent((v0) -> {
            v0.resume();
        });
    }

    private static KsqlExecutionContext.ExecuteResult execute(KsqlExecutionContext ksqlExecutionContext, KsqlParser.ParsedStatement parsedStatement, KsqlConfig ksqlConfig, Map<String, Object> map, Injector injector) {
        ConfiguredStatement<?> inject = injector.inject(ConfiguredStatement.of(ksqlExecutionContext.prepare(parsedStatement), SessionConfig.of(ksqlConfig, map)));
        return CustomExecutors.EXECUTOR_MAP.getOrDefault(inject.getStatement().getClass(), (ksqlExecutionContext2, configuredStatement, map2) -> {
            return ksqlExecutionContext2.execute(ksqlExecutionContext2.getServiceContext(), (ConfiguredStatement<?>) configuredStatement);
        }).apply(ksqlExecutionContext, inject, map);
    }
}
