package io.confluent.ksql.rest.server.computation;

import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.parser.tree.InsertInto;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
import io.confluent.ksql.security.KsqlSecurityContext;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.statement.Injector;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.ReservedInternalTopics;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;

/* loaded from: input_file:io/confluent/ksql/rest/server/computation/DistributingExecutor.class */
/**
 * Executes a statement by writing it to the {@link CommandQueue} inside a Kafka producer
 * transaction and then waiting, up to a configured timeout, for the command's final status.
 *
 * <p>Before anything is enqueued the statement is passed through the injector produced by the
 * injector factory, inserts into read-only topics are rejected, and authorization is checked.
 */
public class DistributingExecutor {
    private final CommandQueue commandQueue;
    private final Duration distributedCmdResponseTimeout;
    private final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory;
    private final Optional<KsqlAuthorizationValidator> authorizationValidator;
    private final ValidatedCommandFactory validatedCommandFactory;
    private final CommandIdAssigner commandIdAssigner = new CommandIdAssigner();
    private final ReservedInternalTopics internalTopics;
    private final Errors errorHandler;
    private final Supplier<String> commandRunnerWarning;

    /**
     * Creates the executor.
     *
     * @param ksqlConfig used to build the {@link ReservedInternalTopics} lookup; must not be null
     * @param commandQueue queue the commands are written to (not null-checked here)
     * @param duration how long to wait for an enqueued command's final status; must not be null
     * @param biFunction factory for the injector applied to each statement; must not be null
     * @param optional optional authorization validator; the Optional itself must not be null
     * @param validatedCommandFactory builds the command that is enqueued; must not be null
     * @param errors supplies the message used when transaction initialisation times out;
     *     must not be null
     * @param supplier supplies the current command runner warning text; must not be null
     * @throws NullPointerException if any argument documented as non-null is null
     */
    public DistributingExecutor(KsqlConfig ksqlConfig, CommandQueue commandQueue, Duration duration, BiFunction<KsqlExecutionContext, ServiceContext, Injector> biFunction, Optional<KsqlAuthorizationValidator> optional, ValidatedCommandFactory validatedCommandFactory, Errors errors, Supplier<String> supplier) {
        this.commandQueue = commandQueue;
        // Objects.requireNonNull is generic, so no (raw) casts are needed on its result.
        this.distributedCmdResponseTimeout = Objects.requireNonNull(duration, "distributedCmdResponseTimeout");
        this.injectorFactory = Objects.requireNonNull(biFunction, "injectorFactory");
        this.authorizationValidator = Objects.requireNonNull(optional, "authorizationValidator");
        this.validatedCommandFactory = Objects.requireNonNull(validatedCommandFactory, "validatedCommandFactory");
        this.internalTopics = new ReservedInternalTopics(Objects.requireNonNull(ksqlConfig, "ksqlConfig"));
        this.errorHandler = Objects.requireNonNull(errors, "errorHandler");
        this.commandRunnerWarning = Objects.requireNonNull(supplier, "commandRunnerWarning");
    }

    /**
     * Validates the statement and enqueues it on the command queue within a producer transaction.
     *
     * @param configuredStatement the statement as received; its text is used in error messages and
     *     its statement is used to assign the command id
     * @param ksqlExecutionContext supplies the metastore, the service context and the sandbox
     *     passed to the command factory
     * @param ksqlSecurityContext the caller's security context
     * @return an entity describing the enqueued command and the status observed within the
     *     response timeout; never empty on normal return
     * @throws KsqlServerException if the command runner reports a warning, if the server-side
     *     authorization check fails, or if the command cannot be written to the command queue
     * @throws KsqlException if the statement inserts into an unknown source or a read-only topic
     */
    public Optional<KsqlEntity> execute(ConfiguredStatement<? extends Statement> configuredStatement, KsqlExecutionContext ksqlExecutionContext, KsqlSecurityContext ksqlSecurityContext) {
        String warning = this.commandRunnerWarning.get();
        // NOTE(review): comparing against AUTHENTICATION_SKIP_PATHS_DEFAULT looks like a
        // constant substituted for the empty string; confirm before replacing with isEmpty().
        if (!warning.equals(KsqlRestConfig.AUTHENTICATION_SKIP_PATHS_DEFAULT)) {
            throw new KsqlServerException("Failed to handle Ksql Statement." + System.lineSeparator() + warning);
        }
        ConfiguredStatement<? extends Statement> injected = this.injectorFactory.apply(ksqlExecutionContext, ksqlSecurityContext.getServiceContext()).inject(configuredStatement);
        if (injected.getStatement() instanceof InsertInto) {
            throwIfInsertOnReadOnlyTopic(ksqlExecutionContext.getMetaStore(), (InsertInto) injected.getStatement());
        }
        checkAuthorization(injected, ksqlSecurityContext, ksqlExecutionContext);
        Producer<CommandId, Command> producer = this.commandQueue.createTransactionalProducer();
        try {
            initTransactions(producer, configuredStatement);
            return enqueueInTransaction(producer, configuredStatement, injected, ksqlExecutionContext);
        } finally {
            // Single close point: also releases the producer when initTransactions() fails.
            producer.close();
        }
    }

    /** Initialises producer transactions, translating any failure into a KsqlServerException. */
    private void initTransactions(Producer<CommandId, Command> producer, ConfiguredStatement<? extends Statement> configuredStatement) {
        try {
            producer.initTransactions();
        } catch (TimeoutException e) {
            throw new KsqlServerException(this.errorHandler.transactionInitTimeoutErrorMessage(e), e);
        } catch (Exception e) {
            // The exception message is passed as an argument, never as part of the format
            // pattern, so a '%' in it cannot break String.format.
            throw new KsqlServerException(String.format("Could not write the statement '%s' into the command topic: %s", configuredStatement.getStatementText(), e.getMessage()), e);
        }
    }

    /** Enqueues the command inside one producer transaction and waits for its status. */
    private Optional<KsqlEntity> enqueueInTransaction(Producer<CommandId, Command> producer, ConfiguredStatement<? extends Statement> configuredStatement, ConfiguredStatement<? extends Statement> injected, KsqlExecutionContext ksqlExecutionContext) {
        CommandId commandId = null;
        try {
            producer.beginTransaction();
            this.commandQueue.waitForCommandConsumer();
            commandId = this.commandIdAssigner.getCommandId(configuredStatement.getStatement());
            Command command = this.validatedCommandFactory.create(injected, ksqlExecutionContext.createSandbox(ksqlExecutionContext.getServiceContext()));
            QueuedCommandStatus queued = this.commandQueue.enqueueCommand(commandId, command, producer);
            producer.commitTransaction();
            return Optional.of(new CommandStatusEntity(injected.getStatementText(), queued.getCommandId(), queued.tryWaitForFinalStatus(this.distributedCmdResponseTimeout), Long.valueOf(queued.getCommandSequenceNumber())));
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // These must be caught before the generic handler: the transaction is not aborted
            // for them, the producer is simply closed by the caller.
            abortQueuedCommand(commandId);
            throw writeFailure(configuredStatement, e);
        } catch (Exception e) {
            try {
                producer.abortTransaction();
            } catch (RuntimeException abortFailure) {
                // Keep the original failure as the cause and still clean up the queued command.
                e.addSuppressed(abortFailure);
            }
            abortQueuedCommand(commandId);
            throw writeFailure(configuredStatement, e);
        }
    }

    /** Aborts the queued command if a command id had already been assigned. */
    private void abortQueuedCommand(CommandId commandId) {
        if (commandId != null) {
            this.commandQueue.abortCommand(commandId);
        }
    }

    /** Builds the exception reported when the command could not be written. */
    private static KsqlServerException writeFailure(ConfiguredStatement<? extends Statement> configuredStatement, Exception cause) {
        return new KsqlServerException(String.format("Could not write the statement '%s' into the command topic.", configuredStatement.getStatementText()), cause);
    }

    /**
     * Runs the authorization validator, when one is configured, twice: first with the caller's
     * security context, then with a security context built from the execution context's service
     * context. A failure of the first check propagates unchanged; a failure of the second is
     * wrapped in a {@link KsqlServerException}.
     */
    private void checkAuthorization(ConfiguredStatement<?> configuredStatement, KsqlSecurityContext ksqlSecurityContext, KsqlExecutionContext ksqlExecutionContext) {
        Statement statement = configuredStatement.getStatement();
        MetaStore metaStore = ksqlExecutionContext.getMetaStore();
        this.authorizationValidator.ifPresent(ksqlAuthorizationValidator -> {
            ksqlAuthorizationValidator.checkAuthorization(ksqlSecurityContext, metaStore, statement);
        });
        try {
            this.authorizationValidator.ifPresent(ksqlAuthorizationValidator2 -> {
                ksqlAuthorizationValidator2.checkAuthorization(new KsqlSecurityContext(Optional.empty(), ksqlExecutionContext.getServiceContext()), metaStore, statement);
            });
        } catch (Exception e) {
            throw new KsqlServerException("The KSQL server is not permitted to execute the command", e);
        }
    }

    /**
     * Rejects an INSERT INTO whose target is not in the metastore or whose backing Kafka topic is
     * reported as read-only by the reserved internal topics lookup.
     *
     * @throws KsqlException if the target is unknown or its topic is read-only
     */
    private void throwIfInsertOnReadOnlyTopic(MetaStore metaStore, InsertInto insertInto) {
        DataSource source = metaStore.getSource(insertInto.getTarget());
        if (source == null) {
            throw new KsqlException("Cannot insert into an unknown stream/table: " + insertInto.getTarget());
        }
        if (this.internalTopics.isReadOnly(source.getKafkaTopicName())) {
            throw new KsqlException("Cannot insert into read-only topic: " + source.getKafkaTopicName());
        }
    }
}
