package io.confluent.ksql.execution.pull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.confluent.ksql.execution.pull.PullPhysicalPlan;
import io.confluent.ksql.execution.streams.RoutingFilter;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.execution.streams.materialization.Locator;
import io.confluent.ksql.execution.streams.materialization.MaterializationException;
import io.confluent.ksql.execution.streams.materialization.ks.NotUpToBoundException;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.query.PullQueryWriteStream;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.KsqlHostInfoEntity;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/execution/pull/HARouting.class */
public final class HARouting implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(HARouting.class);
    private final ExecutorService coordinatorExecutorService;
    private final ExecutorService routerExecutorService;
    private final RoutingFilter.RoutingFilterFactory routingFilterFactory;
    private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/execution/pull/HARouting$PartitionFetchResult.class */
    public static class PartitionFetchResult {
        private final RoutingResult routingResult;
        private final Locator.KsqlPartitionLocation location;
        private final Optional<Exception> exception;

        PartitionFetchResult(RoutingResult routingResult, Locator.KsqlPartitionLocation ksqlPartitionLocation, Optional<Exception> optional) {
            this.routingResult = routingResult;
            this.location = ksqlPartitionLocation;
            this.exception = optional;
        }

        public boolean isError() {
            return this.routingResult == RoutingResult.STANDBY_FALLBACK;
        }

        public RoutingResult getResult() {
            return this.routingResult;
        }

        public Locator.KsqlPartitionLocation getLocation() {
            return this.location;
        }

        public Optional<Exception> getException() {
            return this.exception;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/execution/pull/HARouting$RoutingResult.class */
    public enum RoutingResult {
        SUCCESS,
        STANDBY_FALLBACK
    }

    public HARouting(RoutingFilter.RoutingFilterFactory routingFilterFactory, Optional<PullQueryExecutorMetrics> optional, KsqlConfig ksqlConfig) {
        this.routingFilterFactory = (RoutingFilter.RoutingFilterFactory) Objects.requireNonNull(routingFilterFactory, "routingFilterFactory");
        this.coordinatorExecutorService = Executors.newFixedThreadPool(ksqlConfig.getInt("ksql.query.pull.thread.pool.size").intValue(), new ThreadFactoryBuilder().setNameFormat("pull-query-coordinator-%d").build());
        this.routerExecutorService = Executors.newFixedThreadPool(ksqlConfig.getInt("ksql.query.pull.router.thread.pool.size").intValue(), new ThreadFactoryBuilder().setNameFormat("pull-query-router-%d").build());
        this.pullQueryMetrics = (Optional) Objects.requireNonNull(optional, "pullQueryMetrics");
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.coordinatorExecutorService.shutdown();
        this.routerExecutorService.shutdown();
    }

    public CompletableFuture<Void> handlePullQuery(ServiceContext serviceContext, PullPhysicalPlan pullPhysicalPlan, ConfiguredStatement<Query> configuredStatement, RoutingOptions routingOptions, PullQueryWriteStream pullQueryWriteStream, CompletableFuture<Void> completableFuture) {
        List locate = pullPhysicalPlan.getMaterialization().locator().locate(pullPhysicalPlan.getKeys(), routingOptions, this.routingFilterFactory, pullPhysicalPlan.getPlanType() == PullPhysicalPlan.PullPhysicalPlanType.RANGE_SCAN);
        Map map = (Map) locate.stream().filter(ksqlPartitionLocation -> {
            return ksqlPartitionLocation.getNodes().stream().noneMatch(ksqlNode -> {
                return ksqlNode.getHost().isSelected();
            });
        }).collect(Collectors.toMap((v0) -> {
            return v0.getPartition();
        }, ksqlPartitionLocation2 -> {
            return (List) ksqlPartitionLocation2.getNodes().stream().map((v0) -> {
                return v0.getHost();
            }).collect(Collectors.toList());
        }));
        if (!map.isEmpty()) {
            MaterializationException materializationException = new MaterializationException("Unable to execute pull query. " + ((String) map.entrySet().stream().map(entry -> {
                return String.format("Partition %s failed to find valid host. Hosts scanned: %s", entry.getKey(), entry.getValue());
            }).collect(Collectors.joining(", ", "[", "]"))));
            LOG.debug(materializationException.getMessage());
            throw materializationException;
        }
        List list = (List) locate.stream().map((v0) -> {
            return v0.removeFilteredHosts();
        }).collect(Collectors.toList());
        CompletableFuture<Void> completableFuture2 = new CompletableFuture<>();
        this.coordinatorExecutorService.submit(() -> {
            try {
                executeRounds(serviceContext, pullPhysicalPlan, configuredStatement, routingOptions, list, pullQueryWriteStream, completableFuture);
                completableFuture2.complete(null);
            } catch (Throwable th) {
                completableFuture2.completeExceptionally(th);
            }
        });
        return completableFuture2;
    }

    private void executeRounds(ServiceContext serviceContext, PullPhysicalPlan pullPhysicalPlan, ConfiguredStatement<Query> configuredStatement, RoutingOptions routingOptions, List<Locator.KsqlPartitionLocation> list, PullQueryWriteStream pullQueryWriteStream, CompletableFuture<Void> completableFuture) throws InterruptedException {
        ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(this.routerExecutorService);
        int size = list.size();
        int i = 0;
        HashMap hashMap = new HashMap();
        for (Locator.KsqlPartitionLocation ksqlPartitionLocation : list) {
            Locator.KsqlNode nodeForRound = getNodeForRound(ksqlPartitionLocation, routingOptions);
            this.pullQueryMetrics.ifPresent(pullQueryExecutorMetrics -> {
                pullQueryExecutorMetrics.recordPartitionFetchRequest(1.0d);
            });
            executorCompletionService.submit(() -> {
                return executeOrRouteQuery(nodeForRound, ksqlPartitionLocation, configuredStatement, serviceContext, routingOptions, this.pullQueryMetrics, pullPhysicalPlan, pullQueryWriteStream, completableFuture);
            });
        }
        while (i < size) {
            try {
                PartitionFetchResult partitionFetchResult = (PartitionFetchResult) executorCompletionService.take().get();
                if (partitionFetchResult.isError()) {
                    ((List) hashMap.computeIfAbsent(Integer.valueOf(partitionFetchResult.location.getPartition()), num -> {
                        return new ArrayList();
                    })).add(partitionFetchResult.exception.get());
                    Locator.KsqlPartitionLocation nextNode = nextNode(partitionFetchResult.getLocation());
                    Locator.KsqlNode nodeForRound2 = getNodeForRound(nextNode, routingOptions);
                    this.pullQueryMetrics.ifPresent(pullQueryExecutorMetrics2 -> {
                        pullQueryExecutorMetrics2.recordResubmissionRequest(1.0d);
                    });
                    executorCompletionService.submit(() -> {
                        return executeOrRouteQuery(nodeForRound2, nextNode, configuredStatement, serviceContext, routingOptions, this.pullQueryMetrics, pullPhysicalPlan, pullQueryWriteStream, completableFuture);
                    });
                } else {
                    Preconditions.checkState(partitionFetchResult.getResult() == RoutingResult.SUCCESS);
                    i++;
                }
            } catch (Exception e) {
                MaterializationException materializationException = new MaterializationException("Unable to execute pull query: " + e.getMessage());
                Iterator it = hashMap.entrySet().iterator();
                while (it.hasNext()) {
                    Iterator it2 = ((List) ((Map.Entry) it.next()).getValue()).iterator();
                    while (it2.hasNext()) {
                        materializationException.addSuppressed((Exception) it2.next());
                    }
                }
                throw materializationException;
            }
        }
        pullQueryWriteStream.close();
    }

    private Locator.KsqlPartitionLocation nextNode(Locator.KsqlPartitionLocation ksqlPartitionLocation) {
        return ksqlPartitionLocation.removeHeadHost();
    }

    private Locator.KsqlNode getNodeForRound(Locator.KsqlPartitionLocation ksqlPartitionLocation, RoutingOptions routingOptions) {
        if (!ksqlPartitionLocation.getNodes().isEmpty()) {
            return (Locator.KsqlNode) ksqlPartitionLocation.getNodes().get(0);
        }
        if (routingOptions.getIsSkipForwardRequest()) {
            throw new MaterializationException("Unable to execute pull query.");
        }
        throw new MaterializationException("Exhausted standby hosts to try.");
    }

    @VisibleForTesting
    static PartitionFetchResult executeOrRouteQuery(Locator.KsqlNode ksqlNode, Locator.KsqlPartitionLocation ksqlPartitionLocation, ConfiguredStatement<Query> configuredStatement, ServiceContext serviceContext, RoutingOptions routingOptions, Optional<PullQueryExecutorMetrics> optional, PullPhysicalPlan pullPhysicalPlan, PullQueryWriteStream pullQueryWriteStream, CompletableFuture<Void> completableFuture) {
        PartitionFetchResult partitionFetchResult;
        Function<StreamedRow, StreamedRow> function = streamedRow -> {
            return streamedRow.withSourceHost(routingOptions.getIsDebugRequest() ? toKsqlHostInfo(ksqlNode) : null);
        };
        if (!ksqlNode.isLocal()) {
            try {
                LOG.debug("Query {} partition {} routed to host {} at timestamp {}.", new Object[]{pullPhysicalPlan.getQueryId(), Integer.valueOf(ksqlPartitionLocation.getPartition()), ksqlNode.location(), Long.valueOf(System.currentTimeMillis())});
                optional.ifPresent(pullQueryExecutorMetrics -> {
                    pullQueryExecutorMetrics.recordRemoteRequests(1.0d);
                });
                forwardTo(ksqlNode, ImmutableList.of(ksqlPartitionLocation), configuredStatement, serviceContext, pullQueryWriteStream, completableFuture, function);
                return new PartitionFetchResult(RoutingResult.SUCCESS, ksqlPartitionLocation, Optional.empty());
            } catch (StandbyFallbackException e) {
                LOG.warn("Error forwarding query to node {}. Falling back to standby state which may return stale results", ksqlNode.location(), e.getCause());
                return new PartitionFetchResult(RoutingResult.STANDBY_FALLBACK, ksqlPartitionLocation, Optional.of(e));
            } catch (Exception e2) {
                throw new KsqlException(String.format("Error forwarding query to node %s: %s", ksqlNode.location(), e2.getMessage()), e2);
            }
        }
        try {
            try {
                LOG.debug("Query {} partition {} executed locally at host {} at timestamp {}.", new Object[]{pullPhysicalPlan.getQueryId(), Integer.valueOf(ksqlPartitionLocation.getPartition()), ksqlNode.location(), Long.valueOf(System.currentTimeMillis())});
                optional.ifPresent(pullQueryExecutorMetrics2 -> {
                    pullQueryExecutorMetrics2.recordLocalRequests(1.0d);
                });
                synchronized (pullPhysicalPlan) {
                    pullPhysicalPlan.execute(ImmutableList.of(ksqlPartitionLocation), pullQueryWriteStream, function);
                    partitionFetchResult = new PartitionFetchResult(RoutingResult.SUCCESS, ksqlPartitionLocation, Optional.empty());
                }
                return partitionFetchResult;
            } catch (StandbyFallbackException | NotUpToBoundException e3) {
                LOG.warn("Error executing query locally at node {}. Falling back to standby state which may return stale results. Cause {}", ksqlNode, e3.getMessage());
                return new PartitionFetchResult(RoutingResult.STANDBY_FALLBACK, ksqlPartitionLocation, Optional.of(e3));
            }
        } catch (Exception e4) {
            throw new KsqlException(String.format("Error executing query locally at node %s: %s", ksqlNode.location(), e4.getMessage()), e4);
        }
    }

    private static KsqlHostInfoEntity toKsqlHostInfo(Locator.KsqlNode ksqlNode) {
        return new KsqlHostInfoEntity(ksqlNode.location().getHost(), ksqlNode.location().getPort());
    }

    private static void forwardTo(Locator.KsqlNode ksqlNode, List<Locator.KsqlPartitionLocation> list, ConfiguredStatement<Query> configuredStatement, ServiceContext serviceContext, PullQueryWriteStream pullQueryWriteStream, CompletableFuture<Void> completableFuture, Function<StreamedRow, StreamedRow> function) {
        ImmutableMap build = new ImmutableMap.Builder().put("request.ksql.query.pull.skip.forwarding", true).put("request.ksql.internal.request", true).put("request.ksql.query.pull.partition", (String) list.stream().map(ksqlPartitionLocation -> {
            return Integer.toString(ksqlPartitionLocation.getPartition());
        }).collect(Collectors.joining(","))).build();
        try {
            RestResponse makeQueryRequest = serviceContext.getKsqlClient().makeQueryRequest(ksqlNode.location(), configuredStatement.getUnMaskedStatementText(), configuredStatement.getSessionConfig().getOverrides(), build, pullQueryWriteStream, completableFuture, function);
            if (makeQueryRequest.isErroneous()) {
                throw new KsqlException(String.format("Forwarding pull query request [%s, %s] failed with error %s ", configuredStatement.getSessionConfig().getOverrides(), build, makeQueryRequest.getErrorMessage()));
            }
            if (((Integer) makeQueryRequest.getResponse()).intValue() == 0) {
                throw new KsqlException(String.format("Forwarding pull query request [%s, %s] failed due to invalid empty response from forwarding call, expected a header row.", configuredStatement.getSessionConfig().getOverrides(), build));
            }
        } catch (Exception e) {
            if (completableFuture.isDone()) {
                LOG.warn("Connection canceled, so returning");
            } else {
                KsqlException causedByKsqlException = causedByKsqlException(e);
                throw new StandbyFallbackException(String.format("Forwarding pull query request failed with error %s ", causedByKsqlException == null ? e.getMessage() : causedByKsqlException.getMessage()), e);
            }
        }
    }

    private static KsqlException causedByKsqlException(Exception exc) {
        Throwable th = exc;
        while (true) {
            Throwable th2 = th;
            if (th2 == null) {
                return null;
            }
            if (th2 instanceof KsqlException) {
                return (KsqlException) th2;
            }
            th = th2.getCause();
        }
    }
}
