package io.confluent.ksql.physical.pull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.confluent.ksql.execution.streams.RoutingFilter;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.execution.streams.materialization.Locator;
import io.confluent.ksql.execution.streams.materialization.MaterializationException;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.query.PullQueryQueue;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.KsqlStatementException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/physical/pull/HARouting.class */
public final class HARouting implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(HARouting.class);
    private final ExecutorService executorService;
    private final RoutingFilter.RoutingFilterFactory routingFilterFactory;
    private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
    private final RouteQuery routeQuery;

    @VisibleForTesting
    /* loaded from: input_file:io/confluent/ksql/physical/pull/HARouting$RouteQuery.class */
    interface RouteQuery {
        void routeQuery(Locator.KsqlNode ksqlNode, List<Locator.KsqlPartitionLocation> list, ConfiguredStatement<Query> configuredStatement, ServiceContext serviceContext, RoutingOptions routingOptions, Optional<PullQueryExecutorMetrics> optional, PullPhysicalPlan pullPhysicalPlan, LogicalSchema logicalSchema, QueryId queryId, PullQueryQueue pullQueryQueue);
    }

    public HARouting(RoutingFilter.RoutingFilterFactory routingFilterFactory, Optional<PullQueryExecutorMetrics> optional, KsqlConfig ksqlConfig) {
        this(routingFilterFactory, optional, ksqlConfig, HARouting::executeOrRouteQuery);
    }

    @VisibleForTesting
    HARouting(RoutingFilter.RoutingFilterFactory routingFilterFactory, Optional<PullQueryExecutorMetrics> optional, KsqlConfig ksqlConfig, RouteQuery routeQuery) {
        this.routingFilterFactory = (RoutingFilter.RoutingFilterFactory) Objects.requireNonNull(routingFilterFactory, "routingFilterFactory");
        this.executorService = Executors.newFixedThreadPool(ksqlConfig.getInt("ksql.query.pull.thread.pool.size").intValue(), new ThreadFactoryBuilder().setNameFormat("pull-query-executor-%d").build());
        this.pullQueryMetrics = (Optional) Objects.requireNonNull(optional, "pullQueryMetrics");
        this.routeQuery = (RouteQuery) Objects.requireNonNull(routeQuery);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.executorService.shutdown();
    }

    public CompletableFuture<Void> handlePullQuery(ServiceContext serviceContext, PullPhysicalPlan pullPhysicalPlan, ConfiguredStatement<Query> configuredStatement, RoutingOptions routingOptions, LogicalSchema logicalSchema, QueryId queryId, PullQueryQueue pullQueryQueue) {
        List locate = pullPhysicalPlan.getMaterialization().locator().locate(pullPhysicalPlan.getKeys(), routingOptions, this.routingFilterFactory);
        if (locate.stream().anyMatch(ksqlPartitionLocation -> {
            return ksqlPartitionLocation.getNodes().isEmpty();
        })) {
            LOG.debug("Unable to execute pull query: {}. All nodes are dead or exceed max allowed lag.", configuredStatement.getStatementText());
            throw new MaterializationException(String.format("Unable to execute pull query %s. All nodes are dead or exceed max allowed lag.", configuredStatement.getStatementText()));
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.executorService.submit(() -> {
            try {
                executeRounds(serviceContext, pullPhysicalPlan, configuredStatement, routingOptions, logicalSchema, queryId, locate, pullQueryQueue);
                completableFuture.complete(null);
            } catch (Throwable th) {
                completableFuture.completeExceptionally(th);
            }
        });
        return completableFuture;
    }

    private void executeRounds(ServiceContext serviceContext, PullPhysicalPlan pullPhysicalPlan, ConfiguredStatement<Query> configuredStatement, RoutingOptions routingOptions, LogicalSchema logicalSchema, QueryId queryId, List<Locator.KsqlPartitionLocation> list, PullQueryQueue pullQueryQueue) throws InterruptedException {
        List copyOf = ImmutableList.copyOf(list);
        int i = 0;
        while (true) {
            Map<Locator.KsqlNode, List<Locator.KsqlPartitionLocation>> groupByHost = groupByHost(configuredStatement, copyOf, i);
            LinkedHashMap linkedHashMap = new LinkedHashMap();
            for (Map.Entry<Locator.KsqlNode, List<Locator.KsqlPartitionLocation>> entry : groupByHost.entrySet()) {
                Locator.KsqlNode key = entry.getKey();
                linkedHashMap.put(key, this.executorService.submit(() -> {
                    this.routeQuery.routeQuery(key, (List) entry.getValue(), configuredStatement, serviceContext, routingOptions, this.pullQueryMetrics, pullPhysicalPlan, logicalSchema, queryId, pullQueryQueue);
                    return null;
                }));
            }
            ImmutableList.Builder builder = ImmutableList.builder();
            for (Map.Entry entry2 : linkedHashMap.entrySet()) {
                Future future = (Future) entry2.getValue();
                Locator.KsqlNode ksqlNode = (Locator.KsqlNode) entry2.getKey();
                try {
                    future.get();
                } catch (ExecutionException e) {
                    LOG.warn("Error routing query {} to host {} at timestamp {} with exception {}", new Object[]{configuredStatement.getStatementText(), ksqlNode, Long.valueOf(System.currentTimeMillis()), e.getCause()});
                    builder.addAll(groupByHost.get(ksqlNode));
                }
            }
            copyOf = builder.build();
            if (copyOf.size() == 0) {
                pullQueryQueue.close();
                return;
            }
            i++;
        }
    }

    private static Map<Locator.KsqlNode, List<Locator.KsqlPartitionLocation>> groupByHost(ConfiguredStatement<Query> configuredStatement, List<Locator.KsqlPartitionLocation> list, int i) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (Locator.KsqlPartitionLocation ksqlPartitionLocation : list) {
            if (i >= ksqlPartitionLocation.getNodes().size()) {
                throw new MaterializationException(String.format("Unable to execute pull query: %s. Exhausted standby hosts to try.", configuredStatement.getStatementText()));
            }
            ((List) linkedHashMap.computeIfAbsent((Locator.KsqlNode) ksqlPartitionLocation.getNodes().get(i), ksqlNode -> {
                return new ArrayList();
            })).add(ksqlPartitionLocation);
        }
        return linkedHashMap;
    }

    @VisibleForTesting
    static void executeOrRouteQuery(Locator.KsqlNode ksqlNode, List<Locator.KsqlPartitionLocation> list, ConfiguredStatement<Query> configuredStatement, ServiceContext serviceContext, RoutingOptions routingOptions, Optional<PullQueryExecutorMetrics> optional, PullPhysicalPlan pullPhysicalPlan, LogicalSchema logicalSchema, QueryId queryId, PullQueryQueue pullQueryQueue) {
        BiFunction<List<?>, LogicalSchema, PullQueryRow> biFunction = (list2, logicalSchema2) -> {
            return new PullQueryRow(list2, logicalSchema2, Optional.ofNullable(routingOptions.getIsDebugRequest() ? ksqlNode : null));
        };
        if (ksqlNode.isLocal()) {
            try {
                LOG.debug("Query {} executed locally at host {} at timestamp {}.", new Object[]{configuredStatement.getStatementText(), ksqlNode.location(), Long.valueOf(System.currentTimeMillis())});
                optional.ifPresent(pullQueryExecutorMetrics -> {
                    pullQueryExecutorMetrics.recordLocalRequests(1.0d);
                });
                pullPhysicalPlan.execute(list, pullQueryQueue, biFunction);
                return;
            } catch (Exception e) {
                LOG.error("Error executing query {} locally at node {} with exception", new Object[]{configuredStatement.getStatementText(), ksqlNode, e.getCause()});
                throw new KsqlException(String.format("Error executing query %s locally at node %s", configuredStatement.getStatementText(), ksqlNode), e);
            }
        }
        try {
            LOG.debug("Query {} routed to host {} at timestamp {}.", new Object[]{configuredStatement.getStatementText(), ksqlNode.location(), Long.valueOf(System.currentTimeMillis())});
            optional.ifPresent(pullQueryExecutorMetrics2 -> {
                pullQueryExecutorMetrics2.recordRemoteRequests(1.0d);
            });
            forwardTo(ksqlNode, list, configuredStatement, serviceContext, pullQueryQueue, biFunction, logicalSchema);
        } catch (Exception e2) {
            LOG.error("Error forwarding query {} to node {} with exception {}", new Object[]{configuredStatement.getStatementText(), ksqlNode, e2.getCause()});
            throw new KsqlException(String.format("Error forwarding query %s to node %s", configuredStatement.getStatementText(), ksqlNode), e2);
        }
    }

    private static void forwardTo(Locator.KsqlNode ksqlNode, List<Locator.KsqlPartitionLocation> list, ConfiguredStatement<Query> configuredStatement, ServiceContext serviceContext, PullQueryQueue pullQueryQueue, BiFunction<List<?>, LogicalSchema, PullQueryRow> biFunction, LogicalSchema logicalSchema) {
        ImmutableMap of = ImmutableMap.of("request.ksql.query.pull.skip.forwarding", true, "request.ksql.internal.request", true, "request.ksql.query.pull.partition", (String) list.stream().map(ksqlPartitionLocation -> {
            return Integer.toString(ksqlPartitionLocation.getPartition());
        }).collect(Collectors.joining(",")));
        RestResponse makeQueryRequest = serviceContext.getKsqlClient().makeQueryRequest(ksqlNode.location(), configuredStatement.getStatementText(), configuredStatement.getSessionConfig().getOverrides(), of, streamedRowsHandler(ksqlNode, configuredStatement, of, pullQueryQueue, biFunction, logicalSchema));
        if (makeQueryRequest.isErroneous()) {
            throw new KsqlServerException(String.format("Forwarding pull query request [%s, %s, %s] to node %s failed with error %s ", configuredStatement.getStatement(), configuredStatement.getSessionConfig().getOverrides(), of, ksqlNode, makeQueryRequest.getErrorMessage()));
        }
        if (((Integer) makeQueryRequest.getResponse()).intValue() == 0) {
            throw new KsqlServerException(String.format("Forwarding pull query request [%s, %s, %s] to node %s failed due to invalid empty response from forwarding call, expected a header row.", configuredStatement.getStatement(), configuredStatement.getSessionConfig().getOverrides(), of, ksqlNode));
        }
    }

    private static Consumer<List<StreamedRow>> streamedRowsHandler(Locator.KsqlNode ksqlNode, ConfiguredStatement<Query> configuredStatement, Map<String, Object> map, PullQueryQueue pullQueryQueue, BiFunction<List<?>, LogicalSchema, PullQueryRow> biFunction, LogicalSchema logicalSchema) {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicReference atomicReference = new AtomicReference();
        return list -> {
            if (list == null || list.isEmpty()) {
                return;
            }
            ArrayList arrayList = new ArrayList();
            int andAdd = atomicInteger.getAndAdd(list.size());
            for (int i = 0; i < list.size(); i++) {
                StreamedRow streamedRow = (StreamedRow) list.get(i);
                if (i == 0 && andAdd == 0) {
                    Optional header = streamedRow.getHeader();
                    header.ifPresent(header2 -> {
                        validateSchema(logicalSchema, header2.getSchema(), ksqlNode);
                    });
                    atomicReference.getClass();
                    header.ifPresent((v1) -> {
                        r1.set(v1);
                    });
                } else {
                    if (streamedRow.getErrorMessage().isPresent()) {
                        throw new KsqlStatementException(((KsqlErrorMessage) streamedRow.getErrorMessage().get()).getMessage(), configuredStatement.getStatementText());
                    }
                    if (!streamedRow.getRow().isPresent()) {
                        throw new KsqlServerException(String.format("Forwarding pull query request [%s, %s, %s] to node %s failed due to missing row data.", configuredStatement.getStatement(), configuredStatement.getSessionConfig().getOverrides(), map, ksqlNode));
                    }
                    List columns = ((StreamedRow.DataRow) streamedRow.getRow().get()).getColumns();
                    Preconditions.checkNotNull(atomicReference.get());
                    arrayList.add(biFunction.apply(columns, ((StreamedRow.Header) atomicReference.get()).getSchema()));
                }
            }
            if (pullQueryQueue.acceptRows(arrayList)) {
                return;
            }
            LOG.info("Failed to queue all rows");
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void validateSchema(LogicalSchema logicalSchema, LogicalSchema logicalSchema2, Locator.KsqlNode ksqlNode) {
        if (!logicalSchema2.equals(logicalSchema)) {
            throw new KsqlException(String.format("Schemas %s from host %s differs from schema %s", logicalSchema2, ksqlNode, logicalSchema));
        }
    }
}
