package io.confluent.ksql.util;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.streams.materialization.Materialization;
import io.confluent.ksql.execution.streams.materialization.MaterializationProvider;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.physical.scalablepush.ScalablePushRegistry;
import io.confluent.ksql.query.KafkaStreamsBuilder;
import io.confluent.ksql.query.MaterializationProviderBuilderFactory;
import io.confluent.ksql.query.QueryErrorClassifier;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.query.QuerySchemas;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.QueryMetadata;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

/* loaded from: input_file:io/confluent/ksql/util/PersistentQueryMetadataImpl.class */
/**
 * {@link PersistentQueryMetadata} implementation layered on top of {@link QueryMetadataImpl}.
 *
 * <p>On top of the state held by the superclass, an instance keeps the sink {@link DataSource},
 * the query schemas, the physical result schema, the physical plan, an optional
 * materialization-provider builder, an optional {@link ScalablePushRegistry} and the
 * {@link ProcessingLogger} used to report uncaught stream-thread errors.
 *
 * <p>The materialization provider is only built in {@link #initialize()}; before that call it is
 * {@link Optional#empty()}.
 */
public class PersistentQueryMetadataImpl extends QueryMetadataImpl implements PersistentQueryMetadata {
    private final KsqlConstants.PersistentQueryType persistentQueryType;
    private final DataSource sinkDataSource;
    private final QuerySchemas schemas;
    private final PhysicalSchema resultSchema;
    private final ExecutionStep<?> physicalPlan;
    private final Optional<MaterializationProviderBuilderFactory.MaterializationProviderBuilder> materializationProviderBuilder;
    private final Optional<ScalablePushRegistry> scalablePushRegistry;
    private final ProcessingLogger processingLogger;
    // Empty until initialize() runs; never null for instances built via the primary constructor.
    private Optional<MaterializationProvider> materializationProvider;
    // Single-thread scheduled pool (threads named "ksql-csu-metrics-reporter-%d"). No method of
    // this class submits work to it or shuts it down; the copy constructor shares the same
    // instance. NOTE(review): looks vestigial here - confirm before removing or closing it.
    private final ScheduledExecutorService executorService;

    /**
     * Creates the metadata for a persistent query.
     *
     * <p>{@code str}, {@code set}, {@code str2}, {@code str3}, {@code topology},
     * {@code kafkaStreamsBuilder}, {@code map}, {@code map2}, {@code j}, {@code queryId},
     * {@code queryErrorClassifier}, {@code i}, {@code j2}, {@code j3} and {@code listener} are
     * forwarded unchanged to the {@link QueryMetadataImpl} constructor, together with the logical
     * schema of {@code physicalSchema}; their meaning is defined there.
     *
     * @param persistentQueryType the kind of persistent query; must not be null
     * @param physicalSchema the physical schema of the result; must not be null
     * @param sinkDataSource the data source the query writes to; must not be null
     * @param materializationProviderBuilder optional builder applied in {@link #initialize()};
     *     the {@code Optional} itself must not be null
     * @param schemas the schemas associated with the query; must not be null
     * @param physicalPlan the physical plan of the query; must not be null
     * @param processingLogger logger that receives uncaught stream-thread errors; must not be null
     * @param scalablePushRegistry optional registry that is closed by {@link #stop()};
     *     the {@code Optional} itself must not be null
     * @throws NullPointerException if any argument documented as non-null is null
     */
    public PersistentQueryMetadataImpl(KsqlConstants.PersistentQueryType persistentQueryType, String str, PhysicalSchema physicalSchema, Set<SourceName> set, DataSource sinkDataSource, String str2, QueryId queryId, Optional<MaterializationProviderBuilderFactory.MaterializationProviderBuilder> materializationProviderBuilder, String str3, Topology topology, KafkaStreamsBuilder kafkaStreamsBuilder, QuerySchemas schemas, Map<String, Object> map, Map<String, Object> map2, long j, QueryErrorClassifier queryErrorClassifier, ExecutionStep<?> physicalPlan, int i, ProcessingLogger processingLogger, long j2, long j3, QueryMetadata.Listener listener, Optional<ScalablePushRegistry> scalablePushRegistry) {
        // The schema is dereferenced while building the super() arguments, so it has to be
        // null-checked right here; a check placed after super() could never be reached.
        super(str, Objects.requireNonNull(physicalSchema, "schema").logicalSchema(), set, str2, str3, topology, kafkaStreamsBuilder, map, map2, j, queryId, queryErrorClassifier, i, j2, j3, listener);
        this.sinkDataSource = Objects.requireNonNull(sinkDataSource, "sinkDataSource");
        this.schemas = Objects.requireNonNull(schemas, "schemas");
        this.resultSchema = physicalSchema;
        this.physicalPlan = Objects.requireNonNull(physicalPlan, "physicalPlan");
        this.materializationProviderBuilder = Objects.requireNonNull(materializationProviderBuilder, "materializationProviderBuilder");
        this.processingLogger = Objects.requireNonNull(processingLogger, "processingLogger");
        this.scalablePushRegistry = Objects.requireNonNull(scalablePushRegistry, "scalablePushRegistry");
        this.persistentQueryType = Objects.requireNonNull(persistentQueryType, "persistentQueryType");
        // Start out empty rather than null so the getters are safe to call before initialize().
        this.materializationProvider = Optional.empty();
        this.executorService = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("ksql-csu-metrics-reporter-%d").build());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Copy constructor: creates a new instance that shares every field of {@code original}
     * (including its executor service and its current materialization provider) but reports to
     * the given {@code listener}.
     *
     * @param original the instance to copy from
     * @param listener the listener handed to the superclass copy constructor
     */
    public PersistentQueryMetadataImpl(PersistentQueryMetadataImpl original, QueryMetadata.Listener listener) {
        super(original, listener);
        this.sinkDataSource = original.getSink();
        this.schemas = original.schemas;
        this.resultSchema = original.resultSchema;
        this.materializationProvider = original.materializationProvider;
        this.physicalPlan = original.physicalPlan;
        this.materializationProviderBuilder = original.materializationProviderBuilder;
        this.processingLogger = original.processingLogger;
        this.scalablePushRegistry = original.scalablePushRegistry;
        this.persistentQueryType = original.getPersistentQueryType();
        this.executorService = original.executorService;
    }

    /**
     * Runs the superclass initialization, installs {@link #uncaughtHandler(Throwable)} as the
     * uncaught exception handler and, when a materialization-provider builder was supplied,
     * applies it to the current Kafka Streams instance and topology.
     */
    @Override
    public void initialize() {
        super.initialize();
        setUncaughtExceptionHandler(this::uncaughtHandler);
        this.materializationProvider = this.materializationProviderBuilder.flatMap(
            builder -> builder.apply(getKafkaStreams(), getTopology()));
    }

    /**
     * Delegates to the superclass handler and then records the error with the processing logger.
     *
     * @param th the exception that escaped a streams thread
     * @return the response chosen by the superclass handler
     */
    @Override
    public StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse uncaughtHandler(Throwable th) {
        // The superclass decides the response first; logging happens afterwards, as before.
        final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse response = super.uncaughtHandler(th);
        this.processingLogger.error(KafkaStreamsThreadError.of("Unhandled exception caught in streams thread", Thread.currentThread(), th));
        return response;
    }

    /** Returns the data source type of the sink. */
    @Override
    public DataSource.DataSourceType getDataSourceType() {
        return this.sinkDataSource.getDataSourceType();
    }

    /** Returns the topic of the sink data source. */
    @Override
    public KsqlTopic getResultTopic() {
        return this.sinkDataSource.getKsqlTopic();
    }

    /** Returns the name of the sink data source. */
    @Override
    public SourceName getSinkName() {
        return this.sinkDataSource.getName();
    }

    /** Returns the schemas passed at construction time. */
    @Override
    public QuerySchemas getQuerySchemas() {
        return this.schemas;
    }

    /** Returns the physical schema of the query result. */
    @Override
    public PhysicalSchema getPhysicalSchema() {
        return this.resultSchema;
    }

    /** Returns the physical plan passed at construction time. */
    @Override
    public ExecutionStep<?> getPhysicalPlan() {
        return this.physicalPlan;
    }

    /** Returns the sink data source. */
    @Override
    public DataSource getSink() {
        return this.sinkDataSource;
    }

    /** Returns the persistent query type passed at construction time. */
    @Override
    public KsqlConstants.PersistentQueryType getPersistentQueryType() {
        return this.persistentQueryType;
    }

    /**
     * Returns the materialization provider built by {@link #initialize()}, or an empty
     * {@code Optional} when none has been built.
     */
    @Override
    @VisibleForTesting
    public Optional<MaterializationProvider> getMaterializationProvider() {
        return this.materializationProvider;
    }

    /** Returns the processing logger passed at construction time. */
    @Override
    @VisibleForTesting
    public ProcessingLogger getProcessingLogger() {
        return this.processingLogger;
    }

    /**
     * Builds a {@link Materialization} from the current materialization provider.
     *
     * @param queryId the query id handed to the provider
     * @param stacker the context stacker handed to the provider
     * @return the built materialization, or empty when there is no provider
     */
    @Override
    public Optional<Materialization> getMaterialization(QueryId queryId, QueryContext.Stacker stacker) {
        return this.materializationProvider.map(provider -> provider.build(queryId, stacker));
    }

    /**
     * Closes the query via {@code doClose(false)} and then closes the scalable push registry,
     * if one is present.
     */
    @Override
    public synchronized void stop() {
        doClose(false);
        this.scalablePushRegistry.ifPresent(ScalablePushRegistry::close);
    }

    /** Returns the scalable push registry passed at construction time, if any. */
    @Override
    public Optional<ScalablePushRegistry> getScalablePushRegistry() {
        return this.scalablePushRegistry;
    }
}
