package io.confluent.ksql.rest.server.computation;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.config.ConfigItem;
import io.confluent.ksql.config.KsqlConfigResolver;
import io.confluent.ksql.engine.KsqlPlan;
import io.confluent.ksql.execution.json.PlanJsonMapper;
import io.confluent.ksql.parser.tree.AlterSystemProperty;
import io.confluent.ksql.parser.tree.PauseQuery;
import io.confluent.ksql.parser.tree.ResumeQuery;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.TerminateQuery;
import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.entity.PropertiesList;
import io.confluent.ksql.rest.util.TerminateCluster;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.PersistentQueryMetadataImpl;
import java.util.Collection;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/confluent/ksql/rest/server/computation/ValidatedCommandFactory.class */
/**
 * Builds {@link Command}s from configured statements.
 *
 * <p>Except for the terminate-cluster statement, each statement is first applied to the supplied
 * {@link KsqlExecutionContext} (queries are paused/resumed/closed, properties altered, plans
 * executed), so a statement that the context rejects fails here before any command is returned.
 * The resulting command is additionally round-tripped through JSON to make sure it can be read
 * back.
 *
 * <p>NOTE(review): because the context is mutated, callers presumably pass a sandboxed context
 * rather than the live engine — confirm against call sites.
 */
public final class ValidatedCommandFactory {
    private static final Logger LOG = LogManager.getLogger(ValidatedCommandFactory.class);

    /** Config key of the shared-runtime feature flag. */
    private static final String SHARED_RUNTIME_ENABLED = "ksql.runtime.feature.shared.enabled";

    /** Property that may not be altered while persistent queries exist. */
    private static final String PROCESSING_GUARANTEE = "processing.guarantee";

    /** Lower-case marker identifying a transient query id. */
    private static final String TRANSIENT_QUERY_ID_MARKER = "transient_";

    /**
     * Creates a validated command using the service context owned by the execution context.
     *
     * @param configuredStatement the statement to turn into a command
     * @param ksqlExecutionContext the context the statement is validated against
     * @return the validated command
     */
    public Command create(ConfiguredStatement<? extends Statement> configuredStatement, KsqlExecutionContext ksqlExecutionContext) {
        return create(configuredStatement, ksqlExecutionContext.getServiceContext(), ksqlExecutionContext);
    }

    /**
     * Creates a validated command.
     *
     * @param configuredStatement the statement to turn into a command
     * @param serviceContext the service context used when planning and executing the statement
     * @param ksqlExecutionContext the context the statement is validated against
     * @return the validated command
     * @throws KsqlServerException if the command cannot be serialized and deserialized again
     */
    public Command create(ConfiguredStatement<? extends Statement> configuredStatement, ServiceContext serviceContext, KsqlExecutionContext ksqlExecutionContext) {
        return ensureDeserializable(createCommand(configuredStatement, serviceContext, ksqlExecutionContext));
    }

    /**
     * Serializes the command to JSON and reads it back, returning the original instance if both
     * steps succeed.
     *
     * @throws KsqlServerException if serialization or deserialization fails; the Jackson
     *     exception is attached as the cause
     */
    private static Command ensureDeserializable(Command command) {
        try {
            String json = PlanJsonMapper.INSTANCE.get().writeValueAsString(command);
            PlanJsonMapper.INSTANCE.get().readValue(json, Command.class);
            return command;
        } catch (JsonProcessingException e) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new KsqlServerException(
                    "Did not write the command to the command topic as it could not be deserialized. "
                            + "This is a bug! Please raise a Github issue containing the series of "
                            + "commands you ran to get to this point."
                            + System.lineSeparator()
                            + e.getMessage(),
                    e);
        }
    }

    /**
     * Dispatches to the creator matching the statement. The terminate-cluster check is made on
     * the raw statement text and takes precedence over the statement type.
     */
    private static Command createCommand(ConfiguredStatement<? extends Statement> configuredStatement, ServiceContext serviceContext, KsqlExecutionContext ksqlExecutionContext) {
        if (configuredStatement.getUnMaskedStatementText().equals(TerminateCluster.TERMINATE_CLUSTER_STATEMENT_TEXT)) {
            return createForTerminateCluster(configuredStatement);
        }

        Statement statement = configuredStatement.getStatement();
        if (statement instanceof PauseQuery) {
            return createForPauseQuery(configuredStatement, ksqlExecutionContext);
        }
        if (statement instanceof ResumeQuery) {
            return createForResumeQuery(configuredStatement, ksqlExecutionContext);
        }
        if (statement instanceof TerminateQuery) {
            return createForTerminateQuery(configuredStatement, ksqlExecutionContext);
        }
        if (statement instanceof AlterSystemProperty) {
            return createForAlterSystemQuery(configuredStatement, ksqlExecutionContext);
        }

        // Everything else is planned and executed with the context's current config.
        return createForPlannedQuery(
                configuredStatement.withConfig(ksqlExecutionContext.getKsqlConfig()),
                serviceContext,
                ksqlExecutionContext);
    }

    /**
     * Validates an ALTER SYSTEM statement and wraps it in a command.
     *
     * @throws KsqlServerException if the shared-runtime feature flag is off
     * @throws ConfigException if the property is not editable, or if it resolves to
     *     {@code processing.guarantee} while persistent queries exist
     */
    private static Command createForAlterSystemQuery(ConfiguredStatement<? extends Statement> configuredStatement, KsqlExecutionContext ksqlExecutionContext) {
        AlterSystemProperty alterSystemProperty = (AlterSystemProperty) configuredStatement.getStatement();
        String propertyName = alterSystemProperty.getPropertyName();
        String propertyValue = alterSystemProperty.getPropertyValue();

        if (!ksqlExecutionContext.getKsqlConfig().getBoolean(SHARED_RUNTIME_ENABLED)) {
            throw new KsqlServerException("Cannot alter system configs when KSQL_SHARED_RUNTIME_ENABLED is turned off.");
        }

        // Applied before the editability check; the order is kept as-is because it decides
        // which exception surfaces when both checks would fail.
        ksqlExecutionContext.alterSystemProperty(propertyName, propertyValue);

        if (!PropertiesList.Property.isEditable(propertyName)) {
            // NOTE(review): this selects ConfigException(String name, Object value), which in
            // Kafka's implementation appears to prefix the text with "Invalid value null for
            // configuration " — confirm whether the single-argument constructor was intended.
            throw new ConfigException(
                    String.format(
                            "Failed to set %s to %s. Caused by: Not recognizable as ksql, streams, consumer, or producer property: %s %n",
                            propertyName, propertyValue, propertyName),
                    (Object) null);
        }

        Optional<ConfigItem> resolved = new KsqlConfigResolver().resolve(propertyName, false);
        boolean changesProcessingGuarantee = resolved.isPresent()
                && Objects.equals(resolved.get().getPropertyName(), PROCESSING_GUARANTEE);
        if (!changesProcessingGuarantee || ksqlExecutionContext.getPersistentQueries().isEmpty()) {
            return Command.of(configuredStatement);
        }

        Collection<QueryId> runningQueryIds = ksqlExecutionContext.getPersistentQueries().stream()
                .map(PersistentQueryMetadata::getQueryId)
                .collect(Collectors.toList());
        LOG.error("Failed to set {} to {} due to the {} persistent queries currently running: {}",
                propertyName, propertyValue, runningQueryIds.size(), runningQueryIds);
        throw new ConfigException(String.format(
                "Unable to set %s to %s, as the %s may not be changed for running persistent queries "
                        + "which have already processed data under a different %s. To modify %s you "
                        + "must first terminate all running persistent queries.",
                propertyName, propertyValue, propertyName, propertyName, propertyName));
    }

    /** Validates a PAUSE statement by pausing the targeted queries in the context. */
    private static Command createForPauseQuery(ConfiguredStatement<? extends Statement> configuredStatement, KsqlExecutionContext ksqlExecutionContext) {
        PauseQuery pauseQuery = (PauseQuery) configuredStatement.getStatement();
        return createForQueryControl(
                configuredStatement, ksqlExecutionContext, pauseQuery.getQueryId(), "pause", PersistentQueryMetadata::pause);
    }

    /** Validates a RESUME statement by resuming the targeted queries in the context. */
    private static Command createForResumeQuery(ConfiguredStatement<? extends Statement> configuredStatement, KsqlExecutionContext ksqlExecutionContext) {
        ResumeQuery resumeQuery = (ResumeQuery) configuredStatement.getStatement();
        return createForQueryControl(
                configuredStatement, ksqlExecutionContext, resumeQuery.getQueryId(), "resume", PersistentQueryMetadata::resume);
    }

    /** Validates a TERMINATE statement by closing the targeted queries in the context. */
    private static Command createForTerminateQuery(ConfiguredStatement<? extends Statement> configuredStatement, KsqlExecutionContext ksqlExecutionContext) {
        TerminateQuery terminateQuery = (TerminateQuery) configuredStatement.getStatement();
        return createForQueryControl(
                configuredStatement, ksqlExecutionContext, terminateQuery.getQueryId(), "terminate", PersistentQueryMetadata::close);
    }

    /**
     * Shared implementation of pause/resume/terminate.
     *
     * <ul>
     *   <li>No query id: the action is applied to every persistent query in the context.</li>
     *   <li>Query id containing {@code transient_} (case-insensitive): the command is returned
     *       without looking the query up.</li>
     *   <li>Otherwise the persistent query is looked up and the action applied to it.</li>
     * </ul>
     *
     * @param queryId the optional target query id taken from the statement
     * @param verb the verb used in the error message ("pause", "resume" or "terminate")
     * @param action the operation to apply to each targeted query
     * @throws KsqlStatementException if the query id is unknown, or the query has type
     *     {@code CREATE_SOURCE}
     */
    private static Command createForQueryControl(
            ConfiguredStatement<? extends Statement> configuredStatement,
            KsqlExecutionContext ksqlExecutionContext,
            Optional<QueryId> queryId,
            String verb,
            Consumer<PersistentQueryMetadata> action) {
        if (!queryId.isPresent()) {
            ksqlExecutionContext.getPersistentQueries().forEach(action);
            return Command.of(configuredStatement);
        }

        QueryId id = queryId.get();
        // Locale.ROOT keeps the comparison independent of the JVM's default locale
        // (e.g. the Turkish dotless-i mapping would otherwise break the match).
        if (id.toString().toLowerCase(Locale.ROOT).contains(TRANSIENT_QUERY_ID_MARKER)) {
            return Command.of(configuredStatement);
        }

        PersistentQueryMetadata query = ksqlExecutionContext.getPersistentQuery(id)
                .orElseThrow(() -> new KsqlStatementException(
                        "Unknown queryId: " + id,
                        configuredStatement.getMaskedStatementText()));
        if (query.getPersistentQueryType() == KsqlConstants.PersistentQueryType.CREATE_SOURCE) {
            throw new KsqlStatementException(
                    String.format("Cannot %s query '%s' because it is linked to a source table.", verb, id),
                    configuredStatement.getMaskedStatementText());
        }

        action.accept(query);
        return Command.of(configuredStatement);
    }

    /**
     * Wraps a terminate-cluster statement in a command without touching any execution context.
     *
     * @param configuredStatement the terminate-cluster statement (raw type kept for
     *     compatibility with existing callers)
     * @return the command wrapping the statement
     */
    public static Command createForTerminateCluster(ConfiguredStatement configuredStatement) {
        return Command.of((ConfiguredStatement<?>) configuredStatement);
    }

    /**
     * Plans and executes the statement in the context, then wraps the plan in a command.
     *
     * <p>If execution produced a {@link PersistentQueryMetadataImpl} while the plan's config has
     * the shared-runtime flag enabled, the command is built with that flag overridden to
     * {@code false}. NOTE(review): presumably so the stored command reflects the runtime the
     * query was actually created with — confirm.
     */
    private static Command createForPlannedQuery(ConfiguredStatement<? extends Statement> configuredStatement, ServiceContext serviceContext, KsqlExecutionContext ksqlExecutionContext) {
        KsqlPlan plan = ksqlExecutionContext.plan(serviceContext, configuredStatement);
        ConfiguredKsqlPlan configuredPlan = ConfiguredKsqlPlan.of(plan, configuredStatement.getSessionConfig());
        KsqlExecutionContext.ExecuteResult result = ksqlExecutionContext.execute(serviceContext, configuredPlan);

        boolean createdDedicatedRuntimeQuery = result.getQuery().isPresent()
                && result.getQuery().get() instanceof PersistentQueryMetadataImpl;
        if (createdDedicatedRuntimeQuery
                && configuredPlan.getConfig().getConfig(false).getBoolean(SHARED_RUNTIME_ENABLED)) {
            configuredPlan = ConfiguredKsqlPlan.of(
                    plan,
                    configuredStatement.getSessionConfig().copyWith(ImmutableMap.of(SHARED_RUNTIME_ENABLED, false)));
        }
        return Command.of(configuredPlan);
    }
}
