package io.confluent.ksql.physical.scalablepush.locator;

import io.confluent.ksql.physical.scalablepush.locator.PushLocator;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.internals.StreamsMetadataImpl;

/* loaded from: input_file:io/confluent/ksql/physical/scalablepush/locator/AllHostsLocator.class */
public class AllHostsLocator implements PushLocator {
    private final Supplier<List<PersistentQueryMetadata>> allPersistentQueries;
    private final URL localhost;

    /* loaded from: input_file:io/confluent/ksql/physical/scalablepush/locator/AllHostsLocator$Node.class */
    private static class Node implements PushLocator.KsqlNode {
        private final boolean isLocal;
        private final URI location;

        Node(boolean z, URI uri) {
            this.isLocal = z;
            this.location = uri;
        }

        @Override // io.confluent.ksql.physical.scalablepush.locator.PushLocator.KsqlNode
        public boolean isLocal() {
            return this.isLocal;
        }

        @Override // io.confluent.ksql.physical.scalablepush.locator.PushLocator.KsqlNode
        public URI location() {
            return this.location;
        }

        public String toString() {
            return "Node{isLocal = " + this.isLocal + ", location = " + this.location + "}";
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            Node node = (Node) obj;
            return this.isLocal == node.isLocal && this.location.equals(node.location);
        }

        public int hashCode() {
            return Objects.hash(Boolean.valueOf(this.isLocal), this.location);
        }
    }

    public AllHostsLocator(Supplier<List<PersistentQueryMetadata>> supplier, URL url) {
        this.allPersistentQueries = supplier;
        try {
            this.localhost = new URL(url.toString());
        } catch (MalformedURLException e) {
            throw new IllegalStateException("Could not deep copy URL: " + url);
        }
    }

    @Override // io.confluent.ksql.physical.scalablepush.locator.PushLocator
    public List<PushLocator.KsqlNode> locate() {
        List<PersistentQueryMetadata> list = this.allPersistentQueries.get();
        return list.isEmpty() ? Collections.emptyList() : (List) list.stream().filter(persistentQueryMetadata -> {
            return persistentQueryMetadata.getState() == KafkaStreams.State.RUNNING;
        }).map((v0) -> {
            return v0.getAllMetadata();
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).flatMap((v0) -> {
            return v0.stream();
        }).filter(streamsMetadata -> {
            return !streamsMetadata.equals(StreamsMetadataImpl.NOT_AVAILABLE);
        }).map((v0) -> {
            return v0.hostInfo();
        }).map(hostInfo -> {
            return new Node(isLocalhost(hostInfo), buildLocation(hostInfo));
        }).distinct().collect(Collectors.toList());
    }

    private boolean isLocalhost(HostInfo hostInfo) {
        if (hostInfo.port() != this.localhost.getPort()) {
            return false;
        }
        return hostInfo.host().equalsIgnoreCase(this.localhost.getHost()) || hostInfo.host().equalsIgnoreCase("localhost");
    }

    private URI buildLocation(HostInfo hostInfo) {
        try {
            return new URL(this.localhost.getProtocol(), hostInfo.host(), hostInfo.port(), "/").toURI();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to convert remote host info to URL. remoteInfo: " + hostInfo);
        }
    }
}
