package io.confluent.ksql.rest.server.execution;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.parser.tree.ListQueries;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.SessionProperties;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlHostInfoEntity;
import io.confluent.ksql.rest.entity.Queries;
import io.confluent.ksql.rest.entity.QueryDescription;
import io.confluent.ksql.rest.entity.QueryDescriptionFactory;
import io.confluent.ksql.rest.entity.QueryDescriptionList;
import io.confluent.ksql.rest.entity.QueryStatusCount;
import io.confluent.ksql.rest.entity.RunningQuery;
import io.confluent.ksql.rest.server.ServerUtil;
import io.confluent.ksql.rest.util.DiscoverRemoteHostsUtil;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.services.SimpleKsqlClient;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Executes {@link ListQueries} statements.
 *
 * <p>The result combines the queries known to the local execution context with the
 * results obtained by forwarding the same statement to every remote host returned by
 * {@link DiscoverRemoteHostsUtil#getRemoteHosts}. Remote hosts that fail, time out or
 * answer with an error are reported with the
 * {@link KsqlConstants.KsqlQueryStatus#UNRESPONSIVE} status.
 */
@SuppressFBWarnings({"SE_BAD_FIELD"})
public final class ListQueriesExecutor {
    /** Maximum time, in seconds, to wait for the response of a single remote host. */
    private static final int TIMEOUT_SECONDS = 10;
    private static final Logger LOG = LoggerFactory.getLogger(ListQueriesExecutor.class);

    private ListQueriesExecutor() {
    }

    /**
     * Runs the statement locally, gathers the results of the remote hosts and merges both.
     *
     * @param configuredStatement the statement to execute
     * @param sessionProperties properties of the current session (local host, internal-request flag)
     * @param ksqlExecutionContext source of the locally known queries
     * @param serviceContext provides the client used to contact remote hosts
     * @return a {@link QueryDescriptionList} when the statement asks for the extended output,
     *     otherwise a {@link Queries} entity; never empty
     */
    public static Optional<KsqlEntity> execute(ConfiguredStatement<ListQueries> configuredStatement, SessionProperties sessionProperties, KsqlExecutionContext ksqlExecutionContext, ServiceContext serviceContext) {
        final Pair<List<KsqlEntity>, Set<HostInfo>> remoteResults =
            scatterGather(configuredStatement, sessionProperties, ksqlExecutionContext, serviceContext);
        if (configuredStatement.getStatement().getShowExtended()) {
            return executeExtended(remoteResults, sessionProperties, configuredStatement, ksqlExecutionContext);
        }
        return executeSimple(remoteResults, configuredStatement, ksqlExecutionContext);
    }

    /**
     * Builds the non-extended response: local running queries merged with the remote ones.
     */
    private static Optional<KsqlEntity> executeSimple(Pair<List<KsqlEntity>, Set<HostInfo>> remoteResults, ConfiguredStatement<ListQueries> configuredStatement, KsqlExecutionContext ksqlExecutionContext) {
        final Map<QueryId, RunningQuery> allResults = getLocalSimple(ksqlExecutionContext);
        mergeSimple(remoteResults, allResults);
        return Optional.of(new Queries(configuredStatement.getStatementText(), allResults.values()));
    }

    /**
     * Converts every live query of the local execution context into a {@link RunningQuery},
     * keyed by query id. Sink name and result topic are only filled in for persistent queries.
     */
    private static Map<QueryId, RunningQuery> getLocalSimple(KsqlExecutionContext ksqlExecutionContext) {
        return ksqlExecutionContext.getAllLiveQueries().stream().collect(Collectors.toMap(
            query -> query.getQueryId(),
            query -> {
                if (query instanceof PersistentQueryMetadata) {
                    final PersistentQueryMetadata persistentQuery = (PersistentQueryMetadata) query;
                    return new RunningQuery(
                        query.getStatementString(),
                        ImmutableSet.of(persistentQuery.getSinkName().text()),
                        ImmutableSet.of(persistentQuery.getResultTopic().getKafkaTopicName()),
                        query.getQueryId(),
                        QueryStatusCount.fromStreamsStateCounts(Collections.singletonMap(query.getState(), 1)),
                        query.getQueryType());
                }
                return new RunningQuery(
                    query.getStatementString(),
                    ImmutableSet.of(),
                    ImmutableSet.of(),
                    query.getQueryId(),
                    QueryStatusCount.fromStreamsStateCounts(Collections.singletonMap(query.getState(), 1)),
                    query.getQueryType());
            }));
    }

    /**
     * Folds the remote {@link Queries} results into {@code allResults}.
     *
     * <p>For a query id that is already present, the remote status counts are added to the
     * existing entry; otherwise the remote entry is inserted as is. Afterwards every entry
     * gets its {@code UNRESPONSIVE} count updated by the number of unresponsive hosts.
     *
     * @param remoteResults left: entities returned by remote hosts; right: unresponsive hosts
     * @param allResults mutable map of results, updated in place
     */
    private static void mergeSimple(Pair<List<KsqlEntity>, Set<HostInfo>> remoteResults, Map<QueryId, RunningQuery> allResults) {
        final List<RunningQuery> remoteQueries = remoteResults.getLeft().stream()
            .map(Queries.class::cast)
            .map(Queries::getQueries)
            .flatMap(queries -> queries.stream())
            .collect(Collectors.toList());

        for (final RunningQuery remoteQuery : remoteQueries) {
            final QueryId queryId = remoteQuery.getId();
            if (allResults.containsKey(queryId)) {
                // Query already discovered: accumulate the remote host's status counts.
                final QueryStatusCount knownCounts = allResults.get(queryId).getStatusCount();
                for (final Map.Entry<KsqlConstants.KsqlQueryStatus, Integer> status
                    : remoteQuery.getStatusCount().getStatuses().entrySet()) {
                    knownCounts.updateStatusCount(status.getKey(), status.getValue());
                }
            } else {
                allResults.put(queryId, remoteQuery);
            }
        }

        final Set<HostInfo> unresponsiveHosts = remoteResults.getRight();
        if (unresponsiveHosts.isEmpty()) {
            return;
        }
        for (final RunningQuery query : allResults.values()) {
            query.getStatusCount()
                .updateStatusCount(KsqlConstants.KsqlQueryStatus.UNRESPONSIVE, unresponsiveHosts.size());
        }
    }

    /**
     * Builds the extended response: local query descriptions merged with the remote ones.
     */
    private static Optional<KsqlEntity> executeExtended(Pair<List<KsqlEntity>, Set<HostInfo>> remoteResults, SessionProperties sessionProperties, ConfiguredStatement<ListQueries> configuredStatement, KsqlExecutionContext ksqlExecutionContext) {
        final Map<QueryId, QueryDescription> allResults = getLocalExtended(sessionProperties, ksqlExecutionContext);
        mergeExtended(remoteResults, allResults);
        return Optional.of(new QueryDescriptionList(configuredStatement.getStatementText(), allResults.values()));
    }

    /**
     * Describes every live query of the local execution context, keyed by query id. Each
     * description carries the status of the query on the local host only.
     */
    private static Map<QueryId, QueryDescription> getLocalExtended(SessionProperties sessionProperties, KsqlExecutionContext ksqlExecutionContext) {
        return ksqlExecutionContext.getAllLiveQueries().stream().collect(Collectors.toMap(
            query -> query.getQueryId(),
            query -> QueryDescriptionFactory.forQueryMetadata(
                query,
                Collections.singletonMap(
                    new KsqlHostInfoEntity(sessionProperties.getKsqlHostInfo()),
                    KsqlConstants.fromStreamsState(query.getState())))));
    }

    /**
     * Folds the remote {@link QueryDescriptionList} results into {@code allResults}.
     *
     * <p>For a query id that is already present, the remote per-host statuses are added to the
     * existing description; otherwise the remote description is inserted as is. Afterwards
     * every description is marked {@code UNRESPONSIVE} for each unresponsive host.
     *
     * @param remoteResults left: entities returned by remote hosts; right: unresponsive hosts
     * @param allResults mutable map of results, updated in place
     */
    private static void mergeExtended(Pair<List<KsqlEntity>, Set<HostInfo>> remoteResults, Map<QueryId, QueryDescription> allResults) {
        final List<QueryDescription> remoteDescriptions = remoteResults.getLeft().stream()
            .map(QueryDescriptionList.class::cast)
            .map(QueryDescriptionList::getQueryDescriptions)
            .flatMap(descriptions -> descriptions.stream())
            .collect(Collectors.toList());

        for (final QueryDescription remoteDescription : remoteDescriptions) {
            final QueryId queryId = remoteDescription.getId();
            if (allResults.containsKey(queryId)) {
                // Query already discovered: record the status reported by each remote host.
                final QueryDescription known = allResults.get(queryId);
                for (final Map.Entry<KsqlHostInfoEntity, KsqlConstants.KsqlQueryStatus> status
                    : remoteDescription.getKsqlHostQueryStatus().entrySet()) {
                    known.updateKsqlHostQueryStatus(status.getKey(), status.getValue());
                }
            } else {
                allResults.put(queryId, remoteDescription);
            }
        }

        for (final HostInfo unresponsiveHost : remoteResults.getRight()) {
            for (final QueryDescription description : allResults.values()) {
                description.updateKsqlHostQueryStatus(
                    new KsqlHostInfoEntity(unresponsiveHost.host(), unresponsiveHost.port()),
                    KsqlConstants.KsqlQueryStatus.UNRESPONSIVE);
            }
        }
    }

    /**
     * Forwards the statement to every remote host in parallel and collects the answers.
     *
     * <p>Nothing is forwarded when the current request is itself an internal request or when
     * no remote hosts are found. Forwarded requests are flagged as internal.
     *
     * @return a pair whose left side holds the first entity of every successful response and
     *     whose right side holds the hosts that failed, timed out after
     *     {@link #TIMEOUT_SECONDS} seconds, or returned an error response
     */
    private static Pair<List<KsqlEntity>, Set<HostInfo>> scatterGather(ConfiguredStatement<ListQueries> configuredStatement, SessionProperties sessionProperties, KsqlExecutionContext ksqlExecutionContext, ServiceContext serviceContext) {
        if (sessionProperties.getInternalRequest()) {
            return new Pair<>(ImmutableList.of(), ImmutableSet.of());
        }

        final Set<HostInfo> remoteHosts = DiscoverRemoteHostsUtil.getRemoteHosts(
            ksqlExecutionContext.getPersistentQueries(), sessionProperties.getKsqlHostInfo());
        if (remoteHosts.isEmpty()) {
            return new Pair<>(ImmutableList.of(), ImmutableSet.of());
        }

        final Set<HostInfo> unresponsiveHosts = new HashSet<>();
        final ExecutorService executorService = Executors.newFixedThreadPool(remoteHosts.size());
        try {
            final SimpleKsqlClient ksqlClient = serviceContext.getKsqlClient();

            // Scatter: one request per remote host. supplyAsync completes the future
            // exceptionally if the request throws, so a failing host is detected right away
            // instead of only after the timeout expires.
            final Map<HostInfo, CompletableFuture<RestResponse<KsqlEntityList>>> futureResponses = new HashMap<>();
            for (final HostInfo host : remoteHosts) {
                futureResponses.put(host, CompletableFuture.supplyAsync(
                    () -> ksqlClient.makeKsqlRequest(
                        ServerUtil.buildRemoteUri(sessionProperties.getLocalUrl(), host.host(), host.port()),
                        configuredStatement.getStatementText(),
                        Collections.singletonMap("request.ksql.internal.request", true)),
                    executorService));
            }

            // Gather: wait for each host in turn, bounded by the per-host timeout.
            final List<KsqlEntity> results = new ArrayList<>();
            for (final Map.Entry<HostInfo, CompletableFuture<RestResponse<KsqlEntityList>>> entry
                : futureResponses.entrySet()) {
                final HostInfo host = entry.getKey();
                try {
                    final RestResponse<KsqlEntityList> response =
                        entry.getValue().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
                    if (response.isErroneous()) {
                        LOG.warn("Error response from host. host: {}, cause: {}", host, response.getErrorMessage().getMessage());
                        unresponsiveHosts.add(host);
                    } else {
                        results.add(response.getResponse().get(0));
                    }
                } catch (final Exception e) {
                    if (e instanceof InterruptedException) {
                        // Preserve the interrupt for callers further up the stack.
                        Thread.currentThread().interrupt();
                    }
                    LOG.warn("Failed to retrieve query info from host. host: {}, cause: {}", host, e.getMessage());
                    unresponsiveHosts.add(host);
                }
            }

            return new Pair<>(results, unresponsiveHosts);
        } finally {
            executorService.shutdown();
        }
    }
}
