package io.confluent.ksql.execution.streams.materialization.ks;

import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.execution.streams.RoutingFilter;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.execution.streams.materialization.Locator;
import io.confluent.ksql.execution.streams.materialization.MaterializationException;
import io.confluent.ksql.util.KsqlHostInfo;
import java.net.URI;
import java.net.URL;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/execution/streams/materialization/ks/KsLocator.class */
final class KsLocator implements Locator {
    private static final Logger LOG = LoggerFactory.getLogger(KsLocator.class);
    private final String stateStoreName;
    private final KafkaStreams kafkaStreams;
    private final Serializer<Struct> keySerializer;
    private final URL localHost;
    private String applicationId;

    @Immutable
    /* loaded from: input_file:io/confluent/ksql/execution/streams/materialization/ks/KsLocator$Node.class */
    private static final class Node implements Locator.KsqlNode {
        private final boolean local;
        private final URI location;

        private Node(boolean z, URI uri) {
            this.local = z;
            this.location = (URI) Objects.requireNonNull(uri, "location");
        }

        @Override // io.confluent.ksql.execution.streams.materialization.Locator.KsqlNode
        public boolean isLocal() {
            return this.local;
        }

        @Override // io.confluent.ksql.execution.streams.materialization.Locator.KsqlNode
        public URI location() {
            return this.location;
        }

        public String toString() {
            return "Node{local = " + this.local + ", location = " + this.location + "}";
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            Node node = (Node) obj;
            return this.local == node.local && this.location.equals(node.location);
        }

        public int hashCode() {
            return Objects.hash(Boolean.valueOf(this.local), this.location);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KsLocator(String str, KafkaStreams kafkaStreams, Serializer<Struct> serializer, URL url, String str2) {
        this.kafkaStreams = (KafkaStreams) Objects.requireNonNull(kafkaStreams, "kafkaStreams");
        this.keySerializer = (Serializer) Objects.requireNonNull(serializer, "keySerializer");
        this.stateStoreName = (String) Objects.requireNonNull(str, "stateStoreName");
        this.localHost = (URL) Objects.requireNonNull(url, "localHost");
        this.applicationId = (String) Objects.requireNonNull(str2, "applicationId");
    }

    @Override // io.confluent.ksql.execution.streams.materialization.Locator
    public List<Locator.KsqlNode> locate(Struct struct, RoutingOptions routingOptions, RoutingFilter.RoutingFilterFactory routingFilterFactory) {
        KeyQueryMetadata queryMetadataForKey = this.kafkaStreams.queryMetadataForKey(this.stateStoreName, struct, this.keySerializer);
        if (queryMetadataForKey == KeyQueryMetadata.NOT_AVAILABLE) {
            LOG.debug("KeyQueryMetadata not available for state store {} and key {}", this.stateStoreName, struct);
            throw new MaterializationException(String.format("KeyQueryMetadata not available for state store %s and key %s", this.stateStoreName, struct));
        }
        HostInfo activeHost = queryMetadataForKey.getActiveHost();
        Set standbyHosts = queryMetadataForKey.getStandbyHosts();
        LOG.debug("Before filtering: Active host {} , standby hosts {}", activeHost, standbyHosts);
        List<KsqlHostInfo> list = (List) Stream.concat(Stream.of(activeHost), standbyHosts.stream()).map(this::asKsqlHost).collect(Collectors.toList());
        RoutingFilter createRoutingFilter = routingFilterFactory.createRoutingFilter(routingOptions, list, activeHost, this.applicationId, this.stateStoreName, queryMetadataForKey.getPartition());
        Stream<KsqlHostInfo> stream = list.stream();
        createRoutingFilter.getClass();
        List<Locator.KsqlNode> list2 = (List) stream.filter(createRoutingFilter::filter).map(this::asNode).collect(Collectors.toList());
        LOG.debug("Filtered and ordered hosts: {}", list2);
        return list2;
    }

    @VisibleForTesting
    KsqlHostInfo asKsqlHost(HostInfo hostInfo) {
        return new KsqlHostInfo(hostInfo.host(), hostInfo.port());
    }

    @VisibleForTesting
    Locator.KsqlNode asNode(KsqlHostInfo ksqlHostInfo) {
        return new Node(isLocalHost(ksqlHostInfo), buildLocation(ksqlHostInfo));
    }

    private boolean isLocalHost(KsqlHostInfo ksqlHostInfo) {
        if (ksqlHostInfo.port() != this.localHost.getPort()) {
            return false;
        }
        return ksqlHostInfo.host().equalsIgnoreCase(this.localHost.getHost()) || ksqlHostInfo.host().equalsIgnoreCase("localhost");
    }

    private URI buildLocation(KsqlHostInfo ksqlHostInfo) {
        try {
            return new URL(this.localHost.getProtocol(), ksqlHostInfo.host(), ksqlHostInfo.port(), "/").toURI();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to convert remote host info to URL. remoteInfo: " + ksqlHostInfo);
        }
    }
}
