package io.confluent.ksql.execution.streams.materialization.ks;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.GenericKey;
import io.confluent.ksql.execution.streams.RoutingFilter;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.execution.streams.materialization.Locator;
import io.confluent.ksql.execution.streams.materialization.MaterializationException;
import io.confluent.ksql.util.KsqlHostInfo;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/execution/streams/materialization/ks/KsLocator.class */
public final class KsLocator implements Locator {
    private static final Logger LOG = LoggerFactory.getLogger(KsLocator.class);
    private final String stateStoreName;
    private final KafkaStreams kafkaStreams;
    private final Topology topology;
    private final Serializer<GenericKey> keySerializer;
    private final URL localHost;
    private final String applicationId;

    @Immutable
    @VisibleForTesting
    /* loaded from: input_file:io/confluent/ksql/execution/streams/materialization/ks/KsLocator$Node.class */
    public static final class Node implements Locator.KsqlNode {
        private final boolean local;
        private final URI location;

        private Node(boolean z, URI uri) {
            this.local = z;
            this.location = (URI) Objects.requireNonNull(uri, "location");
        }

        @Override // io.confluent.ksql.execution.streams.materialization.Locator.KsqlNode
        public boolean isLocal() {
            return this.local;
        }

        @Override // io.confluent.ksql.execution.streams.materialization.Locator.KsqlNode
        public URI location() {
            return this.location;
        }

        public String toString() {
            return "Node{local = " + this.local + ", location = " + this.location + "}";
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            Node node = (Node) obj;
            return this.local == node.local && this.location.equals(node.location);
        }

        public int hashCode() {
            return Objects.hash(Boolean.valueOf(this.local), this.location);
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:io/confluent/ksql/execution/streams/materialization/ks/KsLocator$PartitionLocation.class */
    public static final class PartitionLocation implements Locator.KsqlPartitionLocation {
        private final Optional<Set<Locator.KsqlKey>> keys;
        private final int partition;
        private final List<Locator.KsqlNode> nodes;

        public PartitionLocation(Optional<Set<Locator.KsqlKey>> optional, int i, List<Locator.KsqlNode> list) {
            this.keys = optional;
            this.partition = i;
            this.nodes = list;
        }

        @Override // io.confluent.ksql.execution.streams.materialization.Locator.KsqlPartitionLocation
        public Optional<Set<Locator.KsqlKey>> getKeys() {
            return this.keys;
        }

        @Override // io.confluent.ksql.execution.streams.materialization.Locator.KsqlPartitionLocation
        public List<Locator.KsqlNode> getNodes() {
            return this.nodes;
        }

        @Override // io.confluent.ksql.execution.streams.materialization.Locator.KsqlPartitionLocation
        public int getPartition() {
            return this.partition;
        }

        public String toString() {
            return " PartitionLocations {keys: " + this.keys + " , partition: " + this.partition + " , nodes: " + this.nodes + " } ";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/execution/streams/materialization/ks/KsLocator$PartitionMetadata.class */
    public static class PartitionMetadata {
        private final HostInfo activeHost;
        private final Set<HostInfo> standbyHosts;
        private final int partition;
        private final Optional<Set<Locator.KsqlKey>> keys;

        PartitionMetadata(HostInfo hostInfo, Set<HostInfo> set, int i, Optional<Set<Locator.KsqlKey>> optional) {
            this.activeHost = hostInfo;
            this.standbyHosts = set;
            this.partition = i;
            this.keys = optional;
        }

        public HostInfo getActiveHost() {
            return this.activeHost;
        }

        public Set<HostInfo> getStandbyHosts() {
            return this.standbyHosts;
        }

        public int getPartition() {
            return this.partition;
        }

        public Optional<Set<Locator.KsqlKey>> getKeys() {
            return this.keys;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KsLocator(String str, KafkaStreams kafkaStreams, Topology topology, Serializer<GenericKey> serializer, URL url, String str2) {
        this.kafkaStreams = (KafkaStreams) Objects.requireNonNull(kafkaStreams, "kafkaStreams");
        this.topology = (Topology) Objects.requireNonNull(topology, "topology");
        this.keySerializer = (Serializer) Objects.requireNonNull(serializer, "keySerializer");
        this.stateStoreName = (String) Objects.requireNonNull(str, "stateStoreName");
        this.localHost = (URL) Objects.requireNonNull(url, "localHost");
        this.applicationId = (String) Objects.requireNonNull(str2, "applicationId");
    }

    @Override // io.confluent.ksql.execution.streams.materialization.Locator
    public List<Locator.KsqlPartitionLocation> locate(List<Locator.KsqlKey> list, RoutingOptions routingOptions, RoutingFilter.RoutingFilterFactory routingFilterFactory) {
        ImmutableList.Builder builder = ImmutableList.builder();
        Set<Integer> partitions = routingOptions.getPartitions();
        for (PartitionMetadata partitionMetadata : list.isEmpty() ? getMetadataForAllPartitions(partitions) : getMetadataForKeys(list, partitions)) {
            LOG.debug("Handling pull query for partition {} of state store {}.", Integer.valueOf(partitionMetadata.getPartition()), this.stateStoreName);
            HostInfo activeHost = partitionMetadata.getActiveHost();
            Set<HostInfo> standbyHosts = partitionMetadata.getStandbyHosts();
            int partition = partitionMetadata.getPartition();
            builder.add(new PartitionLocation(partitionMetadata.getKeys(), partition, getFilteredHosts(routingOptions, routingFilterFactory, activeHost, standbyHosts, partition)));
        }
        return builder.build();
    }

    private List<PartitionMetadata> getMetadataForKeys(List<Locator.KsqlKey> list, Set<Integer> set) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        HashMap hashMap = new HashMap();
        for (Locator.KsqlKey ksqlKey : list) {
            KeyQueryMetadata queryMetadataForKey = this.kafkaStreams.queryMetadataForKey(this.stateStoreName, ksqlKey.getKey(), this.keySerializer);
            if (queryMetadataForKey == KeyQueryMetadata.NOT_AVAILABLE) {
                LOG.debug("KeyQueryMetadata not available for state store '{}' and key {}", this.stateStoreName, ksqlKey);
                throw new MaterializationException(String.format("Materialized data for key %s is not available yet. Please try again later.", ksqlKey));
            }
            LOG.debug("Handling pull query for key {} in partition {} of state store {}.", new Object[]{ksqlKey, Integer.valueOf(queryMetadataForKey.partition()), this.stateStoreName});
            if (set.size() <= 0 || set.contains(Integer.valueOf(queryMetadataForKey.partition()))) {
                hashMap.computeIfAbsent(Integer.valueOf(queryMetadataForKey.partition()), num -> {
                    return new LinkedHashSet();
                });
                ((Set) hashMap.get(Integer.valueOf(queryMetadataForKey.partition()))).add(ksqlKey);
                linkedHashMap.putIfAbsent(Integer.valueOf(queryMetadataForKey.partition()), queryMetadataForKey);
            } else {
                LOG.debug("Ignoring key {} in partition {} because parition is not included in lookup.", ksqlKey, Integer.valueOf(queryMetadataForKey.partition()));
            }
        }
        return (List) linkedHashMap.values().stream().map(keyQueryMetadata -> {
            return new PartitionMetadata(keyQueryMetadata.activeHost(), keyQueryMetadata.standbyHosts(), keyQueryMetadata.partition(), Optional.of(hashMap.get(Integer.valueOf(keyQueryMetadata.partition()))));
        }).collect(Collectors.toList());
    }

    private List<PartitionMetadata> getMetadataForAllPartitions(Set<Integer> set) {
        Set<String> findSubtopologySourceTopicSuffixes = findSubtopologySourceTopicSuffixes();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (StreamsMetadata streamsMetadata : this.kafkaStreams.allMetadataForStore(this.stateStoreName)) {
            streamsMetadata.topicPartitions().forEach(topicPartition -> {
                if (findSubtopologySourceTopicSuffixes.stream().anyMatch(str -> {
                    return topicPartition.topic().endsWith(str);
                })) {
                    hashMap.compute(Integer.valueOf(topicPartition.partition()), (num, hostInfo) -> {
                        if (hostInfo == null || streamsMetadata.hostInfo().equals(hostInfo)) {
                            return streamsMetadata.hostInfo();
                        }
                        throw new IllegalStateException("Should only be one active host per partition");
                    });
                }
            });
            streamsMetadata.standbyTopicPartitions().forEach(topicPartition2 -> {
                if (findSubtopologySourceTopicSuffixes.stream().anyMatch(str -> {
                    return topicPartition2.topic().endsWith(str);
                })) {
                    hashMap2.computeIfAbsent(Integer.valueOf(topicPartition2.partition()), num -> {
                        return new HashSet();
                    });
                    ((Set) hashMap2.get(Integer.valueOf(topicPartition2.partition()))).add(streamsMetadata.hostInfo());
                }
            });
        }
        Set<Integer> set2 = (Set) Streams.concat(new Stream[]{hashMap.keySet().stream(), hashMap2.keySet().stream()}).collect(Collectors.toSet());
        ArrayList arrayList = new ArrayList();
        for (Integer num : set2) {
            if (set.size() <= 0 || set.contains(num)) {
                arrayList.add(new PartitionMetadata((HostInfo) hashMap.getOrDefault(num, StreamsMetadataState.UNKNOWN_HOST), (Set) hashMap2.getOrDefault(num, Collections.emptySet()), num.intValue(), Optional.empty()));
            } else {
                LOG.debug("Ignoring partition {} because partition is not included in lookup.", num);
            }
        }
        return arrayList;
    }

    private Set<String> findSubtopologySourceTopicSuffixes() {
        for (TopologyDescription.Subtopology subtopology : this.topology.describe().subtopologies()) {
            boolean z = false;
            for (TopologyDescription.Processor processor : subtopology.nodes()) {
                if ((processor instanceof TopologyDescription.Processor) && processor.stores().contains(this.stateStoreName)) {
                    z = true;
                }
            }
            if (z) {
                for (TopologyDescription.Source source : subtopology.nodes()) {
                    if (source instanceof TopologyDescription.Source) {
                        TopologyDescription.Source source2 = source;
                        Preconditions.checkNotNull(source2.topicSet(), "Expecting topic set, not regex");
                        return source2.topicSet();
                    }
                }
                throw new IllegalStateException("Failed to find source with topics");
            }
        }
        throw new IllegalStateException("Failed to find state store " + this.stateStoreName);
    }

    private List<Locator.KsqlNode> getFilteredHosts(RoutingOptions routingOptions, RoutingFilter.RoutingFilterFactory routingFilterFactory, HostInfo hostInfo, Set<HostInfo> set, int i) {
        ImmutableList immutableList;
        if (routingOptions.getIsSkipForwardRequest()) {
            LOG.debug("Before filtering: Local host {} ", this.localHost);
            immutableList = ImmutableList.of(new KsqlHostInfo(this.localHost.getHost(), this.localHost.getPort()));
        } else {
            LOG.debug("Before filtering: Active host {} , standby hosts {}", hostInfo, set);
            immutableList = (List) Stream.concat(Stream.of(hostInfo), set.stream()).map(this::asKsqlHost).collect(Collectors.toList());
        }
        RoutingFilter createRoutingFilter = routingFilterFactory.createRoutingFilter(routingOptions, immutableList, hostInfo, this.applicationId, this.stateStoreName, i);
        Stream<KsqlHostInfo> stream = immutableList.stream();
        createRoutingFilter.getClass();
        ImmutableList immutableList2 = (ImmutableList) stream.filter(createRoutingFilter::filter).map(this::asNode).collect(ImmutableList.toImmutableList());
        LOG.debug("Filtered and ordered hosts: {}", immutableList2);
        return immutableList2;
    }

    @VisibleForTesting
    KsqlHostInfo asKsqlHost(HostInfo hostInfo) {
        return new KsqlHostInfo(hostInfo.host(), hostInfo.port());
    }

    @VisibleForTesting
    Locator.KsqlNode asNode(KsqlHostInfo ksqlHostInfo) {
        return new Node(isLocalHost(ksqlHostInfo), buildLocation(ksqlHostInfo));
    }

    private boolean isLocalHost(KsqlHostInfo ksqlHostInfo) {
        if (ksqlHostInfo.port() != this.localHost.getPort()) {
            return false;
        }
        return ksqlHostInfo.host().equalsIgnoreCase(this.localHost.getHost()) || ksqlHostInfo.host().equalsIgnoreCase("localhost");
    }

    private URI buildLocation(KsqlHostInfo ksqlHostInfo) {
        try {
            return new URL(this.localHost.getProtocol(), ksqlHostInfo.host(), ksqlHostInfo.port(), "/").toURI();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to convert remote host info to URL. remoteInfo: " + ksqlHostInfo);
        }
    }
}
