package io.confluent.ksql.query;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.context.QueryLoggerUtil;
import io.confluent.ksql.execution.materialization.MaterializationInfo;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.plan.KStreamHolder;
import io.confluent.ksql.execution.plan.KTableHolder;
import io.confluent.ksql.execution.runtime.RuntimeBuildContext;
import io.confluent.ksql.execution.streams.KSPlanBuilder;
import io.confluent.ksql.execution.streams.materialization.KsqlMaterializationFactory;
import io.confluent.ksql.execution.streams.materialization.ks.KsMaterializationFactory;
import io.confluent.ksql.execution.streams.metrics.RocksDBMetricsCollector;
import io.confluent.ksql.execution.util.KeyUtil;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metrics.ConsumerCollector;
import io.confluent.ksql.metrics.ProducerCollector;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.physical.scalablepush.ScalablePushRegistry;
import io.confluent.ksql.properties.PropertiesUtil;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.serde.WindowInfo;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.PersistentQueryMetadataImpl;
import io.confluent.ksql.util.PushQueryMetadata;
import io.confluent.ksql.util.QueryApplicationId;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/confluent/ksql/query/QueryExecutor.class */
/**
 * Builds {@link TransientQueryMetadata} and {@link PersistentQueryMetadata} instances from an
 * execution plan.
 *
 * <p>For each query this class derives the Kafka Streams application id and properties, builds the
 * plan through a {@link KSPlanBuilder}, and produces the {@link Topology} from the
 * {@link StreamsBuilder} supplied at construction time.
 *
 * <p>NOTE(review): every build method shares the one {@link StreamsBuilder} held by this instance,
 * so an instance is presumably meant to build a single query - confirm against callers.
 */
public final class QueryExecutor {
    private static final String KSQL_THREAD_EXCEPTION_UNCAUGHT_LOGGER = "ksql.logger.thread.exception.uncaught";
    private final SessionConfig config;
    private final ProcessingLogContext processingLogContext;
    private final ServiceContext serviceContext;
    private final FunctionRegistry functionRegistry;
    private final KafkaStreamsBuilder kafkaStreamsBuilder;
    private final StreamsBuilder streamsBuilder;
    private final MaterializationProviderBuilderFactory materializationProviderBuilderFactory;

    /**
     * Creates an executor with default collaborators: a {@link KafkaStreamsBuilderImpl} backed by
     * the service context's Kafka client supplier, a fresh {@link StreamsBuilder}, and a
     * {@link MaterializationProviderBuilderFactory}.
     *
     * @param sessionConfig the session configuration; must not be null
     * @param processingLogContext the processing log context; must not be null
     * @param serviceContext the service context; must not be null
     * @param functionRegistry the function registry; must not be null
     * @throws NullPointerException if any argument is null
     */
    public QueryExecutor(SessionConfig sessionConfig, ProcessingLogContext processingLogContext, ServiceContext serviceContext, FunctionRegistry functionRegistry) {
        this(
            sessionConfig,
            processingLogContext,
            serviceContext,
            functionRegistry,
            new KafkaStreamsBuilderImpl(Objects.requireNonNull(serviceContext, "serviceContext").getKafkaClientSupplier()),
            new StreamsBuilder(),
            // sessionConfig is dereferenced here, before the delegated constructor runs its own
            // null checks, so guard it explicitly to get a labelled NullPointerException.
            new MaterializationProviderBuilderFactory(
                Objects.requireNonNull(sessionConfig, "config").getConfig(true),
                serviceContext,
                new KsMaterializationFactory(),
                new KsqlMaterializationFactory(processingLogContext)));
    }

    /**
     * Creates an executor with explicitly supplied collaborators.
     *
     * @throws NullPointerException if any argument is null
     */
    @VisibleForTesting
    QueryExecutor(SessionConfig sessionConfig, ProcessingLogContext processingLogContext, ServiceContext serviceContext, FunctionRegistry functionRegistry, KafkaStreamsBuilder kafkaStreamsBuilder, StreamsBuilder streamsBuilder, MaterializationProviderBuilderFactory materializationProviderBuilderFactory) {
        this.config = Objects.requireNonNull(sessionConfig, "config");
        this.processingLogContext = Objects.requireNonNull(processingLogContext, "processingLogContext");
        this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
        this.functionRegistry = Objects.requireNonNull(functionRegistry, "functionRegistry");
        this.kafkaStreamsBuilder = Objects.requireNonNull(kafkaStreamsBuilder, "kafkaStreamsBuilder");
        this.streamsBuilder = Objects.requireNonNull(streamsBuilder, "streamsBuilder");
        this.materializationProviderBuilderFactory = Objects.requireNonNull(materializationProviderBuilderFactory, "materializationProviderBuilderFactory");
    }

    /**
     * Builds the metadata for a transient query whose rows are delivered through a
     * {@link TransientQueryQueue}.
     *
     * @param str forwarded unchanged as the first {@link TransientQueryMetadata} argument
     *     (presumably the statement text - TODO confirm)
     * @param queryId id of the query; also used to derive the application id and logger
     * @param set forwarded unchanged to the metadata (presumably the source names - TODO confirm)
     * @param executionStep root of the execution plan to build
     * @param str2 forwarded unchanged to the metadata (presumably the plan summary - TODO confirm)
     * @param logicalSchema forwarded unchanged to the metadata
     * @param optionalInt passed to the {@link TransientQueryQueue} constructor
     *     (presumably a row limit - TODO confirm)
     * @param optional when present and the plan built a table, the result type is
     *     {@code WINDOWED_TABLE} rather than {@code TABLE}
     * @param z when {@code true} and the plan built a table, rows with a {@code null} value are
     *     filtered out before reaching the queue
     * @param listener forwarded unchanged to the metadata
     * @return the metadata describing the built query
     * @throws IllegalStateException if the plan builds neither a stream nor a table holder
     */
    public TransientQueryMetadata buildTransientQuery(String str, QueryId queryId, Set<SourceName> set, ExecutionStep<?> executionStep, String str2, LogicalSchema logicalSchema, OptionalInt optionalInt, Optional<WindowInfo> optional, boolean z, QueryMetadata.Listener listener) {
        final KsqlConfig ksqlConfig = this.config.getConfig(true);
        final String applicationId = QueryApplicationId.build(ksqlConfig, false, queryId);
        final RuntimeBuildContext runtimeBuildContext = buildContext(applicationId, queryId);
        final Map<String, Object> streamsProperties = buildStreamsProperties(applicationId, queryId);
        final Object result = buildQueryImplementation(executionStep, runtimeBuildContext);

        // The queue's terminal nodes must be attached before the topology is built.
        final TransientQueryQueue queue = buildTransientQueryQueue(result, optionalInt, z);
        final Topology topology = this.streamsBuilder.build(PropertiesUtil.asProperties(streamsProperties));

        final PushQueryMetadata.ResultType resultType;
        if (!(result instanceof KTableHolder)) {
            resultType = PushQueryMetadata.ResultType.STREAM;
        } else if (optional.isPresent()) {
            resultType = PushQueryMetadata.ResultType.WINDOWED_TABLE;
        } else {
            resultType = PushQueryMetadata.ResultType.TABLE;
        }

        return new TransientQueryMetadata(
            str,
            logicalSchema,
            set,
            str2,
            queue,
            queryId,
            applicationId,
            topology,
            this.kafkaStreamsBuilder,
            streamsProperties,
            this.config.getOverrides(),
            ksqlConfig.getLong("ksql.streams.shutdown.timeout.ms"),
            ksqlConfig.getInt("ksql.query.error.max.queue.size"),
            resultType,
            ksqlConfig.getLong("ksql.query.retry.backoff.initial.ms"),
            ksqlConfig.getLong("ksql.query.retry.backoff.max.ms"),
            listener);
    }

    /**
     * Returns the materialization info of a built plan, if the plan produced a table holder that
     * carries a materialization builder.
     *
     * @param result the object returned by building the execution plan
     * @return the built materialization info, or empty if {@code result} is not a table holder or
     *     has no materialization builder
     */
    private static Optional<MaterializationInfo> getMaterializationInfo(Object result) {
        if (!(result instanceof KTableHolder)) {
            return Optional.empty();
        }
        // The wildcard cast (rather than a raw cast) keeps the Optional's element type, so the
        // lambda parameter is the materialization builder and not Object.
        return ((KTableHolder<?>) result).getMaterializationBuilder().map(builder -> builder.build());
    }

    /**
     * Attaches a {@link ScalablePushRegistry} as a processor on the query's output stream when
     * scalable push queries are enabled via {@code ksql.query.push.scalable.enabled}.
     *
     * @param schema schema handed to the registry factory
     * @param result the built plan: a table holder, otherwise it is cast to a stream holder
     * @param allPersistentQueries supplier handed to the registry factory
     * @param windowed windowed flag handed to the registry factory
     * @param streamsProperties streams properties handed to the registry factory
     * @param ksqlConfig config consulted for the enabling flag
     * @return the registry that was attached, or empty if the feature is disabled or the factory
     *     produced none
     * @throws ClassCastException if {@code result} is neither a table nor a stream holder
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static Optional<ScalablePushRegistry> applyScalablePushProcessor(LogicalSchema schema, Object result, Supplier<List<PersistentQueryMetadata>> allPersistentQueries, boolean windowed, Map<String, Object> streamsProperties, KsqlConfig ksqlConfig) {
        if (!ksqlConfig.getBoolean("ksql.query.push.scalable.enabled")) {
            return Optional.empty();
        }
        final boolean isTable = result instanceof KTableHolder;
        // Raw types are deliberate here: the row type is not imported in this file, and the cast
        // to ProcessorSupplier below selects the processor.api overload of process().
        final KStream stream = isTable
            ? ((KTableHolder) result).getTable().toStream()
            : ((KStreamHolder) result).getStream();
        final Optional<ScalablePushRegistry> registry = ScalablePushRegistry.create(schema, allPersistentQueries, isTable, windowed, streamsProperties);
        registry.ifPresent(pushRegistry -> stream.process((ProcessorSupplier) pushRegistry));
        return registry;
    }

    /**
     * Builds the metadata for a persistent query that writes to {@code dataSource}.
     *
     * @param persistentQueryType forwarded unchanged to the metadata
     * @param str forwarded unchanged to the metadata (presumably the statement text - TODO confirm)
     * @param queryId id of the query; also used to derive the application id and loggers
     * @param dataSource the sink; its schema and key/value format features define the physical
     *     schema of the query
     * @param set forwarded unchanged to the metadata (presumably the source names - TODO confirm)
     * @param executionStep root of the execution plan to build
     * @param str2 forwarded unchanged to the metadata (presumably the plan summary - TODO confirm)
     * @param listener forwarded unchanged to the metadata
     * @param supplier handed to the scalable push registry factory
     * @return the metadata describing the built query
     */
    public PersistentQueryMetadata buildPersistentQuery(KsqlConstants.PersistentQueryType persistentQueryType, String str, QueryId queryId, DataSource dataSource, Set<SourceName> set, ExecutionStep<?> executionStep, String str2, QueryMetadata.Listener listener, Supplier<List<PersistentQueryMetadata>> supplier) {
        final KsqlConfig ksqlConfig = this.config.getConfig(true);
        final String applicationId = QueryApplicationId.build(ksqlConfig, true, queryId);
        final Map<String, Object> streamsProperties = buildStreamsProperties(applicationId, queryId);
        final PhysicalSchema querySchema = PhysicalSchema.from(
            dataSource.getSchema(),
            dataSource.getKsqlTopic().getKeyFormat().getFeatures(),
            dataSource.getKsqlTopic().getValueFormat().getFeatures());
        final RuntimeBuildContext runtimeBuildContext = buildContext(applicationId, queryId);
        final Object result = buildQueryImplementation(executionStep, runtimeBuildContext);

        // The push processor must be attached before the topology is built.
        final Optional<ScalablePushRegistry> scalablePushRegistry = applyScalablePushProcessor(
            querySchema.logicalSchema(),
            result,
            supplier,
            dataSource.getKsqlTopic().getKeyFormat().isWindowed(),
            streamsProperties,
            ksqlConfig);
        final Topology topology = this.streamsBuilder.build(PropertiesUtil.asProperties(streamsProperties));

        // Built-in classifiers always apply; regex classifiers from config are chained after them.
        final QueryErrorClassifier defaultClassifier = new MissingTopicClassifier(applicationId).and(new AuthorizationClassifier(applicationId));
        final QueryErrorClassifier classifier = buildConfiguredClassifiers(ksqlConfig, applicationId)
            .map(defaultClassifier::and)
            .orElse(defaultClassifier);

        return new PersistentQueryMetadataImpl(
            persistentQueryType,
            str,
            querySchema,
            set,
            dataSource,
            str2,
            queryId,
            // Passed inline so the Optional's element type is inferred from the constructor
            // parameter instead of being spelled out here.
            getMaterializationInfo(result).map(info ->
                this.materializationProviderBuilderFactory.materializationProviderBuilder(
                    info,
                    querySchema,
                    dataSource.getKsqlTopic().getKeyFormat(),
                    streamsProperties,
                    applicationId)),
            applicationId,
            topology,
            this.kafkaStreamsBuilder,
            runtimeBuildContext.getSchemas(),
            streamsProperties,
            this.config.getOverrides(),
            ksqlConfig.getLong("ksql.streams.shutdown.timeout.ms"),
            classifier,
            executionStep,
            ksqlConfig.getInt("ksql.query.error.max.queue.size"),
            getUncaughtExceptionProcessingLogger(queryId),
            ksqlConfig.getLong("ksql.query.retry.backoff.initial.ms"),
            ksqlConfig.getLong("ksql.query.retry.backoff.max.ms"),
            listener,
            scalablePushRegistry);
    }

    /**
     * Returns the processing logger registered under the query's logger name with the
     * {@code ksql.logger.thread.exception.uncaught} context pushed onto it.
     */
    private ProcessingLogger getUncaughtExceptionProcessingLogger(QueryId queryId) {
        final QueryContext.Stacker stacker = new QueryContext.Stacker().push(new String[]{KSQL_THREAD_EXCEPTION_UNCAUGHT_LOGGER});
        final String loggerName = QueryLoggerUtil.queryLoggerName(queryId, stacker.getQueryContext());
        return this.processingLogContext.getLoggerFactory().getLogger(loggerName);
    }

    /**
     * Creates the queue for a transient query and attaches terminal nodes that feed it.
     *
     * <p>Stream results always drop rows with a {@code null} value and are queued with a
     * {@code null} key. Table results are queued with their key as a list and drop {@code null}
     * rows only when {@code dropNullRows} is set.
     *
     * @param result the built plan; must be a stream holder or a table holder
     * @param limit passed to the {@link TransientQueryQueue} constructor
     *     (presumably a row limit - TODO confirm)
     * @param dropNullRows whether table rows with a {@code null} value are filtered out
     * @return the queue that the attached nodes write to
     * @throws IllegalStateException if {@code result} is neither a stream nor a table holder
     */
    private static TransientQueryQueue buildTransientQueryQueue(Object result, OptionalInt limit, boolean dropNullRows) {
        final TransientQueryQueue queue = new TransientQueryQueue(limit);

        if (result instanceof KStreamHolder) {
            ((KStreamHolder<?>) result).getStream()
                .filter((key, row) -> row != null)
                .foreach((key, row) -> queue.acceptRow(null, row));
            return queue;
        }

        if (!(result instanceof KTableHolder)) {
            throw new IllegalStateException("Unexpected type built from execution plan");
        }

        // The filter node is only added when requested, so the topology stays the same as before
        // for callers that keep null rows.
        final KTableHolder<?> tableHolder = (KTableHolder<?>) result;
        if (dropNullRows) {
            tableHolder.getTable().toStream()
                .filter((key, row) -> row != null)
                .foreach((key, row) -> queue.acceptRow(KeyUtil.asList(key), row));
        } else {
            tableHolder.getTable().toStream()
                .foreach((key, row) -> queue.acceptRow(KeyUtil.asList(key), row));
        }
        return queue;
    }

    /** Builds the execution plan with a {@link KSPlanBuilder} bound to the given build context. */
    private static Object buildQueryImplementation(ExecutionStep<?> executionStep, RuntimeBuildContext runtimeBuildContext) {
        return executionStep.build(new KSPlanBuilder(runtimeBuildContext));
    }

    /** Creates the build context for a query from this executor's collaborators. */
    private RuntimeBuildContext buildContext(String applicationId, QueryId queryId) {
        return RuntimeBuildContext.of(
            this.streamsBuilder,
            this.config.getConfig(true),
            this.serviceContext,
            this.processingLogContext,
            this.functionRegistry,
            applicationId,
            queryId);
    }

    /**
     * Derives the Kafka Streams properties for a query.
     *
     * <p>Starts from the config's stream properties for the application id, then sets
     * {@code application.id}, registers the production-error logger, and appends the consumer and
     * producer collectors to the interceptor lists and the RocksDB collector to
     * {@code metric.reporters}.
     *
     * @return a new mutable map of properties
     * @throws KsqlException if an existing list-valued property is neither a string nor a list
     */
    private Map<String, Object> buildStreamsProperties(String applicationId, QueryId queryId) {
        final Map<String, Object> properties = new HashMap<>(this.config.getConfig(true).getKsqlStreamConfigProps(applicationId));
        properties.put("application.id", applicationId);
        properties.put("ksql.logger.production.error", this.processingLogContext.getLoggerFactory().getLogger(queryId.toString()));
        updateListProperty(properties, StreamsConfig.consumerPrefix("interceptor.classes"), ConsumerCollector.class.getCanonicalName());
        updateListProperty(properties, StreamsConfig.producerPrefix("interceptor.classes"), ProducerCollector.class.getCanonicalName());
        updateListProperty(properties, "metric.reporters", RocksDBMetricsCollector.class.getName());
        return properties;
    }

    /**
     * Builds one classifier out of every config entry under the
     * {@code ksql.error.classifier.regex} prefix, chained with {@code and} in iteration order.
     *
     * @return the combined classifier, or empty if no such entries are configured
     */
    private static Optional<QueryErrorClassifier> buildConfiguredClassifiers(KsqlConfig ksqlConfig, String applicationId) {
        final Map<String, Object> regexEntries = ksqlConfig.originalsWithPrefix("ksql.error.classifier.regex");

        final ImmutableList.Builder<QueryErrorClassifier> builder = ImmutableList.builder();
        for (final Object value : regexEntries.values()) {
            builder.add(RegexClassifier.fromConfig((String) value, applicationId));
        }
        final ImmutableList<QueryErrorClassifier> classifiers = builder.build();
        if (classifiers.isEmpty()) {
            return Optional.empty();
        }

        QueryErrorClassifier combined = Iterables.get(classifiers, 0);
        for (final QueryErrorClassifier next : Iterables.skip(classifiers, 1)) {
            combined = combined.and(next);
        }
        return Optional.ofNullable(combined);
    }

    /**
     * Appends {@code value} to a list-valued property, creating the list if the key is absent.
     *
     * <p>The existing value may be a comma-separated string or a list. A string is trimmed and
     * split on commas (with surrounding whitespace); a blank string is treated as an empty list,
     * so it does not leave an empty class-name entry in front of the appended value. The result
     * is always stored as a new mutable list, so an immutable incoming list is never modified.
     *
     * @param properties the property map to update in place
     * @param key the property name
     * @param value the entry to append
     * @throws KsqlException if the existing value is neither a string nor a list
     */
    private static void updateListProperty(Map<String, Object> properties, String key, Object value) {
        final Object existing = properties.getOrDefault(key, new LinkedList<String>());
        final List<Object> values = new LinkedList<>();
        if (existing instanceof String) {
            final String trimmed = ((String) existing).trim();
            if (!trimmed.isEmpty()) {
                values.addAll(Arrays.asList(trimmed.split("\\s*,\\s*")));
            }
        } else if (existing instanceof List) {
            values.addAll((List<?>) existing);
        } else {
            throw new KsqlException("Expecting list or string for property: " + key);
        }
        values.add(value);
        properties.put(key, values);
    }
}
