package io.confluent.ksql.query;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import io.confluent.ksql.config.KsqlConfigResolver;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.QueryEventListener;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.rest.entity.PropertiesList;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.serde.WindowInfo;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.BinPackedPersistentQueryMetadataImpl;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.PersistentQueryMetadataImpl;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.SandboxedBinPackedPersistentQueryMetadataImpl;
import io.confluent.ksql.util.SandboxedPersistentQueryMetadataImpl;
import io.confluent.ksql.util.SandboxedSharedKafkaStreamsRuntimeImpl;
import io.confluent.ksql.util.SandboxedTransientQueryMetadata;
import io.confluent.ksql.util.SharedKafkaStreamsRuntime;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/query/QueryRegistryImpl.class */
public class QueryRegistryImpl implements QueryRegistry {
    /** Logger for this registry; created in each constructor. */
    private final Logger log;
    /** Predicate that is true when the query's sink name equals the given source name. */
    private static final BiPredicate<SourceName, PersistentQueryMetadata> FILTER_QUERIES_WITH_SINK = (sourceName, persistentQueryMetadata) -> {
        return persistentQueryMetadata.getSinkName().equals(Optional.of(sourceName));
    };
    /** Persistent queries currently registered, keyed by query id. */
    private final Map<QueryId, PersistentQueryMetadata> persistentQueries;
    /** Every registered query, persistent and transient, keyed by query id. */
    private final Map<QueryId, QueryMetadata> allLiveQueries;
    /**
     * Source name to the id of the query registered under it: CREATE_SOURCE queries are keyed by
     * their only source name, CREATE_AS queries by their sink name.
     */
    private final Map<SourceName, QueryId> createAsQueries;
    /** Source name to the ids of INSERT queries that have it as sink or source. */
    private final Map<SourceName, Set<QueryId>> insertQueries;
    /** Listeners notified about query lifecycle events. */
    private final Collection<QueryEventListener> eventListeners;
    /** Factory used to obtain a QueryBuilder each time a query is created. */
    private final QueryBuilderFactory queryBuilderFactory;
    /** Metric collectors forwarded to the query builder calls. */
    private final MetricCollectors metricCollectors;
    /** Shared runtimes handed to every query builder and closed by closeRuntimes(). */
    private final List<SharedKafkaStreamsRuntime> streams;
    /** Filled only by the sandbox copy constructor: the shared runtimes of the copied registry. */
    private final List<SharedKafkaStreamsRuntime> sourceStreams;
    /** True when this instance was produced by the sandbox copy constructor. */
    private final boolean sandbox;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.confluent.ksql.query.QueryRegistryImpl$1, reason: invalid class name */
    /* loaded from: input_file:io/confluent/ksql/query/QueryRegistryImpl$1.class */
    /**
     * Decompiler-emitted lookup table for switching on {@code KsqlConstants.PersistentQueryType}.
     *
     * <p>The array is indexed by the enum constant's ordinal and holds a dense case index:
     * CREATE_SOURCE = 1, CREATE_AS = 2, INSERT = 3. Slots that are never assigned keep the
     * array default of 0.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$confluent$ksql$util$KsqlConstants$PersistentQueryType = new int[KsqlConstants.PersistentQueryType.values().length];

        static {
            // Each assignment is guarded separately so that one missing enum constant
            // (NoSuchFieldError) does not prevent the remaining slots from being filled.
            try {
                $SwitchMap$io$confluent$ksql$util$KsqlConstants$PersistentQueryType[KsqlConstants.PersistentQueryType.CREATE_SOURCE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: the slot stays 0 and matches no case.
            }
            try {
                $SwitchMap$io$confluent$ksql$util$KsqlConstants$PersistentQueryType[KsqlConstants.PersistentQueryType.CREATE_AS.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored: the slot stays 0 and matches no case.
            }
            try {
                $SwitchMap$io$confluent$ksql$util$KsqlConstants$PersistentQueryType[KsqlConstants.PersistentQueryType.INSERT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Deliberately ignored: the slot stays 0 and matches no case.
            }
        }
    }

    /* loaded from: input_file:io/confluent/ksql/query/QueryRegistryImpl$ListenerImpl.class */
    /**
     * Per-query listener that relays query callbacks to the registry's event listeners.
     *
     * <p>Pause and resume are both relayed as {@code onKsqlStateChange}. On close the query is
     * first unregistered from the enclosing registry and only then are the listeners told.
     */
    private class ListenerImpl implements QueryMetadata.Listener {
        private ListenerImpl() {
        }

        /* synthetic */ ListenerImpl(QueryRegistryImpl queryRegistryImpl, AnonymousClass1 anonymousClass1) {
            // Both arguments are unused; this overload only delegates to the private constructor.
            this();
        }

        @Override // io.confluent.ksql.util.QueryMetadata.Listener
        public void onError(QueryMetadata queryMetadata, QueryError queryError) {
            eventListeners.forEach(listener -> listener.onError(queryMetadata, queryError));
        }

        @Override // io.confluent.ksql.util.QueryMetadata.Listener
        public void onStateChange(QueryMetadata queryMetadata, KafkaStreams.State state, KafkaStreams.State state2) {
            eventListeners.forEach(listener -> listener.onStateChange(queryMetadata, state, state2));
        }

        @Override // io.confluent.ksql.util.QueryMetadata.Listener
        public void onPause(QueryMetadata queryMetadata) {
            eventListeners.forEach(listener -> listener.onKsqlStateChange(queryMetadata));
        }

        @Override // io.confluent.ksql.util.QueryMetadata.Listener
        public void onResume(QueryMetadata queryMetadata) {
            eventListeners.forEach(listener -> listener.onKsqlStateChange(queryMetadata));
        }

        @Override // io.confluent.ksql.util.QueryMetadata.Listener
        public void onClose(QueryMetadata queryMetadata) {
            // Drop the query from the registry before anyone is told it closed.
            unregisterQuery(queryMetadata);
            eventListeners.forEach(listener -> listener.onClose(queryMetadata));
        }
    }

    /**
     * Factory for the {@code QueryBuilder} used each time the registry creates a query.
     *
     * <p>The public registry constructor supplies {@code QueryBuilder::new}; the package-private
     * constructor accepts any implementation. The registry passes its shared-runtime list as
     * {@code list} and {@code !sandbox} as {@code z}.
     */
    @FunctionalInterface
    /* loaded from: input_file:io/confluent/ksql/query/QueryRegistryImpl$QueryBuilderFactory.class */
    interface QueryBuilderFactory {
        QueryBuilder create(SessionConfig sessionConfig, ProcessingLogContext processingLogContext, ServiceContext serviceContext, FunctionRegistry functionRegistry, List<SharedKafkaStreamsRuntime> list, boolean z);
    }

    /**
     * Creates a registry that builds queries with {@code QueryBuilder::new}.
     *
     * @param collection listeners to notify about query lifecycle events
     * @param metricCollectors metric collectors forwarded to the query builder
     */
    public QueryRegistryImpl(Collection<QueryEventListener> collection, MetricCollectors metricCollectors) {
        this(collection, QueryBuilder::new, metricCollectors);
    }

    /**
     * Creates an empty, non-sandbox registry with a caller-supplied builder factory.
     *
     * @param collection listeners to notify about query lifecycle events; must not be null
     * @param queryBuilderFactory factory for the query builder; must not be null
     * @param metricCollectors metric collectors forwarded to the query builder
     * @throws NullPointerException if {@code collection} or {@code queryBuilderFactory} is null
     */
    QueryRegistryImpl(Collection<QueryEventListener> collection, QueryBuilderFactory queryBuilderFactory, MetricCollectors metricCollectors) {
        this.log = LoggerFactory.getLogger(QueryRegistryImpl.class);
        this.persistentQueries = new ConcurrentHashMap<>();
        this.allLiveQueries = new ConcurrentHashMap<>();
        this.createAsQueries = new ConcurrentHashMap<>();
        this.insertQueries = new ConcurrentHashMap<>();
        this.streams = new ArrayList<>();
        this.sourceStreams = new ArrayList<>();
        this.eventListeners = Objects.requireNonNull(collection, "eventListeners");
        this.queryBuilderFactory = Objects.requireNonNull(queryBuilderFactory, "queryBuilderFactory");
        this.metricCollectors = metricCollectors;
        this.sandbox = false;
    }

    /**
     * Sandbox copy constructor.
     *
     * <p>Wraps every live query of {@code queryRegistryImpl} in its sandboxed counterpart, copies
     * the CREATE AS and INSERT indexes, asks each event listener for a sandbox of itself (listeners
     * that return an empty Optional are dropped), and remembers the original's shared runtimes in
     * {@code sourceStreams}. The sandbox's own {@code streams} list starts empty.
     *
     * @param queryRegistryImpl the registry to copy
     */
    private QueryRegistryImpl(QueryRegistryImpl queryRegistryImpl) {
        this.log = LoggerFactory.getLogger(QueryRegistryImpl.class);
        this.persistentQueries = new ConcurrentHashMap<>();
        this.allLiveQueries = new ConcurrentHashMap<>();
        this.createAsQueries = new ConcurrentHashMap<>();
        this.insertQueries = new ConcurrentHashMap<>();
        this.streams = new ArrayList<>();
        this.sourceStreams = new ArrayList<>();
        this.queryBuilderFactory = queryRegistryImpl.queryBuilderFactory;

        for (QueryMetadata original : queryRegistryImpl.allLiveQueries.values()) {
            if (original instanceof PersistentQueryMetadataImpl) {
                SandboxedPersistentQueryMetadataImpl dedicated = SandboxedPersistentQueryMetadataImpl.of((PersistentQueryMetadataImpl) original, new ListenerImpl(this, null));
                this.persistentQueries.put(dedicated.getQueryId(), dedicated);
                this.allLiveQueries.put(dedicated.getQueryId(), dedicated);
            } else if (original instanceof BinPackedPersistentQueryMetadataImpl) {
                SandboxedBinPackedPersistentQueryMetadataImpl binPacked = SandboxedBinPackedPersistentQueryMetadataImpl.of((BinPackedPersistentQueryMetadataImpl) original, new ListenerImpl(this, null));
                this.persistentQueries.put(binPacked.getQueryId(), binPacked);
                this.allLiveQueries.put(binPacked.getQueryId(), binPacked);
            } else {
                // Anything that is not one of the two persistent implementations is treated as transient.
                SandboxedTransientQueryMetadata transientCopy = SandboxedTransientQueryMetadata.of((TransientQueryMetadata) original, new ListenerImpl(this, null));
                this.allLiveQueries.put(transientCopy.getQueryId(), transientCopy);
            }
        }

        this.createAsQueries.putAll(queryRegistryImpl.createAsQueries);
        for (Map.Entry<SourceName, Set<QueryId>> entry : queryRegistryImpl.insertQueries.entrySet()) {
            // Each id set is copied so the sandbox can change it without touching the original.
            Set<QueryId> idsCopy = Collections.synchronizedSet(new HashSet<QueryId>(entry.getValue()));
            this.insertQueries.put(entry.getKey(), idsCopy);
        }

        this.eventListeners = queryRegistryImpl.eventListeners.stream()
                .map(QueryEventListener::createSandbox)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
        this.sourceStreams.addAll(queryRegistryImpl.streams);
        this.metricCollectors = queryRegistryImpl.metricCollectors;
        this.sandbox = true;
    }

    /**
     * Builds a transient query, initializes it and registers it with this registry.
     *
     * <p>All query arguments are forwarded unchanged to {@code QueryBuilder.buildTransientQuery},
     * together with a fresh listener, a fresh {@code StreamsBuilder} and an empty Optional as the
     * second-to-last argument.
     *
     * @return the initialized and registered transient query
     */
    @Override // io.confluent.ksql.query.QueryRegistry
    public TransientQueryMetadata createTransientQuery(SessionConfig sessionConfig, ServiceContext serviceContext, ProcessingLogContext processingLogContext, MetaStore metaStore, String str, QueryId queryId, Set<SourceName> set, ExecutionStep<?> executionStep, String str2, LogicalSchema logicalSchema, OptionalInt optionalInt, Optional<WindowInfo> optional, boolean z) {
        final QueryBuilder builder = this.queryBuilderFactory.create(
                sessionConfig, processingLogContext, serviceContext, metaStore, this.streams, !this.sandbox);
        final TransientQueryMetadata query = builder.buildTransientQuery(
                str, queryId, set, executionStep, str2, logicalSchema, optionalInt, optional, z,
                new ListenerImpl(this, null), new StreamsBuilder(), Optional.empty(), this.metricCollectors);
        // Initialization has to happen before registration; registerTransientQuery enforces it.
        query.initialize();
        registerTransientQuery(serviceContext, metaStore, query);
        return query;
    }

    /**
     * Builds and initializes a transient query for a stream pull query.
     *
     * <p>The query is built like any transient query, except that {@code immutableMap} is passed
     * to the builder wrapped in an Optional. The query is not stored in {@code allLiveQueries};
     * event listeners are only notified of its creation.
     *
     * @return the initialized transient query
     */
    @Override // io.confluent.ksql.query.QueryRegistry
    public TransientQueryMetadata createStreamPullQuery(SessionConfig sessionConfig, ServiceContext serviceContext, ProcessingLogContext processingLogContext, MetaStore metaStore, String str, QueryId queryId, Set<SourceName> set, ExecutionStep<?> executionStep, String str2, LogicalSchema logicalSchema, OptionalInt optionalInt, Optional<WindowInfo> optional, boolean z, ImmutableMap<TopicPartition, Long> immutableMap) {
        final QueryBuilder builder = this.queryBuilderFactory.create(
                sessionConfig, processingLogContext, serviceContext, metaStore, this.streams, !this.sandbox);
        final TransientQueryMetadata query = builder.buildTransientQuery(
                str, queryId, set, executionStep, str2, logicalSchema, optionalInt, optional, z,
                new ListenerImpl(this, null), new StreamsBuilder(), Optional.of(immutableMap), this.metricCollectors);
        query.initialize();
        notifyCreate(serviceContext, metaStore, query);
        return query;
    }

    /**
     * For every shared runtime held by this registry, overrides its streams properties using
     * {@code ksqlConfig} and then restarts it. Runtimes are processed one after another in list
     * order.
     */
    @Override // io.confluent.ksql.query.QueryRegistry
    public void updateStreamsPropertiesAndRestartRuntime(KsqlConfig ksqlConfig, ProcessingLogContext processingLogContext) {
        for (final SharedKafkaStreamsRuntime runtime : this.streams) {
            // New properties must be in place before the restart.
            updateStreamsProperties(runtime, ksqlConfig, processingLogContext);
            runtime.restartStreamsRuntime();
        }
    }

    /**
     * Builds a persistent query and registers it, replacing any query already registered under
     * the same id.
     *
     * <p>The query is built in a shared runtime when {@code optional2} is present, the
     * {@code ksql.runtime.feature.shared.enabled} config is true, and the existing query with this
     * id (if any) is a {@code BinPackedPersistentQueryMetadataImpl}. Otherwise it is built in a
     * dedicated runtime. In a sandbox, the shared path first rejects overrides that are not
     * query-level and adds sandboxed wrappers of the matching source runtimes to {@code streams}.
     *
     * @return the registered persistent query
     * @throws IllegalArgumentException in a sandbox on the shared path, if the session overrides
     *     contain a config that is not query-level
     */
    @Override // io.confluent.ksql.query.QueryRegistry
    public PersistentQueryMetadata createOrReplacePersistentQuery(SessionConfig sessionConfig, ServiceContext serviceContext, ProcessingLogContext processingLogContext, MetaStore metaStore, String str, QueryId queryId, Optional<DataSource> optional, Set<DataSource> set, ExecutionStep<?> executionStep, String str2, KsqlConstants.PersistentQueryType persistentQueryType, Optional<String> optional2) {
        final QueryBuilder builder = this.queryBuilderFactory.create(
                sessionConfig, processingLogContext, serviceContext, metaStore, this.streams, !this.sandbox);
        final KsqlConfig ksqlConfig = sessionConfig.getConfig(true);
        final PersistentQueryMetadata existing = this.persistentQueries.get(queryId);

        final boolean useSharedRuntime = optional2.isPresent()
                && ksqlConfig.getBoolean("ksql.runtime.feature.shared.enabled").booleanValue()
                && (existing == null || existing instanceof BinPackedPersistentQueryMetadataImpl);

        final PersistentQueryMetadata query;
        if (!useSharedRuntime) {
            query = builder.buildPersistentQueryInDedicatedRuntime(
                    ksqlConfig, persistentQueryType, str, queryId, optional, set, executionStep, str2,
                    new ListenerImpl(this, null),
                    () -> ImmutableList.copyOf(getPersistentQueries().values()),
                    new StreamsBuilder(), this.metricCollectors);
        } else {
            if (this.sandbox) {
                throwOnNonQueryLevelConfigs(sessionConfig.getOverrides());
                // Give the sandbox its own wrappers around the runtimes with the requested application id.
                this.streams.addAll(this.sourceStreams.stream()
                        .filter(runtime -> runtime.getApplicationId().equals(optional2.get()))
                        .map(SandboxedSharedKafkaStreamsRuntimeImpl::new)
                        .collect(Collectors.toList()));
            }
            query = builder.buildPersistentQueryInSharedRuntime(
                    ksqlConfig, persistentQueryType, str, queryId, optional, set, executionStep, str2,
                    new ListenerImpl(this, null),
                    () -> ImmutableList.copyOf(getPersistentQueries().values()),
                    optional2.get(), this.metricCollectors);
            query.register();
        }

        registerPersistentQuery(serviceContext, metaStore, query);
        return query;
    }

    /**
     * Rejects override maps that contain anything other than query-level properties.
     *
     * @param map config overrides, keyed by property name
     * @throws IllegalArgumentException listing, comma separated, every key that is not a
     *     query-level property
     */
    private static void throwOnNonQueryLevelConfigs(Map<String, Object> map) {
        final String offending = map.keySet().stream()
                .filter(QueryRegistryImpl::isNotQueryLevelConfig)
                .distinct()
                .collect(Collectors.joining(","));
        if (offending.isEmpty()) {
            return;
        }
        throw new IllegalArgumentException(String.format("When shared runtimes are enabled, the configs %s can only be set for the entire cluster and all queries currently running in it, and not configurable for individual queries. Please use ALTER SYSTEM to change these config for all queries.", offending));
    }

    /**
     * Returns true when {@code name} cannot be resolved, or resolves to a property that is not in
     * {@code PropertiesList.QueryLevelProperties}.
     */
    private static boolean isNotQueryLevelConfig(String name) {
        return new KsqlConfigResolver().resolve(name, false)
                .map(configItem -> !PropertiesList.QueryLevelProperties.contains(configItem.getPropertyName()))
                .orElse(true);
    }

    /**
     * Looks up a registered persistent query.
     *
     * @param queryId id of the query
     * @return the persistent query, or empty if none is registered under {@code queryId}
     */
    @Override // io.confluent.ksql.query.QueryRegistry
    public Optional<PersistentQueryMetadata> getPersistentQuery(QueryId queryId) {
        final PersistentQueryMetadata found = this.persistentQueries.get(queryId);
        return Optional.ofNullable(found);
    }

    /**
     * Looks up any registered query, persistent or transient.
     *
     * @param queryId id of the query
     * @return the query, or empty if none is registered under {@code queryId}
     */
    @Override // io.confluent.ksql.query.QueryRegistry
    public Optional<QueryMetadata> getQuery(QueryId queryId) {
        final QueryMetadata found = this.allLiveQueries.get(queryId);
        return Optional.ofNullable(found);
    }

    /**
     * Returns a read-only view of the registered persistent queries, keyed by query id. The view
     * is backed by the registry's map rather than being a copy.
     */
    @Override // io.confluent.ksql.query.QueryRegistry
    public Map<QueryId, PersistentQueryMetadata> getPersistentQueries() {
        final Map<QueryId, PersistentQueryMetadata> readOnlyView = Collections.unmodifiableMap(this.persistentQueries);
        return readOnlyView;
    }

    /**
     * Collects the ids of the queries that write into {@code sourceName}: the query registered
     * for it in the CREATE AS index, if any, plus every INSERT query whose sink is that source.
     *
     * @param sourceName the sink to look up
     * @return an immutable set of query ids; empty when no query writes to the source
     */
    @Override // io.confluent.ksql.query.QueryRegistry
    public Set<QueryId> getQueriesWithSink(SourceName sourceName) {
        final ImmutableSet.Builder<QueryId> queryIds = ImmutableSet.builder();
        // One lookup instead of containsKey() + get(): the entry may be removed between the two
        // calls, and ImmutableSet.Builder.add(null) throws.
        final QueryId createAsQueryId = this.createAsQueries.get(sourceName);
        if (createAsQueryId != null) {
            queryIds.add(createAsQueryId);
        }
        queryIds.addAll(getInsertQueries(sourceName, FILTER_QUERIES_WITH_SINK));
        return queryIds.build();
    }

    /**
     * Returns an immutable snapshot of every registered query, persistent and transient.
     */
    @Override // io.confluent.ksql.query.QueryRegistry
    public List<QueryMetadata> getAllLiveQueries() {
        final Collection<QueryMetadata> liveQueries = this.allLiveQueries.values();
        return ImmutableList.copyOf(liveQueries);
    }

    /**
     * Returns the persistent query registered for {@code sourceName} in the CREATE AS index.
     *
     * @param sourceName the source to look up
     * @return the query, or empty when the source has no entry in the index or the indexed query
     *     is no longer in the persistent query map
     */
    @Override // io.confluent.ksql.query.QueryRegistry
    public Optional<QueryMetadata> getCreateAsQuery(SourceName sourceName) {
        // Each map is read exactly once and a missing value maps to empty. The previous
        // containsKey() + get() + Optional.of() sequence threw if an entry vanished in between.
        return Optional.ofNullable(this.createAsQueries.get(sourceName))
                .<QueryMetadata>map(this.persistentQueries::get);
    }

    /**
     * Returns the ids of the INSERT queries indexed under {@code sourceName} that satisfy
     * {@code biPredicate}.
     *
     * @param sourceName the source whose INSERT queries are inspected
     * @param biPredicate tested with {@code sourceName} and each candidate query
     * @return the ids of the matching queries; empty when the source has no INSERT queries
     */
    @Override // io.confluent.ksql.query.QueryRegistry
    public Set<QueryId> getInsertQueries(SourceName sourceName, BiPredicate<SourceName, PersistentQueryMetadata> biPredicate) {
        return this.insertQueries.getOrDefault(sourceName, Collections.emptySet()).stream()
                .map(this.persistentQueries::get)
                // unregisterQuery() removes a query from persistentQueries before it removes the
                // id from insertQueries, so an id can briefly resolve to null.
                .filter(Objects::nonNull)
                .filter(query -> biPredicate.test(sourceName, query))
                .map(PersistentQueryMetadata::getQueryId)
                .collect(Collectors.toSet());
    }

    /**
     * Creates a sandbox copy of this registry through the private copy constructor.
     */
    @Override // io.confluent.ksql.query.QueryRegistry
    public QueryRegistry createSandbox() {
        final QueryRegistryImpl sandboxCopy = new QueryRegistryImpl(this);
        return sandboxCopy;
    }

    /**
     * Shuts down every live query and then the shared runtimes.
     *
     * <p>Transient queries are always closed. Persistent queries are closed when {@code z} is
     * true; otherwise they are stopped and unregistered.
     *
     * @param z whether persistent queries are closed rather than stopped
     */
    @Override // io.confluent.ksql.query.QueryRegistry
    public void close(boolean z) {
        // getAllLiveQueries() returns a snapshot, so unregistering while looping is safe.
        for (final QueryMetadata query : getAllLiveQueries()) {
            final boolean closeFully = z || query instanceof TransientQueryMetadata;
            if (closeFully) {
                query.close();
                continue;
            }
            ((PersistentQueryMetadata) query).stop();
            unregisterQuery(query);
        }
        closeRuntimes();
    }

    /**
     * Closes every shared runtime held by this registry and then empties the runtime list.
     */
    @Override // io.confluent.ksql.query.QueryRegistry
    public void closeRuntimes() {
        for (final SharedKafkaStreamsRuntime runtime : this.streams) {
            runtime.close();
        }
        this.streams.clear();
    }

    /**
     * Rebuilds the streams properties for {@code runtime} from {@code config}, using the
     * runtime's application id, and overrides the runtime's current properties with them.
     */
    private void updateStreamsProperties(SharedKafkaStreamsRuntime runtime, KsqlConfig config, ProcessingLogContext logContext) {
        runtime.overrideStreamsProperties(
                QueryBuilder.buildStreamsProperties(
                        runtime.getApplicationId(),
                        Optional.empty(),
                        this.metricCollectors,
                        config,
                        logContext));
    }

    /**
     * Registers a persistent query, replacing any query already registered under the same id.
     *
     * <p>When a query with the same id exists, the upgrade is validated against its physical
     * plan, then the old query is stopped with {@code stop(true)} and unregistered. The new query
     * is initialized unless this is a sandbox that is replacing an existing query. It is then
     * added to the persistent query map, indexed by query type and added to the live queries,
     * after which listeners are notified of its creation.
     *
     * @param serviceContext passed to the listeners' {@code onCreate}
     * @param metaStore passed to the listeners' {@code onCreate}
     * @param persistentQueryMetadata the query to register
     */
    private void registerPersistentQuery(ServiceContext serviceContext, MetaStore metaStore, PersistentQueryMetadata persistentQueryMetadata) {
        final QueryId queryId = persistentQueryMetadata.getQueryId();
        final PersistentQueryMetadata previous = this.persistentQueries.get(queryId);
        if (previous != null) {
            previous.getPhysicalPlan().validateUpgrade(persistentQueryMetadata.getPhysicalPlan());
            this.log.info("Detected that query {} already exists so will replace it.First will stop without resetting offsets", previous.getQueryId());
            previous.stop(true);
            unregisterQuery(previous);
        }
        if (previous == null || !this.sandbox) {
            persistentQueryMetadata.initialize();
        }
        this.persistentQueries.put(queryId, persistentQueryMetadata);

        switch (persistentQueryMetadata.getPersistentQueryType()) {
            case CREATE_SOURCE:
                this.createAsQueries.put(Iterables.getOnlyElement(persistentQueryMetadata.mo283getSourceNames()), queryId);
                break;
            case CREATE_AS:
                this.createAsQueries.put(persistentQueryMetadata.getSinkName().get(), queryId);
                break;
            case INSERT:
                // An INSERT query is indexed under its sink and under each of its sources.
                for (SourceName name : sinkAndSources(persistentQueryMetadata)) {
                    this.insertQueries
                            .computeIfAbsent(name, ignored -> Collections.synchronizedSet(new HashSet<QueryId>()))
                            .add(queryId);
                }
                break;
            default:
                // Any other query type has no secondary index.
                break;
        }

        this.allLiveQueries.put(persistentQueryMetadata.getQueryId(), persistentQueryMetadata);
        notifyCreate(serviceContext, metaStore, persistentQueryMetadata);
    }

    /**
     * Adds an initialized transient query to the live queries and notifies listeners of its
     * creation.
     *
     * @throws IllegalStateException if the query has not been initialized
     */
    private void registerTransientQuery(ServiceContext serviceContext, MetaStore metaStore, TransientQueryMetadata transientQueryMetadata) {
        final boolean initialized = transientQueryMetadata.isInitialized();
        if (initialized) {
            this.allLiveQueries.put(transientQueryMetadata.getQueryId(), transientQueryMetadata);
            notifyCreate(serviceContext, metaStore, transientQueryMetadata);
            return;
        }
        throw new IllegalStateException("Transient query must be initialized before it might be exposed to other threads via allLiveQueries");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Removes a query from the registry and notifies listeners that it was deregistered.
     *
     * <p>For a persistent query this also removes it from the persistent query map, closes and
     * drops every shared runtime that reports no collocated queries, and removes the query from
     * the index matching its query type. Every query is removed from the live queries.
     *
     * @param queryMetadata the query to remove
     */
    public void unregisterQuery(QueryMetadata queryMetadata) {
        if (queryMetadata instanceof PersistentQueryMetadata) {
            final PersistentQueryMetadata persistentQuery = (PersistentQueryMetadata) queryMetadata;
            final QueryId queryId = persistentQuery.getQueryId();
            this.persistentQueries.remove(queryId);

            // Collect first, then remove and close, so the list is not modified while streaming it.
            final Set<SharedKafkaStreamsRuntime> emptyRuntimes = this.streams.stream()
                    .filter(runtime -> runtime.getCollocatedQueries().isEmpty())
                    .collect(Collectors.toSet());
            this.streams.removeAll(emptyRuntimes);
            emptyRuntimes.forEach(SharedKafkaStreamsRuntime::close);

            switch (persistentQuery.getPersistentQueryType()) {
                case CREATE_SOURCE:
                    this.createAsQueries.remove(Iterables.getOnlyElement(persistentQuery.mo283getSourceNames()));
                    break;
                case CREATE_AS:
                    this.createAsQueries.remove(persistentQuery.getSinkName().get());
                    break;
                case INSERT:
                    for (SourceName name : sinkAndSources(persistentQuery)) {
                        // Returning null from the remapping function removes the now-empty entry.
                        this.insertQueries.computeIfPresent(name, (ignored, queryIds) -> {
                            queryIds.remove(queryId);
                            return queryIds.isEmpty() ? null : queryIds;
                        });
                    }
                    break;
                default:
                    // Any other query type has no secondary index.
                    break;
            }
        }
        this.allLiveQueries.remove(queryMetadata.getQueryId());
        notifyDeregister(queryMetadata);
    }

    /** Calls {@code onCreate} on every event listener for the given query. */
    private void notifyCreate(ServiceContext serviceContext, MetaStore metaStore, QueryMetadata queryMetadata) {
        this.eventListeners.forEach(listener -> listener.onCreate(serviceContext, metaStore, queryMetadata));
    }

    /** Calls {@code onDeregister} on every event listener for the given query. */
    private void notifyDeregister(QueryMetadata queryMetadata) {
        this.eventListeners.forEach(listener -> listener.onDeregister(queryMetadata));
    }

    /**
     * Returns the query's sink name, when it has one, followed by all of its source names.
     */
    private Iterable<SourceName> sinkAndSources(PersistentQueryMetadata persistentQueryMetadata) {
        final Set<SourceName> sinkNames;
        if (persistentQueryMetadata.getSinkName().isPresent()) {
            sinkNames = Collections.singleton(persistentQueryMetadata.getSinkName().get());
        } else {
            sinkNames = Collections.emptySet();
        }
        return Iterables.concat(sinkNames, persistentQueryMetadata.mo283getSourceNames());
    }
}
