package io.confluent.ksql.query;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.execution.materialization.MaterializationInfo;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.plan.KStreamHolder;
import io.confluent.ksql.execution.plan.KTableHolder;
import io.confluent.ksql.execution.streams.KSPlanBuilder;
import io.confluent.ksql.execution.streams.materialization.KsqlMaterializationFactory;
import io.confluent.ksql.execution.streams.materialization.MaterializationProvider;
import io.confluent.ksql.execution.streams.materialization.ks.KsMaterializationFactory;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metrics.ConsumerCollector;
import io.confluent.ksql.metrics.ProducerCollector;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.query.KafkaStreamsBuilder;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.serde.GenericKeySerDe;
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

/* loaded from: input_file:io/confluent/ksql/query/QueryExecutor.class */
public final class QueryExecutor {
    private final KsqlConfig ksqlConfig;
    private final Map<String, Object> overrides;
    private final ProcessingLogContext processingLogContext;
    private final ServiceContext serviceContext;
    private final FunctionRegistry functionRegistry;
    private final KafkaStreamsBuilder kafkaStreamsBuilder;
    private final Consumer<QueryMetadata> queryCloseCallback;
    private final KsMaterializationFactory ksMaterializationFactory;
    private final KsqlMaterializationFactory ksqlMaterializationFactory;
    private final StreamsBuilder streamsBuilder;

    public QueryExecutor(KsqlConfig ksqlConfig, Map<String, Object> map, ProcessingLogContext processingLogContext, ServiceContext serviceContext, FunctionRegistry functionRegistry, Consumer<QueryMetadata> consumer) {
        this(ksqlConfig, map, processingLogContext, serviceContext, functionRegistry, consumer, new KafkaStreamsBuilderImpl(((ServiceContext) Objects.requireNonNull(serviceContext, "serviceContext")).getKafkaClientSupplier()), new StreamsBuilder(), new KsqlMaterializationFactory(processingLogContext), new KsMaterializationFactory());
    }

    QueryExecutor(KsqlConfig ksqlConfig, Map<String, Object> map, ProcessingLogContext processingLogContext, ServiceContext serviceContext, FunctionRegistry functionRegistry, Consumer<QueryMetadata> consumer, KafkaStreamsBuilder kafkaStreamsBuilder, StreamsBuilder streamsBuilder, KsqlMaterializationFactory ksqlMaterializationFactory, KsMaterializationFactory ksMaterializationFactory) {
        this.ksqlConfig = (KsqlConfig) Objects.requireNonNull(ksqlConfig, "ksqlConfig");
        this.overrides = (Map) Objects.requireNonNull(map, "overrides");
        this.processingLogContext = (ProcessingLogContext) Objects.requireNonNull(processingLogContext, "processingLogContext");
        this.serviceContext = (ServiceContext) Objects.requireNonNull(serviceContext, "serviceContext");
        this.functionRegistry = (FunctionRegistry) Objects.requireNonNull(functionRegistry, "functionRegistry");
        this.queryCloseCallback = (Consumer) Objects.requireNonNull(consumer, "queryCloseCallback");
        this.ksMaterializationFactory = (KsMaterializationFactory) Objects.requireNonNull(ksMaterializationFactory, "ksMaterializationFactory");
        this.ksqlMaterializationFactory = (KsqlMaterializationFactory) Objects.requireNonNull(ksqlMaterializationFactory, "ksqlMaterializationFactory");
        this.kafkaStreamsBuilder = (KafkaStreamsBuilder) Objects.requireNonNull(kafkaStreamsBuilder);
        this.streamsBuilder = (StreamsBuilder) Objects.requireNonNull(streamsBuilder, "builder");
    }

    public TransientQueryMetadata buildTransientQuery(String str, QueryId queryId, Set<SourceName> set, ExecutionStep<?> executionStep, String str2, LogicalSchema logicalSchema, OptionalInt optionalInt) {
        TransientQueryQueue buildTransientQueryQueue = buildTransientQueryQueue(queryId, executionStep, optionalInt);
        String addTimeSuffix = addTimeSuffix(getQueryApplicationId(getServiceId(), this.ksqlConfig.getString("ksql.transient.prefix"), queryId));
        Map<String, Object> buildStreamsProperties = buildStreamsProperties(addTimeSuffix, queryId);
        KafkaStreamsBuilder.BuildResult buildKafkaStreams = this.kafkaStreamsBuilder.buildKafkaStreams(this.streamsBuilder, buildStreamsProperties);
        return new TransientQueryMetadata(str, buildKafkaStreams.kafkaStreams, buildTransientQuerySchema(logicalSchema), set, str2, buildTransientQueryQueue, addTimeSuffix, buildKafkaStreams.topology, buildStreamsProperties, this.overrides, this.queryCloseCallback, this.ksqlConfig.getLong("ksql.streams.shutdown.timeout.ms").longValue());
    }

    public QueryMetadata buildTransientQuery(String str, QueryId queryId, Set<SourceName> set, ExecutionStep<?> executionStep, String str2, LogicalSchema logicalSchema, Consumer<GenericRow> consumer) {
        KStream stream;
        Object build = executionStep.build(new KSPlanBuilder(queryBuilder(queryId)));
        if (build instanceof KStreamHolder) {
            stream = ((KStreamHolder) build).getStream();
        } else {
            if (!(build instanceof KTableHolder)) {
                throw new IllegalStateException("Unexpected type built from exection plan");
            }
            stream = ((KTableHolder) build).getTable().toStream();
        }
        stream.foreach((obj, genericRow) -> {
            if (genericRow == null) {
                return;
            }
            consumer.accept(genericRow);
        });
        String addTimeSuffix = addTimeSuffix(getQueryApplicationId(getServiceId(), this.ksqlConfig.getString("ksql.transient.prefix"), queryId));
        Map<String, Object> buildStreamsProperties = buildStreamsProperties(addTimeSuffix, queryId);
        KafkaStreamsBuilder.BuildResult buildKafkaStreams = this.kafkaStreamsBuilder.buildKafkaStreams(this.streamsBuilder, buildStreamsProperties);
        return new QueryMetadata(str, buildKafkaStreams.kafkaStreams, buildTransientQuerySchema(logicalSchema), set, str2, addTimeSuffix, buildKafkaStreams.topology, buildStreamsProperties, this.overrides, this.queryCloseCallback, this.ksqlConfig.getLong("ksql.streams.shutdown.timeout.ms").longValue()) { // from class: io.confluent.ksql.query.QueryExecutor.1
            @Override // io.confluent.ksql.util.QueryMetadata
            public void stop() {
                close();
            }
        };
    }

    private static Optional<MaterializationInfo> getMaterializationInfo(Object obj) {
        return obj instanceof KTableHolder ? ((KTableHolder) obj).getMaterializationBuilder().map((v0) -> {
            return v0.build();
        }) : Optional.empty();
    }

    public PersistentQueryMetadata buildQuery(String str, QueryId queryId, DataSource dataSource, Set<SourceName> set, ExecutionStep<?> executionStep, String str2) {
        KsqlQueryBuilder queryBuilder = queryBuilder(queryId);
        Object build = executionStep.build(new KSPlanBuilder(queryBuilder));
        String queryApplicationId = getQueryApplicationId(getServiceId(), this.ksqlConfig.getString("ksql.persistent.prefix"), queryId);
        Map<String, Object> buildStreamsProperties = buildStreamsProperties(queryApplicationId, queryId);
        KafkaStreamsBuilder.BuildResult buildKafkaStreams = this.kafkaStreamsBuilder.buildKafkaStreams(this.streamsBuilder, buildStreamsProperties);
        PhysicalSchema from = PhysicalSchema.from(dataSource.getSchema(), dataSource.getSerdeOptions());
        return new PersistentQueryMetadata(str, buildKafkaStreams.kafkaStreams, from, set, dataSource.getName(), str2, queryId, dataSource.getDataSourceType(), getMaterializationInfo(build).flatMap(materializationInfo -> {
            return buildMaterializationProvider(materializationInfo, buildKafkaStreams.kafkaStreams, from, dataSource.getKsqlTopic().getKeyFormat(), buildStreamsProperties, queryApplicationId);
        }), queryApplicationId, dataSource.getKsqlTopic(), buildKafkaStreams.topology, queryBuilder.getSchemas(), buildStreamsProperties, this.overrides, this.queryCloseCallback, this.ksqlConfig.getLong("ksql.streams.shutdown.timeout.ms").longValue());
    }

    private TransientQueryQueue buildTransientQueryQueue(QueryId queryId, ExecutionStep<?> executionStep, OptionalInt optionalInt) {
        KStream stream;
        Object build = executionStep.build(new KSPlanBuilder(queryBuilder(queryId)));
        if (build instanceof KStreamHolder) {
            stream = ((KStreamHolder) build).getStream();
        } else {
            if (!(build instanceof KTableHolder)) {
                throw new IllegalStateException("Unexpected type built from exection plan");
            }
            stream = ((KTableHolder) build).getTable().toStream();
        }
        return new TransientQueryQueue(stream, optionalInt);
    }

    private KsqlQueryBuilder queryBuilder(QueryId queryId) {
        return KsqlQueryBuilder.of(this.streamsBuilder, this.ksqlConfig, this.serviceContext, this.processingLogContext, this.functionRegistry, queryId);
    }

    private String getServiceId() {
        return "_confluent-ksql-" + this.ksqlConfig.getString("ksql.service.id");
    }

    private Map<String, Object> buildStreamsProperties(String str, QueryId queryId) {
        HashMap hashMap = new HashMap(this.ksqlConfig.getKsqlStreamConfigProps());
        hashMap.put("application.id", str);
        hashMap.put("ksql.logger.production.error", this.processingLogContext.getLoggerFactory().getLogger(queryId.toString()));
        updateListProperty(hashMap, StreamsConfig.consumerPrefix("interceptor.classes"), ConsumerCollector.class.getCanonicalName());
        updateListProperty(hashMap, StreamsConfig.producerPrefix("interceptor.classes"), ProducerCollector.class.getCanonicalName());
        return hashMap;
    }

    private static String getQueryApplicationId(String str, String str2, QueryId queryId) {
        return str + str2 + queryId;
    }

    private static void updateListProperty(Map<String, Object> map, String str, Object obj) {
        LinkedList linkedList;
        Object orDefault = map.getOrDefault(str, new LinkedList());
        if (orDefault instanceof String) {
            linkedList = new LinkedList(Arrays.asList(((String) orDefault).split("\\s*,\\s*")));
        } else {
            if (!(orDefault instanceof List)) {
                throw new KsqlException("Expecting list or string for property: " + str);
            }
            linkedList = new LinkedList((List) orDefault);
        }
        linkedList.add(obj);
        map.put(str, linkedList);
    }

    private static String addTimeSuffix(String str) {
        return String.format("%s_%d", str, Long.valueOf(System.currentTimeMillis()));
    }

    private Optional<MaterializationProvider> buildMaterializationProvider(MaterializationInfo materializationInfo, KafkaStreams kafkaStreams, PhysicalSchema physicalSchema, KeyFormat keyFormat, Map<String, Object> map, String str) {
        return this.ksMaterializationFactory.create(materializationInfo.stateStoreName(), kafkaStreams, materializationInfo.getStateStoreSchema(), new GenericKeySerDe().create(keyFormat.getFormatInfo(), physicalSchema.keySchema(), this.ksqlConfig, this.serviceContext.getSchemaRegistryClientFactory(), "", NoopProcessingLogContext.INSTANCE).serializer(), keyFormat.getWindowInfo(), map, this.ksqlConfig, str).map(ksMaterialization -> {
            return (queryId, stacker) -> {
                return this.ksqlMaterializationFactory.create(ksMaterialization, materializationInfo, queryId, stacker);
            };
        });
    }

    private static LogicalSchema buildTransientQuerySchema(LogicalSchema logicalSchema) {
        LogicalSchema.Builder noImplicitColumns = LogicalSchema.builder().noImplicitColumns();
        noImplicitColumns.valueColumns(logicalSchema.value());
        return noImplicitColumns.build();
    }
}
