package io.confluent.ksql.rest.server.execution;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.KsqlStream;
import io.confluent.ksql.metastore.model.KsqlTable;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.tree.DescribeStreams;
import io.confluent.ksql.parser.tree.DescribeTables;
import io.confluent.ksql.parser.tree.ListStreams;
import io.confluent.ksql.parser.tree.ListTables;
import io.confluent.ksql.parser.tree.ShowColumns;
import io.confluent.ksql.parser.tree.StatementWithExtendedClause;
import io.confluent.ksql.rest.SessionProperties;
import io.confluent.ksql.rest.entity.ConsumerPartitionOffsets;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlWarning;
import io.confluent.ksql.rest.entity.QueryOffsetSummary;
import io.confluent.ksql.rest.entity.QueryStatusCount;
import io.confluent.ksql.rest.entity.QueryTopicOffsetSummary;
import io.confluent.ksql.rest.entity.RunningQuery;
import io.confluent.ksql.rest.entity.SourceDescription;
import io.confluent.ksql.rest.entity.SourceDescriptionEntity;
import io.confluent.ksql.rest.entity.SourceDescriptionFactory;
import io.confluent.ksql.rest.entity.SourceDescriptionList;
import io.confluent.ksql.rest.entity.SourceInfo;
import io.confluent.ksql.rest.entity.StreamsList;
import io.confluent.ksql.rest.entity.TablesList;
import io.confluent.ksql.rest.server.KsqlRestApplication;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryApplicationId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;

/* loaded from: input_file:io/confluent/ksql/rest/server/execution/ListSourceExecutor.class */
public final class ListSourceExecutor {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/rest/server/execution/ListSourceExecutor$SourceDescriptionWithWarnings.class */
    public static final class SourceDescriptionWithWarnings {
        private final List<KsqlWarning> warnings;
        private final SourceDescription description;

        private SourceDescriptionWithWarnings(List<KsqlWarning> list, SourceDescription sourceDescription) {
            this.warnings = list;
            this.description = sourceDescription;
        }
    }

    private ListSourceExecutor() {
    }

    private static Optional<KsqlEntity> sourceDescriptionList(ConfiguredStatement<? extends StatementWithExtendedClause> configuredStatement, SessionProperties sessionProperties, KsqlExecutionContext ksqlExecutionContext, ServiceContext serviceContext, List<? extends DataSource> list, boolean z) {
        Multimap<String, SourceDescription> fetchSourceDescriptions = z ? RemoteSourceDescriptionExecutor.fetchSourceDescriptions(RemoteHostExecutor.create(configuredStatement, sessionProperties, ksqlExecutionContext, serviceContext.getKsqlClient())) : ImmutableMultimap.of();
        List list2 = (List) list.stream().map(dataSource -> {
            return describeSource(configuredStatement.getSessionConfig().getConfig(false), ksqlExecutionContext, serviceContext, dataSource.getName(), z, configuredStatement, sessionProperties, fetchSourceDescriptions.get(dataSource.getName().toString()));
        }).collect(Collectors.toList());
        return Optional.of(new SourceDescriptionList(configuredStatement.getMaskedStatementText(), (List) list2.stream().map(sourceDescriptionWithWarnings -> {
            return sourceDescriptionWithWarnings.description;
        }).collect(Collectors.toList()), (List) list2.stream().flatMap(sourceDescriptionWithWarnings2 -> {
            return sourceDescriptionWithWarnings2.warnings.stream();
        }).collect(Collectors.toList())));
    }

    public static StatementExecutorResponse streams(ConfiguredStatement<ListStreams> configuredStatement, SessionProperties sessionProperties, KsqlExecutionContext ksqlExecutionContext, ServiceContext serviceContext) {
        List<KsqlStream<?>> specificStreams = getSpecificStreams(ksqlExecutionContext);
        ListStreams statement = configuredStatement.getStatement();
        return statement.getShowExtended() ? StatementExecutorResponse.handled(sourceDescriptionList(configuredStatement, sessionProperties, ksqlExecutionContext, serviceContext, specificStreams, statement.getShowExtended())) : StatementExecutorResponse.handled(Optional.of(new StreamsList(configuredStatement.getMaskedStatementText(), (Collection) specificStreams.stream().map(ListSourceExecutor::sourceSteam).collect(Collectors.toList()))));
    }

    public static StatementExecutorResponse tables(ConfiguredStatement<ListTables> configuredStatement, SessionProperties sessionProperties, KsqlExecutionContext ksqlExecutionContext, ServiceContext serviceContext) {
        List<KsqlTable<?>> specificTables = getSpecificTables(ksqlExecutionContext);
        ListTables statement = configuredStatement.getStatement();
        return statement.getShowExtended() ? StatementExecutorResponse.handled(sourceDescriptionList(configuredStatement, sessionProperties, ksqlExecutionContext, serviceContext, specificTables, statement.getShowExtended())) : StatementExecutorResponse.handled(Optional.of(new TablesList(configuredStatement.getMaskedStatementText(), (Collection) specificTables.stream().map(ListSourceExecutor::sourceTable).collect(Collectors.toList()))));
    }

    public static StatementExecutorResponse describeStreams(ConfiguredStatement<DescribeStreams> configuredStatement, SessionProperties sessionProperties, KsqlExecutionContext ksqlExecutionContext, ServiceContext serviceContext) {
        return StatementExecutorResponse.handled(sourceDescriptionList(configuredStatement, sessionProperties, ksqlExecutionContext, serviceContext, getSpecificStreams(ksqlExecutionContext), configuredStatement.getStatement().getShowExtended()));
    }

    public static StatementExecutorResponse describeTables(ConfiguredStatement<DescribeTables> configuredStatement, SessionProperties sessionProperties, KsqlExecutionContext ksqlExecutionContext, ServiceContext serviceContext) {
        return StatementExecutorResponse.handled(sourceDescriptionList(configuredStatement, sessionProperties, ksqlExecutionContext, serviceContext, getSpecificTables(ksqlExecutionContext), configuredStatement.getStatement().getShowExtended()));
    }

    public static StatementExecutorResponse columns(ConfiguredStatement<ShowColumns> configuredStatement, SessionProperties sessionProperties, KsqlExecutionContext ksqlExecutionContext, ServiceContext serviceContext) {
        ShowColumns statement = configuredStatement.getStatement();
        SourceDescriptionWithWarnings describeSource = describeSource(configuredStatement.getSessionConfig().getConfig(false), ksqlExecutionContext, serviceContext, statement.getTable(), statement.isExtended(), configuredStatement, sessionProperties, ImmutableList.of());
        return StatementExecutorResponse.handled(Optional.of(new SourceDescriptionEntity(configuredStatement.getMaskedStatementText(), describeSource.description, describeSource.warnings)));
    }

    private static List<KsqlTable<?>> getSpecificTables(KsqlExecutionContext ksqlExecutionContext) {
        Stream stream = ksqlExecutionContext.getMetaStore().getAllDataSources().values().stream();
        Class<KsqlTable> cls = KsqlTable.class;
        KsqlTable.class.getClass();
        return (List) stream.filter((v1) -> {
            return r1.isInstance(v1);
        }).filter(dataSource -> {
            return !dataSource.getName().equals(KsqlRestApplication.getCommandsStreamName());
        }).map(dataSource2 -> {
            return (KsqlTable) dataSource2;
        }).collect(Collectors.toList());
    }

    private static List<KsqlStream<?>> getSpecificStreams(KsqlExecutionContext ksqlExecutionContext) {
        Stream stream = ksqlExecutionContext.getMetaStore().getAllDataSources().values().stream();
        Class<KsqlStream> cls = KsqlStream.class;
        KsqlStream.class.getClass();
        return (List) stream.filter((v1) -> {
            return r1.isInstance(v1);
        }).filter(dataSource -> {
            return !dataSource.getName().equals(KsqlRestApplication.getCommandsStreamName());
        }).map(dataSource2 -> {
            return (KsqlStream) dataSource2;
        }).collect(Collectors.toList());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static SourceDescriptionWithWarnings describeSource(KsqlConfig ksqlConfig, KsqlExecutionContext ksqlExecutionContext, ServiceContext serviceContext, SourceName sourceName, boolean z, ConfiguredStatement<? extends StatementWithExtendedClause> configuredStatement, SessionProperties sessionProperties, Collection<SourceDescription> collection) {
        DataSource source = ksqlExecutionContext.getMetaStore().getSource(sourceName);
        if (source == null) {
            throw new KsqlStatementException(String.format("Could not find STREAM/TABLE '%s' in the Metastore" + ksqlExecutionContext.getMetaStore().checkAlternatives(sourceName, Optional.empty()), sourceName.text()), configuredStatement.getMaskedStatementText());
        }
        List<RunningQuery> queries = getQueries(ksqlExecutionContext, persistentQueryMetadata -> {
            return persistentQueryMetadata.getSourceNames().contains(source.getName());
        });
        List<RunningQuery> queries2 = getQueries(ksqlExecutionContext, persistentQueryMetadata2 -> {
            return persistentQueryMetadata2.getSinkName().equals(Optional.of(source.getName()));
        });
        Optional empty = Optional.empty();
        List emptyList = Collections.emptyList();
        List<String> emptyList2 = Collections.emptyList();
        LinkedList linkedList = new LinkedList();
        try {
            empty = Optional.of(serviceContext.getTopicClient().describeTopic(source.getKafkaTopicName()));
            emptyList2 = getSourceConstraints(sourceName, ksqlExecutionContext.getMetaStore());
        } catch (KafkaException | KafkaResponseGetFailedException e) {
            linkedList.add(new KsqlWarning("Error from Kafka: " + e.getMessage()));
        }
        return z ? new SourceDescriptionWithWarnings(linkedList, SourceDescriptionFactory.create(source, z, queries, queries2, empty, queryOffsetSummaries(ksqlConfig, serviceContext, queries2), emptyList2, collection.stream().flatMap(sourceDescription -> {
            return sourceDescription.getClusterStatistics().stream();
        }), collection.stream().flatMap(sourceDescription2 -> {
            return sourceDescription2.getClusterErrorStats().stream();
        }), sessionProperties.getKsqlHostInfo(), ksqlExecutionContext.metricCollectors())) : new SourceDescriptionWithWarnings(linkedList, SourceDescriptionFactory.create(source, z, queries, queries2, empty, emptyList, emptyList2, Stream.empty(), Stream.empty(), sessionProperties.getKsqlHostInfo(), ksqlExecutionContext.metricCollectors()));
    }

    private static List<String> getSourceConstraints(SourceName sourceName, MetaStore metaStore) {
        return (List) metaStore.getSourceConstraints(sourceName).stream().map((v0) -> {
            return v0.text();
        }).collect(Collectors.toList());
    }

    private static List<QueryOffsetSummary> queryOffsetSummaries(KsqlConfig ksqlConfig, ServiceContext serviceContext, List<RunningQuery> list) {
        HashMap hashMap = new HashMap(list.size());
        HashMap hashMap2 = new HashMap();
        HashSet hashSet = new HashSet();
        Iterator<RunningQuery> it = list.iterator();
        while (it.hasNext()) {
            String build = QueryApplicationId.build(ksqlConfig, true, it.next().getId());
            Map listConsumerGroupOffsets = serviceContext.getConsumerGroupClient().listConsumerGroupOffsets(build);
            hashMap.put(build, listConsumerGroupOffsets);
            Set set = (Set) listConsumerGroupOffsets.keySet().stream().map((v0) -> {
                return v0.topic();
            }).collect(Collectors.toSet());
            hashMap2.put(build, set);
            hashSet.addAll(set);
        }
        Map describeTopics = serviceContext.getTopicClient().describeTopics(hashSet);
        Map listTopicsStartOffsets = serviceContext.getTopicClient().listTopicsStartOffsets(hashSet);
        Map listTopicsEndOffsets = serviceContext.getTopicClient().listTopicsEndOffsets(hashSet);
        ArrayList arrayList = new ArrayList();
        for (Map.Entry entry : hashMap2.entrySet()) {
            ArrayList arrayList2 = new ArrayList();
            for (String str : (Set) entry.getValue()) {
                arrayList2.add(new QueryTopicOffsetSummary(str, consumerPartitionOffsets((TopicDescription) describeTopics.get(str), listTopicsStartOffsets, listTopicsEndOffsets, (Map) hashMap.get(entry.getKey()))));
            }
            arrayList.add(new QueryOffsetSummary((String) entry.getKey(), arrayList2));
        }
        return arrayList;
    }

    private static List<ConsumerPartitionOffsets> consumerPartitionOffsets(TopicDescription topicDescription, Map<TopicPartition, Long> map, Map<TopicPartition, Long> map2, Map<TopicPartition, OffsetAndMetadata> map3) {
        ArrayList arrayList = new ArrayList();
        for (TopicPartitionInfo topicPartitionInfo : topicDescription.partitions()) {
            TopicPartition topicPartition = new TopicPartition(topicDescription.name(), topicPartitionInfo.partition());
            Long l = map.get(topicPartition);
            Long l2 = map2.get(topicPartition);
            OffsetAndMetadata offsetAndMetadata = map3.get(topicPartition);
            arrayList.add(new ConsumerPartitionOffsets(topicPartitionInfo.partition(), l.longValue(), l2.longValue(), offsetAndMetadata != null ? offsetAndMetadata.offset() : 0L));
        }
        return arrayList;
    }

    private static List<RunningQuery> getQueries(KsqlExecutionContext ksqlExecutionContext, Predicate<PersistentQueryMetadata> predicate) {
        return (List) ksqlExecutionContext.getPersistentQueries().stream().filter(predicate).map(persistentQueryMetadata -> {
            return new RunningQuery(persistentQueryMetadata.getStatementString(), persistentQueryMetadata.getSinkName().isPresent() ? ImmutableSet.of(((SourceName) persistentQueryMetadata.getSinkName().get()).text()) : ImmutableSet.of(), persistentQueryMetadata.getResultTopic().isPresent() ? ImmutableSet.of(((KsqlTopic) persistentQueryMetadata.getResultTopic().get()).getKafkaTopicName()) : ImmutableSet.of(), persistentQueryMetadata.getQueryId(), new QueryStatusCount(Collections.singletonMap(persistentQueryMetadata.getQueryStatus(), 1)), KsqlConstants.KsqlQueryType.PERSISTENT);
        }).collect(Collectors.toList());
    }

    private static SourceInfo.Stream sourceSteam(KsqlStream<?> ksqlStream) {
        return new SourceInfo.Stream(ksqlStream.getName().text(), ksqlStream.getKsqlTopic().getKafkaTopicName(), ksqlStream.getKsqlTopic().getKeyFormat().getFormat(), ksqlStream.getKsqlTopic().getValueFormat().getFormat(), ksqlStream.getKsqlTopic().getKeyFormat().isWindowed());
    }

    private static SourceInfo.Table sourceTable(KsqlTable<?> ksqlTable) {
        return new SourceInfo.Table(ksqlTable.getName().text(), ksqlTable.getKsqlTopic().getKafkaTopicName(), ksqlTable.getKsqlTopic().getKeyFormat().getFormat(), ksqlTable.getKsqlTopic().getValueFormat().getFormat(), ksqlTable.getKsqlTopic().getKeyFormat().isWindowed());
    }
}
