package io.confluent.ksql.rest.server.resources;

import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.parser.DefaultKsqlParser;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.tree.DescribeFunction;
import io.confluent.ksql.parser.tree.ListFunctions;
import io.confluent.ksql.parser.tree.ListProperties;
import io.confluent.ksql.parser.tree.ListTopics;
import io.confluent.ksql.parser.tree.SetProperty;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.UnsetProperty;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.ClusterTerminateRequest;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.server.computation.CommandQueue;
import io.confluent.ksql.rest.server.computation.DistributingExecutor;
import io.confluent.ksql.rest.server.computation.ValidatedCommandFactory;
import io.confluent.ksql.rest.server.execution.CustomExecutors;
import io.confluent.ksql.rest.server.execution.DefaultCommandQueueSync;
import io.confluent.ksql.rest.server.execution.RequestHandler;
import io.confluent.ksql.rest.server.validation.CustomValidators;
import io.confluent.ksql.rest.server.validation.RequestValidator;
import io.confluent.ksql.rest.server.validation.StatementValidator;
import io.confluent.ksql.rest.util.CommandStoreUtil;
import io.confluent.ksql.rest.util.TerminateCluster;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
import io.confluent.ksql.security.KsqlSecurityContext;
import io.confluent.ksql.services.SandboxedServiceContext;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.Injector;
import io.confluent.ksql.statement.Injectors;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.version.metrics.ActivenessRegistrar;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Produces({"application/vnd.ksql.v1+json", "application/json"})
@Path("/ksql")
@Consumes({"application/vnd.ksql.v1+json", "application/json"})
/* loaded from: input_file:io/confluent/ksql/rest/server/resources/KsqlResource.class */
public class KsqlResource implements KsqlConfigurable {
    private static final Logger LOG = LoggerFactory.getLogger(KsqlResource.class);
    private static final List<KsqlParser.ParsedStatement> TERMINATE_CLUSTER = new DefaultKsqlParser().parse(TerminateCluster.TERMINATE_CLUSTER_STATEMENT_TEXT);
    private static final Set<Class<? extends Statement>> SYNC_BLACKLIST = ImmutableSet.builder().add(ListTopics.class).add(ListFunctions.class).add(DescribeFunction.class).add(ListProperties.class).add(SetProperty.class).add(UnsetProperty.class).build();
    private final KsqlEngine ksqlEngine;
    private final CommandQueue commandQueue;
    private final Duration distributedCmdResponseTimeout;
    private final ActivenessRegistrar activenessRegistrar;
    private final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory;
    private final Optional<KsqlAuthorizationValidator> authorizationValidator;
    private RequestValidator validator;
    private RequestHandler handler;
    private final Errors errorHandler;

    public KsqlResource(KsqlEngine ksqlEngine, CommandQueue commandQueue, Duration duration, ActivenessRegistrar activenessRegistrar, Optional<KsqlAuthorizationValidator> optional, Errors errors) {
        this(ksqlEngine, commandQueue, duration, activenessRegistrar, Injectors.DEFAULT, optional, errors);
    }

    KsqlResource(KsqlEngine ksqlEngine, CommandQueue commandQueue, Duration duration, ActivenessRegistrar activenessRegistrar, BiFunction<KsqlExecutionContext, ServiceContext, Injector> biFunction, Optional<KsqlAuthorizationValidator> optional, Errors errors) {
        this.ksqlEngine = (KsqlEngine) Objects.requireNonNull(ksqlEngine, "ksqlEngine");
        this.commandQueue = (CommandQueue) Objects.requireNonNull(commandQueue, "commandQueue");
        this.distributedCmdResponseTimeout = (Duration) Objects.requireNonNull(duration, "distributedCmdResponseTimeout");
        this.activenessRegistrar = (ActivenessRegistrar) Objects.requireNonNull(activenessRegistrar, "activenessRegistrar");
        this.injectorFactory = (BiFunction) Objects.requireNonNull(biFunction, "injectorFactory");
        this.authorizationValidator = (Optional) Objects.requireNonNull(optional, "authorizationValidator");
        this.errorHandler = (Errors) Objects.requireNonNull(errors, "errorHandler");
    }

    @Override // io.confluent.ksql.rest.server.resources.KsqlConfigurable
    public void configure(KsqlConfig ksqlConfig) {
        if (!ksqlConfig.getKsqlStreamConfigProps().containsKey("application.server")) {
            throw new IllegalArgumentException("Need KS application server set");
        }
        Map<Class<? extends Statement>, StatementValidator<?>> map = CustomValidators.VALIDATOR_MAP;
        BiFunction<KsqlExecutionContext, ServiceContext, Injector> biFunction = this.injectorFactory;
        KsqlEngine ksqlEngine = this.ksqlEngine;
        ksqlEngine.getClass();
        this.validator = new RequestValidator(map, biFunction, ksqlEngine::createSandbox, ksqlConfig, new ValidatedCommandFactory(ksqlConfig));
        this.handler = new RequestHandler(CustomExecutors.EXECUTOR_MAP, new DistributingExecutor(ksqlConfig, this.commandQueue, this.distributedCmdResponseTimeout, this.injectorFactory, this.authorizationValidator, new ValidatedCommandFactory(ksqlConfig), this.errorHandler), this.ksqlEngine, ksqlConfig, new DefaultCommandQueueSync(this.commandQueue, KsqlResource::shouldSynchronize, this.distributedCmdResponseTimeout));
    }

    @POST
    @Path("/terminate")
    public Response terminateCluster(@Context KsqlSecurityContext ksqlSecurityContext, ClusterTerminateRequest clusterTerminateRequest) {
        LOG.info("Received: " + clusterTerminateRequest);
        throwIfNotConfigured();
        ensureValidPatterns(clusterTerminateRequest.getDeleteTopicList());
        try {
            return Response.ok(this.handler.execute(ksqlSecurityContext, TERMINATE_CLUSTER, clusterTerminateRequest.getStreamsProperties())).build();
        } catch (Exception e) {
            return Errors.serverErrorForStatement(e, TerminateCluster.TERMINATE_CLUSTER_STATEMENT_TEXT, new KsqlEntityList());
        }
    }

    @POST
    public Response handleKsqlStatements(@Context KsqlSecurityContext ksqlSecurityContext, KsqlRequest ksqlRequest) {
        LOG.info("Received: " + ksqlRequest);
        throwIfNotConfigured();
        this.activenessRegistrar.updateLastRequestTime();
        try {
            CommandStoreUtil.httpWaitForCommandSequenceNumber(this.commandQueue, ksqlRequest, this.distributedCmdResponseTimeout);
            List<KsqlParser.ParsedStatement> parse = this.ksqlEngine.parse(ksqlRequest.getKsql());
            this.validator.validate(SandboxedServiceContext.create(ksqlSecurityContext.getServiceContext()), parse, ksqlRequest.getStreamsProperties(), ksqlRequest.getKsql());
            return Response.ok(this.handler.execute(ksqlSecurityContext, parse, ksqlRequest.getStreamsProperties())).build();
        } catch (KsqlRestException e) {
            throw e;
        } catch (Exception e2) {
            return this.errorHandler.generateResponse(e2, Errors.serverErrorForStatement(e2, ksqlRequest.getKsql()));
        } catch (KsqlStatementException e3) {
            return Errors.badStatement(e3.getRawMessage(), e3.getSqlStatement());
        } catch (KsqlException e4) {
            return this.errorHandler.generateResponse(e4, Errors.badRequest(e4));
        }
    }

    private void throwIfNotConfigured() {
        if (this.validator == null || this.handler == null) {
            throw new KsqlRestException(Errors.notReady());
        }
    }

    private static boolean shouldSynchronize(Class<? extends Statement> cls) {
        return !SYNC_BLACKLIST.contains(cls) && CustomExecutors.EXECUTOR_MAP.containsKey(cls);
    }

    private static void ensureValidPatterns(List<String> list) {
        list.forEach(str -> {
            try {
                Pattern.compile(str);
            } catch (PatternSyntaxException e) {
                throw new KsqlRestException(Errors.badRequest("Invalid pattern: " + str));
            }
        });
    }
}
