package io.confluent.ksql.physical.scalablepush;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.physical.pull.HARouting;
import io.confluent.ksql.physical.scalablepush.locator.PushLocator;
import io.confluent.ksql.query.TransientQueryQueue;
import io.confluent.ksql.reactive.BaseSubscriber;
import io.confluent.ksql.reactive.BufferedPublisher;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlException;
import io.vertx.core.Context;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/physical/scalablepush/PushRouting.class */
public class PushRouting implements AutoCloseable {
    // Logger is named after this class (not HARouting) so that push-routing log lines are
    // attributed to, and configurable under, the PushRouting logger.
    private static final Logger LOG = LoggerFactory.getLogger(PushRouting.class);

    /**
     * Subscriber for the row stream of a locally executed push query.
     *
     * <p>Each received row is converted to a {@link GenericRow} and offered to the
     * {@link TransientQueryQueue} without blocking. One more item is requested after
     * subscription and after every accepted row. If the queue rejects a row, the error
     * callback is invoked and the subscriber closes itself.
     */
    public static class LocalQueryStreamSubscriber extends BaseSubscriber<List<?>> {
        private final TransientQueryQueue transientQueryQueue;
        private final Consumer<Throwable> errorCallback;
        // Only read and written from synchronized methods of this class.
        private boolean closed;

        /**
         * @param context context handed to the base subscriber; also used to schedule the cancel
         * @param transientQueryQueue queue that receives the rows
         * @param consumer callback invoked with any error, including a full queue
         */
        LocalQueryStreamSubscriber(Context context, TransientQueryQueue transientQueryQueue, Consumer<Throwable> consumer) {
            super(context);
            this.errorCallback = consumer;
            this.transientQueryQueue = transientQueryQueue;
        }

        /** Requests the first item once the subscription is in place. */
        protected void afterSubscribe(Subscription subscription) {
            makeRequest(1L);
        }

        /**
         * Offers the row to the queue; requests the next item on success, otherwise reports
         * the failure and closes. Rows arriving after {@link #close()} are ignored.
         */
        public synchronized void handleValue(List<?> list) {
            if (closed) {
                return;
            }
            final boolean accepted = transientQueryQueue.acceptRowNonBlocking(null, GenericRow.fromList(list));
            if (!accepted) {
                errorCallback.accept(new KsqlException("Hit limit of request queue"));
                close();
                return;
            }
            makeRequest(1L);
        }

        /** Nothing to do on completion. */
        protected void handleComplete() {
        }

        /** Forwards the upstream error to the error callback. */
        protected void handleError(Throwable th) {
            errorCallback.accept(th);
        }

        /** Marks the subscriber closed and schedules the cancel on the subscriber's context. */
        synchronized void close() {
            closed = true;
            context.runOnContext(ignored -> cancel());
        }
    }

    /**
     * Handle over the connections opened for one push query: a map from node to its
     * {@link RoutingResult}, plus a shared future that is completed exceptionally when the
     * query fails.
     */
    public static class PushConnectionsHandle {
        private final Map<PushLocator.KsqlNode, RoutingResult> results = new ConcurrentHashMap<>();
        private final CompletableFuture<Void> errorCallback;

        /**
         * @param completableFuture future signalling failure; when it completes exceptionally,
         *     every result registered at that moment is closed
         */
        @SuppressFBWarnings({"EI_EXPOSE_REP"})
        public PushConnectionsHandle(CompletableFuture<Void> completableFuture) {
            this.errorCallback = completableFuture;
            this.errorCallback.exceptionally(failure -> {
                close();
                return null;
            });
        }

        /** Registers (or replaces) the routing result for the given node. */
        public void add(PushLocator.KsqlNode ksqlNode, RoutingResult routingResult) {
            results.put(ksqlNode, routingResult);
        }

        /** Forgets the routing result of the given node without closing it. */
        public void remove(PushLocator.KsqlNode ksqlNode) {
            results.remove(ksqlNode);
        }

        /** Closes every currently registered routing result; entries stay in the map. */
        public void close() {
            for (RoutingResult result : results.values()) {
                result.close();
            }
        }

        /** Registers a callback that receives the error if the error future fails. */
        public void onException(Consumer<Throwable> consumer) {
            errorCallback.exceptionally(failure -> {
                consumer.accept(failure);
                return null;
            });
        }

        /** Fails the error future with {@code th} unless it is already done. */
        public void completeExceptionally(Throwable th) {
            if (!errorCallback.isDone()) {
                errorCallback.completeExceptionally(th);
            }
        }

        /**
         * Waits for the error future to complete.
         *
         * @return the cause of the failure, or {@code null} if the future completed normally
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public Throwable getError() throws InterruptedException {
            Throwable failure = null;
            try {
                errorCallback.get();
            } catch (ExecutionException e) {
                failure = e.getCause();
            }
            return failure;
        }
    }

    /**
     * Subscriber for the {@link StreamedRow} stream returned by a remote node.
     *
     * <p>A terminal row closes the subscriber. A row without a payload is skipped and the
     * next item is requested. A row with a payload is offered to the
     * {@link TransientQueryQueue} without blocking; if the queue rejects it, the error
     * callback is invoked and the subscriber closes itself.
     */
    public static class RemoteStreamSubscriber extends BaseSubscriber<StreamedRow> {
        private final TransientQueryQueue transientQueryQueue;
        private final Consumer<Throwable> errorCallback;
        // Only read and written from synchronized methods of this class.
        private boolean closed;

        /**
         * @param context context handed to the base subscriber; also used to schedule the cancel
         * @param transientQueryQueue queue that receives the rows
         * @param consumer callback invoked with any error, including a full queue
         */
        RemoteStreamSubscriber(Context context, TransientQueryQueue transientQueryQueue, Consumer<Throwable> consumer) {
            super(context);
            this.errorCallback = consumer;
            this.transientQueryQueue = transientQueryQueue;
        }

        /** Requests the first item once the subscription is in place. */
        protected void afterSubscribe(Subscription subscription) {
            makeRequest(1L);
        }

        /** Processes one streamed row; rows arriving after {@link #close()} are ignored. */
        public synchronized void handleValue(StreamedRow streamedRow) {
            if (closed) {
                return;
            }
            if (streamedRow.isTerminal()) {
                close();
                return;
            }
            // A row without a payload counts as accepted; the queue is only consulted when
            // there is data to enqueue.
            boolean accepted = true;
            if (streamedRow.getRow().isPresent()) {
                final StreamedRow.DataRow dataRow = (StreamedRow.DataRow) streamedRow.getRow().get();
                accepted = transientQueryQueue.acceptRowNonBlocking(null, GenericRow.fromList(dataRow.getColumns()));
            }
            if (accepted) {
                makeRequest(1L);
                return;
            }
            errorCallback.accept(new KsqlException("Hit limit of request queue"));
            close();
        }

        /** Nothing to do on completion. */
        protected void handleComplete() {
        }

        /** Forwards the upstream error to the error callback. */
        protected void handleError(Throwable th) {
            errorCallback.accept(th);
        }

        /** Marks the subscriber closed and schedules the cancel on the subscriber's context. */
        synchronized void close() {
            closed = true;
            context.runOnContext(ignored -> cancel());
        }
    }

    /* loaded from: input_file:io/confluent/ksql/physical/scalablepush/PushRouting$RoutingResult.class */
    public static class RoutingResult {
        private final RoutingResultStatus status;
        private final AutoCloseable closeable;

        public RoutingResult(RoutingResultStatus routingResultStatus, AutoCloseable autoCloseable) {
            this.status = routingResultStatus;
            this.closeable = autoCloseable;
        }

        public void close() {
            try {
                this.closeable.close();
            } catch (Exception e) {
                PushRouting.LOG.error("Error closing routing result: " + e.getMessage(), e);
            }
        }

        public RoutingResultStatus getStatus() {
            return this.status;
        }
    }

    /** Status carried by a {@link RoutingResult}; {@code SUCCESS} is the only value declared. */
    public enum RoutingResultStatus {
        SUCCESS
    }

    /** No-op implementation of {@link AutoCloseable#close()}: the method body is empty. */
    @Override // java.lang.AutoCloseable
    public void close() {
    }

    /**
     * Locates the nodes for the push query and connects to each of them.
     *
     * <p>The nodes come from the locator of the plan's scalable push registry. When the
     * routing options ask to skip forwarding, only local nodes are kept. Duplicates are
     * removed.
     *
     * @return a future yielding the handle over all opened connections
     * @throws KsqlException synchronously, if no node remains after filtering
     */
    public CompletableFuture<PushConnectionsHandle> handlePushQuery(ServiceContext serviceContext, PushPhysicalPlan pushPhysicalPlan, ConfiguredStatement<Query> configuredStatement, PushRoutingOptions pushRoutingOptions, LogicalSchema logicalSchema, TransientQueryQueue transientQueryQueue) {
        final List<PushLocator.KsqlNode> hosts = pushPhysicalPlan.getScalablePushRegistry()
                .getLocator()
                .locate()
                .stream()
                .filter(node -> !pushRoutingOptions.getIsSkipForwardRequest() || node.isLocal())
                .distinct()
                .collect(Collectors.toList());

        if (hosts.isEmpty()) {
            LOG.error("Unable to execute push query: {}. No nodes executing persistent queries", configuredStatement.getStatementText());
            throw new KsqlException(String.format("Unable to execute push query. No nodes executing persistent queries %s", configuredStatement.getStatementText()));
        }
        return connectToHosts(serviceContext, pushPhysicalPlan, configuredStatement, hosts, logicalSchema, transientQueryQueue);
    }

    /**
     * Starts the query on every given node and gathers the outcomes into one handle.
     *
     * <p>All nodes are contacted up front. Once every per-node future is done, the results
     * are registered with the handle. If any of them failed, the connections that did
     * succeed are registered and closed so they do not leak, and the handle's error future
     * is failed with a {@link KsqlException}. The returned future itself still completes
     * normally with the handle in that case.
     *
     * @param list nodes to connect to; expected to contain no duplicates
     * @return a future yielding the handle, in both the success and the failure case
     */
    private CompletableFuture<PushConnectionsHandle> connectToHosts(ServiceContext serviceContext, PushPhysicalPlan pushPhysicalPlan, ConfiguredStatement<Query> configuredStatement, List<PushLocator.KsqlNode> list, LogicalSchema logicalSchema, TransientQueryQueue transientQueryQueue) {
        // Insertion-ordered so that the "first failed node" reported below is deterministic.
        final Map<PushLocator.KsqlNode, CompletableFuture<RoutingResult>> pendingByNode = new LinkedHashMap<>();
        final CompletableFuture<Void> errorCallback = new CompletableFuture<>();
        for (PushLocator.KsqlNode node : list) {
            pendingByNode.put(node, executeOrRouteQuery(node, configuredStatement, serviceContext, pushPhysicalPlan, logicalSchema, transientQueryQueue, errorCallback::completeExceptionally));
        }

        final PushConnectionsHandle handle = new PushConnectionsHandle(errorCallback);
        return CompletableFuture.allOf(pendingByNode.values().toArray(new CompletableFuture<?>[0]))
            .thenApply(ignored -> {
                for (PushLocator.KsqlNode node : list) {
                    handle.add(node, pendingByNode.get(node).join());
                }
                return handle;
            })
            .exceptionally(th -> {
                // Failures normally arrive wrapped (e.g. in a CompletionException); fall back
                // to the throwable itself so a missing cause cannot trigger an NPE here.
                final Throwable cause = th.getCause() != null ? th.getCause() : th;
                final PushLocator.KsqlNode failedNode = pendingByNode.entrySet().stream()
                    .filter(entry -> entry.getValue().isCompletedExceptionally())
                    .map(Map.Entry::getKey)
                    .findFirst()
                    .orElse(null);
                LOG.warn("Error routing query {} to host {} at timestamp {} with exception {}", configuredStatement.getStatementText(), failedNode, System.currentTimeMillis(), cause);

                // The success stage above was skipped, so connections that did come up are not
                // yet tracked by the handle. Register them so that close() releases them.
                pendingByNode.forEach((node, pending) -> {
                    if (pending.isDone() && !pending.isCompletedExceptionally()) {
                        final RoutingResult connected = pending.join();
                        if (connected != null) {
                            handle.add(node, connected);
                        }
                    }
                });
                handle.close();
                handle.completeExceptionally(new KsqlException(String.format("Unable to execute push query \"%s\". %s", configuredStatement.getStatementText(), cause.getMessage()), cause));
                return handle;
            });
    }

    /**
     * Runs the query on the given node: executes the physical plan in-process when the node
     * is local, otherwise forwards the statement to the remote node.
     *
     * @param consumer callback handed to the row subscriber; receives errors raised while
     *     rows are streamed
     * @return a future yielding the routing result; it completes exceptionally with a
     *     {@link KsqlException} if execution or forwarding fails
     */
    @VisibleForTesting
    static CompletableFuture<RoutingResult> executeOrRouteQuery(PushLocator.KsqlNode ksqlNode, ConfiguredStatement<Query> configuredStatement, ServiceContext serviceContext, PushPhysicalPlan pushPhysicalPlan, LogicalSchema logicalSchema, TransientQueryQueue transientQueryQueue, Consumer<Throwable> consumer) {
        if (ksqlNode.isLocal()) {
            LOG.debug("Query {} executed locally at host {} at timestamp {}.", configuredStatement.getStatementText(), ksqlNode.location(), System.currentTimeMillis());
            return executeLocally(ksqlNode, configuredStatement, pushPhysicalPlan, transientQueryQueue, consumer);
        }
        LOG.debug("Query {} routed to host {} at timestamp {}.", configuredStatement.getStatementText(), ksqlNode.location(), System.currentTimeMillis());
        return routeToRemoteNode(ksqlNode, configuredStatement, serviceContext, logicalSchema, transientQueryQueue, consumer);
    }

    /** Executes the physical plan in-process and subscribes a local row subscriber to it. */
    private static CompletableFuture<RoutingResult> executeLocally(PushLocator.KsqlNode ksqlNode, ConfiguredStatement<Query> configuredStatement, PushPhysicalPlan pushPhysicalPlan, TransientQueryQueue transientQueryQueue, Consumer<Throwable> consumer) {
        // Remembers the publisher so the failure path can close it if subscribing throws.
        final AtomicReference<BufferedPublisher<?>> publisherRef = new AtomicReference<>();
        // Starting from a completed future routes exceptions thrown by execute() into the
        // returned future instead of letting them escape to the caller synchronously.
        return CompletableFuture.completedFuture(null)
            .thenApply(ignored -> pushPhysicalPlan.execute())
            .thenApply(publisher -> {
                publisherRef.set(publisher);
                publisher.subscribe(new LocalQueryStreamSubscriber(publisher.getContext(), transientQueryQueue, consumer));
                return new RoutingResult(RoutingResultStatus.SUCCESS, () -> {
                    pushPhysicalPlan.close();
                    publisher.close();
                });
            })
            .exceptionally(th -> {
                LOG.error("Error executing query {} locally at node {}", configuredStatement.getStatementText(), ksqlNode.location(), th.getCause());
                final BufferedPublisher<?> publisher = publisherRef.get();
                pushPhysicalPlan.close();
                if (publisher != null) {
                    publisher.close();
                }
                throw new KsqlException(String.format("Error executing query locally at node %s: %s", ksqlNode.location(), th.getMessage()), th);
            });
    }

    /** Forwards the statement to a remote node and subscribes a remote row subscriber. */
    private static CompletableFuture<RoutingResult> routeToRemoteNode(PushLocator.KsqlNode ksqlNode, ConfiguredStatement<Query> configuredStatement, ServiceContext serviceContext, LogicalSchema logicalSchema, TransientQueryQueue transientQueryQueue, Consumer<Throwable> consumer) {
        // Remembers the publisher so the failure path can close it if subscribing throws.
        final AtomicReference<BufferedPublisher<StreamedRow>> publisherRef = new AtomicReference<>();
        return forwardTo(ksqlNode, configuredStatement, serviceContext, logicalSchema)
            .thenApply(publisher -> {
                publisherRef.set(publisher);
                publisher.subscribe(new RemoteStreamSubscriber(publisher.getContext(), transientQueryQueue, consumer));
                return new RoutingResult(RoutingResultStatus.SUCCESS, publisher::close);
            })
            .exceptionally(th -> {
                LOG.error("Error forwarding query {} to node {}", configuredStatement.getStatementText(), ksqlNode, th.getCause());
                final BufferedPublisher<StreamedRow> publisher = publisherRef.get();
                if (publisher != null) {
                    publisher.close();
                }
                throw new KsqlException(String.format("Error forwarding query to node %s: %s", ksqlNode.location(), th.getMessage()), th);
            });
    }

    /**
     * Sends the statement to the given node as a streamed query request.
     *
     * <p>The request carries two properties, both set to {@code true}:
     * {@code request.ksql.query.push.skip.forwarding} and
     * {@code request.ksql.internal.request}.
     *
     * @param logicalSchema not used by this method
     * @return a future yielding the publisher of streamed rows; it completes exceptionally
     *     with a {@link KsqlException} if the response is erroneous
     */
    private static CompletableFuture<BufferedPublisher<StreamedRow>> forwardTo(PushLocator.KsqlNode ksqlNode, ConfiguredStatement<Query> configuredStatement, ServiceContext serviceContext, LogicalSchema logicalSchema) {
        final ImmutableMap<String, Object> requestProperties = ImmutableMap.of("request.ksql.query.push.skip.forwarding", true, "request.ksql.internal.request", true);
        return serviceContext.getKsqlClient()
            .makeQueryRequestStreamed(ksqlNode.location(), configuredStatement.getStatementText(), configuredStatement.getSessionConfig().getOverrides(), requestProperties)
            .thenApply(restResponse -> {
                if (restResponse.isErroneous()) {
                    // The message names a push query: this is the push-query router.
                    throw new KsqlException(String.format("Forwarding push query request [%s, %s] failed with error %s ", configuredStatement.getSessionConfig().getOverrides(), requestProperties, restResponse.getErrorMessage()));
                }
                // Raw cast kept from the original: the static type of getResponse() is not
                // visible from this file.
                return (BufferedPublisher) restResponse.getResponse();
            });
    }
}
