package io.confluent.ksql.api.plugin;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.api.impl.Utils;
import io.confluent.ksql.api.server.BaseServerEndpoints;
import io.confluent.ksql.api.server.PushQueryHandler;
import io.confluent.ksql.api.spi.InsertsSubscriber;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.FormatOptions;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.registry.KsqlSchemaRegistryClientFactory;
import io.confluent.ksql.security.KsqlSecurityExtension;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.QueryMetadata;
import io.vertx.core.Context;
import io.vertx.core.WorkerExecutor;
import io.vertx.core.json.JsonObject;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.reactivestreams.Subscriber;

/* loaded from: input_file:io/confluent/ksql/api/plugin/KsqlServerEndpoints.class */
public class KsqlServerEndpoints extends BaseServerEndpoints {
    private final KsqlEngine ksqlEngine;
    private final KsqlConfig ksqlConfig;
    private final KsqlSecurityExtension securityExtension;
    private final ServiceContextFactory theServiceContextFactory;

    /* loaded from: input_file:io/confluent/ksql/api/plugin/KsqlServerEndpoints$DummyPrincipal.class */
    /**
     * Placeholder {@link Principal} whose name is always {@code "NO_PRINCIPAL"}.
     *
     * <p>{@code createQuery} passes an instance of this class when it builds the
     * service context for a query.
     */
    private static class DummyPrincipal implements Principal {
        /** The fixed name reported by every instance. */
        private static final String NAME = "NO_PRINCIPAL";

        private DummyPrincipal() {
        }

        /**
         * Returns the fixed placeholder name.
         *
         * @return the string {@code "NO_PRINCIPAL"}
         */
        @Override
        public String getName() {
            return NAME;
        }
    }

    /* loaded from: input_file:io/confluent/ksql/api/plugin/KsqlServerEndpoints$KsqlQueryHandle.class */
    /**
     * {@link PushQueryHandler} that delegates to a {@link QueryMetadata} instance and
     * carries the optional row limit of the query.
     *
     * <p>Column names and types are derived from the value columns of the query's
     * logical schema each time they are requested.
     */
    private static class KsqlQueryHandle implements PushQueryHandler {
        private final QueryMetadata queryMetadata;
        private final OptionalInt limit;

        /**
         * Creates a handle for the given query.
         *
         * @param queryMetadata the query to start, stop and read the schema from
         * @param optionalInt the row limit of the query, or empty if there is none
         * @throws NullPointerException if either argument is {@code null}
         */
        KsqlQueryHandle(QueryMetadata queryMetadata, OptionalInt optionalInt) {
            // Fail fast here rather than with an anonymous NPE on first use.
            this.queryMetadata = Objects.requireNonNull(queryMetadata, "queryMetadata");
            this.limit = Objects.requireNonNull(optionalInt, "limit");
        }

        /**
         * Returns the names of the value columns of the query's logical schema,
         * in schema order.
         */
        @Override
        public List<String> getColumnNames() {
            return colNamesFromSchema(this.queryMetadata.getLogicalSchema());
        }

        /**
         * Returns the types of the value columns of the query's logical schema,
         * in schema order, rendered with {@link FormatOptions#none()}.
         */
        @Override
        public List<String> getColumnTypes() {
            return colTypesFromSchema(this.queryMetadata.getLogicalSchema());
        }

        /** Returns the row limit supplied at construction time. */
        @Override
        public OptionalInt getLimit() {
            return this.limit;
        }

        /** Starts the underlying query. */
        @Override
        public void start() {
            this.queryMetadata.start();
        }

        /** Stops the underlying query by closing it. */
        @Override
        public void stop() {
            this.queryMetadata.close();
        }

        /**
         * Renders the type of every value column of the schema as a string.
         *
         * @param logicalSchema the schema whose value columns are read
         * @return a new mutable list with one entry per value column
         */
        private static List<String> colTypesFromSchema(LogicalSchema logicalSchema) {
            final List<Column> columns = logicalSchema.value();
            final List<String> types = new ArrayList<>(columns.size());
            for (final Column column : columns) {
                types.add(column.type().toString(FormatOptions.none()));
            }
            return types;
        }

        /**
         * Collects the name of every value column of the schema.
         *
         * @param logicalSchema the schema whose value columns are read
         * @return a new mutable list with one entry per value column
         */
        private static List<String> colNamesFromSchema(LogicalSchema logicalSchema) {
            final List<Column> columns = logicalSchema.value();
            final List<String> names = new ArrayList<>(columns.size());
            for (final Column column : columns) {
                names.add(column.name().name());
            }
            return names;
        }
    }

    /* loaded from: input_file:io/confluent/ksql/api/plugin/KsqlServerEndpoints$ServiceContextFactory.class */
    /**
     * Factory used by {@code KsqlServerEndpoints} to build the {@link ServiceContext}
     * a query runs with.
     *
     * <p>Declares a single abstract method, so it can be supplied as a lambda or
     * method reference.
     */
    @FunctionalInterface
    public interface ServiceContextFactory {
        /**
         * Creates a service context.
         *
         * @param ksqlConfig the server configuration
         * @param optional an optional string; this class always passes
         *     {@link Optional#empty()} (NOTE(review): presumably an authorization
         *     header or similar credential — confirm against implementations)
         * @param kafkaClientSupplier supplier of the Kafka clients to use
         * @param supplier supplier of the schema registry client to use
         * @return the service context
         */
        ServiceContext create(KsqlConfig ksqlConfig, Optional<String> optional, KafkaClientSupplier kafkaClientSupplier, Supplier<SchemaRegistryClient> supplier);
    }

    /**
     * Creates the endpoints.
     *
     * @param ksqlEngine the engine used to parse, prepare and execute queries
     * @param ksqlConfig the server configuration
     * @param ksqlSecurityExtension the security extension consulted for a user context provider
     * @param serviceContextFactory the factory used to build service contexts
     * @throws NullPointerException if any argument is {@code null}; the message names the argument
     */
    public KsqlServerEndpoints(KsqlEngine ksqlEngine, KsqlConfig ksqlConfig, KsqlSecurityExtension ksqlSecurityExtension, ServiceContextFactory serviceContextFactory) {
        this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine");
        this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig");
        this.securityExtension = Objects.requireNonNull(ksqlSecurityExtension, "ksqlSecurityExtension");
        this.theServiceContextFactory = Objects.requireNonNull(serviceContextFactory, "serviceContextFactory");
    }

    /**
     * Builds and executes a push query and wraps it in a handle.
     *
     * <p>Calls {@code Utils.checkIsWorker()} first (NOTE(review): presumably rejects
     * calls made off a worker thread — confirm against {@code Utils}). The
     * {@code context} and {@code workerExecutor} arguments are not used here.
     *
     * @param str the statement text; must contain exactly one query
     * @param jsonObject the request properties, passed to the statement as a map
     * @param context unused
     * @param workerExecutor unused
     * @param consumer receives the rows produced by the query
     * @return a handle over the executed query and its optional limit
     */
    @Override
    protected PushQueryHandler createQuery(String str, JsonObject jsonObject, Context context, WorkerExecutor workerExecutor, Consumer<GenericRow> consumer) {
        Utils.checkIsWorker();

        final ServiceContext serviceContext = createServiceContext(new DummyPrincipal());
        final ConfiguredStatement<Query> statement = createStatement(str, jsonObject.getMap());

        // Execute before reading the limit, matching the original evaluation order.
        final QueryMetadata queryMetadata = this.ksqlEngine.executeQuery(serviceContext, statement, consumer);
        final OptionalInt limit = statement.getStatement().getLimit();
        return new KsqlQueryHandle(queryMetadata, limit);
    }

    /**
     * Parses and prepares the given text and returns it as a configured query statement.
     *
     * @param str the statement text
     * @param map the property overrides to attach to the statement
     * @return the configured statement
     * @throws KsqlStatementException if the text does not contain exactly one statement,
     *     or if that statement is not a {@link Query}
     */
    private ConfiguredStatement<Query> createStatement(String str, Map<String, Object> map) {
        final List<KsqlParser.ParsedStatement> statements = this.ksqlEngine.parse(str);
        if (statements.size() != 1) {
            throw new KsqlStatementException(
                    String.format("Expected exactly one KSQL statement; found %d instead", statements.size()), str);
        }

        final KsqlParser.PreparedStatement<?> prepared = this.ksqlEngine.prepare(statements.get(0));
        if (!(prepared.getStatement() instanceof Query)) {
            throw new KsqlStatementException("Not a query", str);
        }

        // Safe: the instanceof check above established that the statement is a Query.
        @SuppressWarnings("unchecked")
        final KsqlParser.PreparedStatement<Query> preparedQuery = (KsqlParser.PreparedStatement<Query>) prepared;
        return ConfiguredStatement.of(preparedQuery, map, this.ksqlConfig);
    }

    /**
     * Creates the service context for the given principal.
     *
     * <p>When the security extension supplies a user context provider, the Kafka
     * client supplier and schema registry client factory are obtained from it for
     * {@code principal}. Otherwise a {@link DefaultKafkaClientSupplier} and a
     * {@link KsqlSchemaRegistryClientFactory} built from the server configuration
     * are used.
     *
     * @param principal the principal handed to the user context provider, if any
     * @return the service context
     */
    private ServiceContext createServiceContext(Principal principal) {
        // Query the extension once and branch on the result, rather than isPresent() + get().
        return this.securityExtension.getUserContextProvider()
                .map(provider -> createServiceContext(
                        provider.getKafkaClientSupplier(principal),
                        provider.getSchemaRegistryClientFactory(principal)))
                .orElseGet(() -> {
                    final KsqlSchemaRegistryClientFactory schemaRegistryClientFactory =
                            new KsqlSchemaRegistryClientFactory(this.ksqlConfig, Collections.emptyMap());
                    return createServiceContext(new DefaultKafkaClientSupplier(), schemaRegistryClientFactory::get);
                });
    }

    /**
     * Delegates to the configured {@link ServiceContextFactory}, passing the server
     * configuration and an empty optional alongside the given suppliers.
     *
     * @param kafkaClientSupplier supplier of the Kafka clients to use
     * @param supplier supplier of the schema registry client to use
     * @return the service context produced by the factory
     */
    private ServiceContext createServiceContext(KafkaClientSupplier kafkaClientSupplier, Supplier<SchemaRegistryClient> supplier) {
        final Optional<String> noValue = Optional.empty();
        final ServiceContext serviceContext =
                this.theServiceContextFactory.create(this.ksqlConfig, noValue, kafkaClientSupplier, supplier);
        return serviceContext;
    }

    /**
     * Always returns {@code null}; none of the arguments are used.
     *
     * <p>NOTE(review): this looks like an unimplemented placeholder — confirm that
     * callers tolerate a {@code null} subscriber before relying on this endpoint.
     *
     * @param str ignored
     * @param jsonObject ignored
     * @param subscriber ignored
     * @return {@code null}
     */
    @Override // io.confluent.ksql.api.server.BaseServerEndpoints, io.confluent.ksql.api.spi.Endpoints
    public InsertsSubscriber createInsertsSubscriber(String str, JsonObject jsonObject, Subscriber<JsonObject> subscriber) {
        return null;
    }
}
