package io.confluent.ksql.rest.server.execution;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.connect.supported.Connectors;
import io.confluent.ksql.parser.tree.CreateConnector;
import io.confluent.ksql.rest.EndpointResponse;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.SessionProperties;
import io.confluent.ksql.rest.entity.ConfigInfos;
import io.confluent.ksql.rest.entity.ConnectorInfo;
import io.confluent.ksql.rest.entity.ConnectorType;
import io.confluent.ksql.rest.entity.CreateConnectorEntity;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.entity.WarningEntity;
import io.confluent.ksql.rest.server.resources.KsqlRestException;
import io.confluent.ksql.services.ConnectClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;

/* loaded from: input_file:io/confluent/ksql/rest/server/execution/ConnectExecutor.class */
public final class ConnectExecutor {
    private static final ConnectorInfo DUMMY_CREATE_RESPONSE = new ConnectorInfo("dummy", ImmutableMap.of(), ImmutableList.of(), ConnectorType.UNKNOWN);

    private ConnectExecutor() {
    }

    public static StatementExecutorResponse execute(ConfiguredStatement<CreateConnector> configuredStatement, SessionProperties sessionProperties, KsqlExecutionContext ksqlExecutionContext, ServiceContext serviceContext) {
        CreateConnector statement = configuredStatement.getStatement();
        ConnectClient connectClient = serviceContext.getConnectClient();
        Optional<KsqlEntity> handleIfNotExists = handleIfNotExists(configuredStatement, statement, connectClient);
        if (handleIfNotExists.isPresent()) {
            return StatementExecutorResponse.handled(handleIfNotExists);
        }
        ConnectClient.ConnectResponse create = connectClient.create(statement.getName(), buildConnectorConfig(statement));
        if (create.datum().isPresent()) {
            return StatementExecutorResponse.handled(Optional.of(new CreateConnectorEntity(configuredStatement.getMaskedStatementText(), (ConnectorInfo) create.datum().get())));
        }
        if (!create.error().isPresent()) {
            throw new IllegalStateException("Either response.datum() or response.error() must be present");
        }
        throw new KsqlRestException(EndpointResponse.create().status(create.httpCode()).entity(new KsqlErrorMessage(Errors.toErrorCode(create.httpCode()), "Failed to create connector: " + ((String) create.error().get()))).build());
    }

    public static StatementExecutorResponse validate(ConfiguredStatement<CreateConnector> configuredStatement, SessionProperties sessionProperties, KsqlExecutionContext ksqlExecutionContext, ServiceContext serviceContext) {
        CreateConnector statement = configuredStatement.getStatement();
        ConnectClient connectClient = serviceContext.getConnectClient();
        if (checkForExistingConnector(configuredStatement, statement, connectClient)) {
            throw new KsqlRestException(EndpointResponse.create().status(409).entity(new KsqlErrorMessage(Errors.toErrorCode(409), String.format("Connector %s already exists", statement.getName()))).build());
        }
        List<String> validateConfigs = validateConfigs(statement, connectClient);
        if (validateConfigs.isEmpty()) {
            return StatementExecutorResponse.handled(Optional.of(new CreateConnectorEntity(configuredStatement.getMaskedStatementText(), DUMMY_CREATE_RESPONSE)));
        }
        throw new KsqlException("Validation error: " + String.join("\n", validateConfigs));
    }

    private static List<String> validateConfigs(CreateConnector createConnector, ConnectClient connectClient) {
        Map<String, String> buildConnectorConfig = buildConnectorConfig(createConnector);
        String str = buildConnectorConfig.get("connector.class");
        if (str == null) {
            return ImmutableList.of(String.format("Connector config %s contains no connector type", buildConnectorConfig));
        }
        if (str.trim().isEmpty()) {
            return ImmutableList.of("Connector type cannot be empty");
        }
        ConnectClient.ConnectResponse validate = connectClient.validate(str, buildConnectorConfig);
        return validate.error().isPresent() ? ImmutableList.of((String) validate.error().get()) : validate.datum().isPresent() ? (List) ((ConfigInfos) validate.datum().get()).values().stream().filter(configInfo -> {
            return !configInfo.configValue().errors().isEmpty();
        }).map(configInfo2 -> {
            return configInfo2.configValue().name() + " - " + String.join(". ", configInfo2.configValue().errors());
        }).collect(Collectors.toList()) : ImmutableList.of();
    }

    private static Map<String, String> buildConnectorConfig(CreateConnector createConnector) {
        Map<String, String> resolve = Connectors.resolve(Maps.transformValues(createConnector.getConfig(), literal -> {
            if (literal != null) {
                return literal.getValue().toString();
            }
            return null;
        }));
        resolve.put("name", createConnector.getName());
        return resolve;
    }

    private static boolean checkForExistingConnector(ConfiguredStatement<CreateConnector> configuredStatement, CreateConnector createConnector, ConnectClient connectClient) {
        if (createConnector.ifNotExists()) {
            return false;
        }
        ConnectClient.ConnectResponse connectors = connectClient.connectors();
        if (connectors.error().isPresent()) {
            throw new KsqlServerException("Failed to check for existing connector: " + ((String) connectors.error().get()));
        }
        return connectorExists(createConnector, connectors);
    }

    private static Optional<KsqlEntity> handleIfNotExists(ConfiguredStatement<CreateConnector> configuredStatement, CreateConnector createConnector, ConnectClient connectClient) {
        if (createConnector.ifNotExists()) {
            ConnectClient.ConnectResponse connectors = connectClient.connectors();
            if (connectors.error().isPresent()) {
                throw new KsqlServerException("Failed to check for existing connector: " + ((String) connectors.error().get()));
            }
            if (connectorExists(createConnector, connectors)) {
                return Optional.of(new WarningEntity(configuredStatement.getMaskedStatementText(), String.format("Connector %s already exists", createConnector.getName())));
            }
        }
        return Optional.empty();
    }

    private static boolean connectorExists(CreateConnector createConnector, ConnectClient.ConnectResponse<List<String>> connectResponse) {
        return ((List) connectResponse.datum().get()).stream().filter(str -> {
            return StringUtils.equalsIgnoreCase(createConnector.getName(), str);
        }).findAny().isPresent();
    }
}
