package io.confluent.ksql.rest.server.resources;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.rest.entity.ActiveStandbyEntity;
import io.confluent.ksql.rest.entity.ClusterStatusResponse;
import io.confluent.ksql.rest.entity.HostStatusEntity;
import io.confluent.ksql.rest.entity.HostStoreLags;
import io.confluent.ksql.rest.entity.KsqlHostInfoEntity;
import io.confluent.ksql.rest.entity.TopicPartitionEntity;
import io.confluent.ksql.rest.server.HeartbeatAgent;
import io.confluent.ksql.rest.server.LagReportingAgent;
import io.confluent.ksql.util.HostStatus;
import io.confluent.ksql.util.KsqlHostInfo;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Response;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.StreamsMetadata;

/**
 * REST resource mounted at {@code /clusterStatus}.
 *
 * <p>For every host known to the {@link HeartbeatAgent} the response contains the host's
 * liveness flag, the timestamp of its last status update, the active/standby state stores and
 * topic partitions it holds per persistent query, and the store lags reported for it (an empty
 * placeholder when no {@link LagReportingAgent} is configured or no lags are known).
 */
@Produces({"application/vnd.ksql.v1+json", "application/json"})
@Path("/clusterStatus")
/* loaded from: input_file:io/confluent/ksql/rest/server/resources/ClusterStatusResource.class */
public class ClusterStatusResource {
    private final KsqlEngine engine;
    private final HeartbeatAgent heartbeatAgent;
    private final Optional<LagReportingAgent> lagReportingAgent;

    /** Returned by {@link #getHostStoreLags} when no lag information is available for a host. */
    private static final HostStoreLags EMPTY_HOST_STORE_LAGS = new HostStoreLags(ImmutableMap.of(), 0);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/rest/server/resources/ClusterStatusResource$QueryIdAndStreamsMetadata.class */
    /**
     * Pairs a query id with the {@link StreamsMetadata} of one instance running that query and
     * converts the pair into the REST entity form.
     */
    public static final class QueryIdAndStreamsMetadata {
        final String queryId;
        final StreamsMetadata streamsMetadata;

        /**
         * @param str the query id; must not be {@code null}
         * @param streamsMetadata the metadata for that query; must not be {@code null}
         * @throws NullPointerException if either argument is {@code null}
         */
        QueryIdAndStreamsMetadata(String str, StreamsMetadata streamsMetadata) {
            this.queryId = Objects.requireNonNull(str, "queryId");
            this.streamsMetadata = Objects.requireNonNull(streamsMetadata, "md");
        }

        /**
         * Builds the REST entity describing the active and standby state stores and topic
         * partitions held by this metadata.
         *
         * @return a new {@link ActiveStandbyEntity} populated from the wrapped metadata
         */
        public ActiveStandbyEntity toActiveStandbyEntity() {
            // Locals are read in the same order as the constructor arguments they feed.
            final Set<String> activeStores = this.streamsMetadata.stateStoreNames();
            final Set<TopicPartitionEntity> activePartitions = this.streamsMetadata.topicPartitions().stream()
                    .map(tp -> new TopicPartitionEntity(tp.topic(), tp.partition()))
                    .collect(Collectors.toSet());
            final Set<String> standbyStores = this.streamsMetadata.standbyStateStoreNames();
            final Set<TopicPartitionEntity> standbyPartitions = this.streamsMetadata.standbyTopicPartitions().stream()
                    .map(tp -> new TopicPartitionEntity(tp.topic(), tp.partition()))
                    .collect(Collectors.toSet());
            return new ActiveStandbyEntity(activeStores, activePartitions, standbyStores, standbyPartitions);
        }
    }

    /**
     * @param ksqlEngine engine used to enumerate persistent queries; must not be {@code null}
     * @param heartbeatAgent source of per-host liveness status; must not be {@code null}
     * @param optional the lag reporting agent, or empty if none is available; the
     *     {@code Optional} itself must not be {@code null}
     * @throws NullPointerException if any argument is {@code null}
     */
    public ClusterStatusResource(KsqlEngine ksqlEngine, HeartbeatAgent heartbeatAgent, Optional<LagReportingAgent> optional) {
        this.engine = Objects.requireNonNull(ksqlEngine, "engine");
        this.heartbeatAgent = Objects.requireNonNull(heartbeatAgent, "heartbeatAgent");
        this.lagReportingAgent = Objects.requireNonNull(optional, "lagReportingAgent");
    }

    /**
     * Handles {@code GET /clusterStatus}.
     *
     * @return an OK response whose entity is the current {@link ClusterStatusResponse}
     */
    @GET
    public Response checkClusterStatus() {
        return Response.ok(getResponse()).build();
    }

    /**
     * Assembles the cluster status from the heartbeat agent's view of the hosts.
     *
     * @return a response with one entry per host reported by the heartbeat agent
     */
    private ClusterStatusResponse getResponse() {
        final Map<KsqlHostInfo, HostStatus> hostStatuses = this.heartbeatAgent.getHostsStatus();
        final Map<KsqlHostInfoEntity, HostStatusEntity> response = hostStatuses.entrySet().stream()
                .collect(Collectors.toMap(
                        entry -> new KsqlHostInfoEntity(entry.getKey().host(), entry.getKey().port()),
                        entry -> {
                            final KsqlHostInfo host = entry.getKey();
                            final HostStatus status = entry.getValue();
                            return new HostStatusEntity(
                                    status.isHostAlive(),
                                    status.getLastStatusUpdateMs(),
                                    getActiveStandbyInformation(host),
                                    getHostStoreLags(host));
                        }));
        return new ClusterStatusResponse(response);
    }

    /**
     * Looks up the store lags for a host.
     *
     * @param ksqlHostInfo the host to look up
     * @return the lags reported by the lag reporting agent, or {@code EMPTY_HOST_STORE_LAGS}
     *     when there is no agent or the agent has nothing for this host
     */
    private HostStoreLags getHostStoreLags(KsqlHostInfo ksqlHostInfo) {
        return this.lagReportingAgent
                .flatMap(agent -> agent.getLagPerHost(ksqlHostInfo))
                .orElse(EMPTY_HOST_STORE_LAGS);
    }

    /**
     * Collects, per persistent query, the active/standby information for the given host.
     *
     * @param ksqlHostInfo the host whose assignments are wanted
     * @return a mutable map from query id to that query's {@link ActiveStandbyEntity} on the
     *     host; empty when no query has metadata for the host
     */
    private Map<String, ActiveStandbyEntity> getActiveStandbyInformation(KsqlHostInfo ksqlHostInfo) {
        // The comparison target is the same for every metadata entry, so build it once.
        final HostInfo targetHost = asHostInfo(ksqlHostInfo);
        final Map<String, ActiveStandbyEntity> perQuery = new HashMap<>();
        for (PersistentQueryMetadata query : this.engine.getPersistentQueries()) {
            for (StreamsMetadata metadata : query.getAllMetadata()) {
                // NOT_AVAILABLE is a sentinel instance, hence the identity comparison.
                if (metadata != StreamsMetadata.NOT_AVAILABLE && metadata.hostInfo().equals(targetHost)) {
                    final QueryIdAndStreamsMetadata pair =
                            new QueryIdAndStreamsMetadata(query.getQueryId().toString(), metadata);
                    // Keep the first entry seen for a query id; later matches are ignored.
                    perQuery.putIfAbsent(pair.queryId, pair.toActiveStandbyEntity());
                }
            }
        }
        return perQuery;
    }

    /**
     * Converts a {@link KsqlHostInfo} to the Kafka Streams {@link HostInfo} with the same host
     * and port, so it can be compared against {@link StreamsMetadata#hostInfo()}.
     */
    private HostInfo asHostInfo(KsqlHostInfo ksqlHostInfo) {
        return new HostInfo(ksqlHostInfo.host(), ksqlHostInfo.port());
    }
}
