package io.confluent.ksql.execution.scalablepush;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.common.QueryRow;
import io.confluent.ksql.execution.scalablepush.locator.PushLocator;
import io.confluent.ksql.internal.ScalablePushQueryMetrics;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.TransientQueryQueue;
import io.confluent.ksql.reactive.BaseSubscriber;
import io.confluent.ksql.reactive.BufferedPublisher;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.entity.PushContinuationToken;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KeyValue;
import io.confluent.ksql.util.KeyValueMetadata;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.OffsetVector;
import io.confluent.ksql.util.PushOffsetRange;
import io.confluent.ksql.util.PushOffsetVector;
import io.confluent.ksql.util.RowMetadata;
import io.confluent.ksql.util.VertxUtils;
import io.vertx.core.Context;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/execution/scalablepush/PushRouting.class */
public class PushRouting implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(PushRouting.class);
    private static final long CLUSTER_CHECK_INTERVAL_MS = 1000;
    private static final long HOST_CACHE_EXPIRATION_MS = 1000;
    private final Function<ScalablePushRegistry, Set<PushLocator.KsqlNode>> registryToNodes;
    private final long clusterCheckInterval;
    private final boolean backgroundRetries;

    /**
     * Marker exception used to complete a per-node callback when a continuation token's start
     * offsets are not covered by the offsets tracked so far (see
     * {@code StreamSubscriber#handleContinuationToken}). It carries no message or cause.
     */
    public static class GapFoundException extends RuntimeException {
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/execution/scalablepush/PushRouting$LocalQueryStreamSubscriber.class */
    public static class LocalQueryStreamSubscriber extends StreamSubscriber<QueryRow> {
        LocalQueryStreamSubscriber(Context context, TransientQueryQueue transientQueryQueue, CompletableFuture<Void> completableFuture, PushLocator.KsqlNode ksqlNode, QueryId queryId, OffsetsTracker offsetsTracker, PushRoutingOptions pushRoutingOptions, String str) {
            super(context, transientQueryQueue, completableFuture, ksqlNode, queryId, offsetsTracker, pushRoutingOptions, str);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public synchronized void handleValue(QueryRow queryRow) {
            if (this.closed) {
                return;
            }
            Optional<PushOffsetRange> handleContinuationToken = handleContinuationToken(queryRow.getOffsetRange(), !this.pushRoutingOptions.getHasBeenForwarded(), this.pushRoutingOptions.alosEnabled());
            if (handleContinuationToken.isPresent() || !queryRow.getOffsetRange().isPresent()) {
                if (!handleContinuationToken.isPresent() || this.pushRoutingOptions.shouldOutputContinuationToken()) {
                    Optional<U> map = handleContinuationToken.map(RowMetadata::of);
                    if (!this.transientQueryQueue.acceptRowNonBlocking(map.isPresent() ? new KeyValueMetadata<>((RowMetadata) map.get()) : new KeyValueMetadata<>(new KeyValue((Object) null, queryRow.value())))) {
                        this.callback.completeExceptionally(new KsqlException("Hit limit of request queue"));
                        close();
                        return;
                    }
                } else {
                    PushRouting.LOG.debug("Not outputting continuation token " + handleContinuationToken.get());
                }
                makeRequest(1L);
            }
        }

        protected void handleError(Throwable th) {
            PushRouting.LOG.error("Received error from remote node {} for id {}: {}", new Object[]{this.node, this.queryId, th.getMessage(), th});
            this.callback.completeExceptionally(th);
            close();
        }

        @Override // io.confluent.ksql.execution.scalablepush.PushRouting.StreamSubscriber
        public String name() {
            return "Local";
        }
    }

    /**
     * Holds the offsets seen so far for a push query. The underlying vector is created empty and
     * only ever changed by merging in further offset vectors.
     */
    public static class OffsetsTracker {
        private final PushOffsetVector currentOffsets = new PushOffsetVector();

        /** Returns the live (not copied) vector of tracked offsets. */
        public PushOffsetVector getOffsets() {
            return currentOffsets;
        }

        /** Returns a range with no start offsets whose end offsets are the tracked offsets. */
        public PushOffsetRange getOffsetRange() {
            return new PushOffsetRange(Optional.empty(), currentOffsets);
        }

        /** Returns {@link #getOffsetRange()} in its serialized form. */
        public String getSerializedOffsetRange() {
            final PushOffsetRange range = getOffsetRange();
            return range.serialize();
        }

        /**
         * Merges the given offsets into the tracked offsets.
         *
         * @param offsetVector offsets to merge in
         */
        public void updateFromToken(OffsetVector offsetVector) {
            currentOffsets.merge(offsetVector);
        }
    }

    /**
     * Tracks the per-node {@link RoutingResult}s of one push query together with a completion
     * callback and the query's {@link OffsetsTracker}. Closing the handle closes every registered
     * result and completes the callback.
     */
    public static class PushConnectionsHandle {
        private final Map<PushLocator.KsqlNode, RoutingResult> results = new ConcurrentHashMap<>();
        private volatile boolean closed = false;
        private final OffsetsTracker offsetsTracker = new OffsetsTracker();
        private final CompletableFuture<Void> callback = new CompletableFuture<>();

        /** Creates an open handle that closes itself if its callback completes exceptionally. */
        @SuppressFBWarnings({"EI_EXPOSE_REP"})
        public PushConnectionsHandle() {
            callback.exceptionally(error -> {
                close();
                return null;
            });
        }

        /**
         * Registers (or replaces) the result for a node. If the handle is already closed the new
         * result is closed right away.
         */
        public void add(PushLocator.KsqlNode ksqlNode, RoutingResult routingResult) {
            results.put(ksqlNode, routingResult);
            if (!isClosed()) {
                return;
            }
            routingResult.close();
        }

        /** Removes and returns the result registered for the node, or {@code null} if none. */
        public RoutingResult remove(PushLocator.KsqlNode ksqlNode) {
            return results.remove(ksqlNode);
        }

        /** Returns the result registered for the node, if any. */
        public Optional<RoutingResult> get(PushLocator.KsqlNode ksqlNode) {
            return Optional.ofNullable(results.get(ksqlNode));
        }

        /** Marks the handle closed, closes all registered results and completes the callback. */
        public void close() {
            closed = true;
            for (final RoutingResult result : results.values()) {
                result.close();
            }
            callback.complete(null);
        }

        /** Returns an immutable snapshot of every node with a registered result. */
        public Set<PushLocator.KsqlNode> getAllHosts() {
            return ImmutableSet.copyOf(results.keySet());
        }

        /** Returns an immutable snapshot of the nodes whose result status counts as active. */
        public Set<PushLocator.KsqlNode> getActiveHosts() {
            return results.entrySet().stream()
                .filter(entry -> RoutingResultStatus.isHostActive(entry.getValue().getStatus()))
                .map(Map.Entry::getKey)
                .collect(ImmutableSet.toImmutableSet());
        }

        /** Returns true once {@link #close()} has run or the callback has completed. */
        public boolean isClosed() {
            if (closed) {
                return true;
            }
            return callback.isDone();
        }

        /** Registers a consumer invoked with the error if the callback completes exceptionally. */
        public void onException(Consumer<Throwable> consumer) {
            callback.exceptionally(error -> {
                consumer.accept(error);
                return null;
            });
        }

        /** Fails the callback with the given error and closes the handle, unless already done. */
        public void completeExceptionally(Throwable th) {
            if (!callback.isDone()) {
                callback.completeExceptionally(th);
                close();
            }
        }

        /**
         * Waits for the callback to complete.
         *
         * @return the cause of the failure, or {@code null} if the callback completed normally
         * @throws InterruptedException if interrupted while waiting
         */
        public Throwable getError() throws InterruptedException {
            try {
                callback.get();
            } catch (ExecutionException e) {
                return e.getCause();
            }
            return null;
        }

        /** Registers a consumer invoked with the outcome when the callback completes either way. */
        public void onCompletionOrException(BiConsumer<Void, Throwable> biConsumer) {
            callback.handle((result, error) -> {
                biConsumer.accept(result, error);
                return null;
            });
        }

        /** Returns the offsets tracker shared by all connections of this handle. */
        public OffsetsTracker getOffsetsTracker() {
            return offsetsTracker;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/execution/scalablepush/PushRouting$RemoteStreamSubscriber.class */
    public static class RemoteStreamSubscriber extends StreamSubscriber<StreamedRow> {
        RemoteStreamSubscriber(Context context, TransientQueryQueue transientQueryQueue, CompletableFuture<Void> completableFuture, PushLocator.KsqlNode ksqlNode, QueryId queryId, OffsetsTracker offsetsTracker, PushRoutingOptions pushRoutingOptions, String str) {
            super(context, transientQueryQueue, completableFuture, ksqlNode, queryId, offsetsTracker, pushRoutingOptions, str);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public synchronized void handleValue(StreamedRow streamedRow) {
            if (this.closed) {
                return;
            }
            if (streamedRow.getFinalMessage().isPresent()) {
                close();
                return;
            }
            if (streamedRow.getRow().isPresent() || streamedRow.getContinuationToken().isPresent()) {
                if (!handleQueueableRow(streamedRow.getRow(), streamedRow.getContinuationToken())) {
                    PushRouting.LOG.warn("Unable to handle queueable row");
                    return;
                }
            } else if (streamedRow.getErrorMessage().isPresent()) {
                KsqlErrorMessage ksqlErrorMessage = (KsqlErrorMessage) streamedRow.getErrorMessage().get();
                PushRouting.LOG.error("Received error from remote node {} and id {}: {}", new Object[]{this.node, this.queryId, ksqlErrorMessage});
                this.callback.completeExceptionally(new KsqlException("Remote server had an error: " + ksqlErrorMessage.getErrorCode() + " - " + ksqlErrorMessage.getMessage()));
                close();
                return;
            }
            makeRequest(1L);
        }

        private boolean handleQueueableRow(Optional<StreamedRow.DataRow> optional, Optional<PushContinuationToken> optional2) {
            Optional<PushOffsetRange> handleContinuationToken = handleContinuationToken(optional2.map(pushContinuationToken -> {
                return PushOffsetRange.deserialize(pushContinuationToken.getContinuationToken());
            }), !this.pushRoutingOptions.getHasBeenForwarded(), this.pushRoutingOptions.alosEnabled());
            if (optional2.isPresent() && !handleContinuationToken.isPresent()) {
                return false;
            }
            if (handleContinuationToken.isPresent() && !this.pushRoutingOptions.shouldOutputContinuationToken()) {
                PushRouting.LOG.debug("Not outputting continuation token " + handleContinuationToken.get());
                return true;
            }
            Optional<U> map = handleContinuationToken.map(RowMetadata::of);
            if (this.transientQueryQueue.acceptRowNonBlocking(map.isPresent() ? new KeyValueMetadata<>((RowMetadata) map.get()) : new KeyValueMetadata<>(new KeyValue((Object) null, GenericRow.fromList(optional.get().getColumns()))))) {
                return true;
            }
            this.callback.completeExceptionally(new KsqlException("Hit limit of request queue"));
            close();
            return false;
        }

        protected void handleError(Throwable th) {
            PushRouting.LOG.error("Received error from remote node {} for id {}: {}", new Object[]{this.node, this.queryId, th.getMessage(), th});
            PushRouting.LOG.info("Ignoring transient network error for node {} for id {}", this.node, this.queryId);
            close();
        }

        @Override // io.confluent.ksql.execution.scalablepush.PushRouting.StreamSubscriber
        public String name() {
            return "Remote";
        }
    }

    /* loaded from: input_file:io/confluent/ksql/execution/scalablepush/PushRouting$RoutingResult.class */
    public static class RoutingResult {
        private final AutoCloseable closeable;
        private volatile RoutingResultStatus status;

        public RoutingResult(RoutingResultStatus routingResultStatus, AutoCloseable autoCloseable) {
            this.status = routingResultStatus;
            this.closeable = autoCloseable;
        }

        public void close() {
            try {
                this.closeable.close();
            } catch (Exception e) {
                PushRouting.LOG.error("Error closing routing result: " + e.getMessage(), e);
            }
        }

        public RoutingResultStatus getStatus() {
            return this.status;
        }

        public void updateStatus(RoutingResultStatus routingResultStatus) {
            this.status = routingResultStatus;
        }

        public String toString() {
            return "RoutingResult{" + this.status + "}";
        }
    }

    /** Lifecycle states of a connection to a single node. */
    public enum RoutingResultStatus {
        IN_PROGRESS,
        SUCCESS,
        COMPLETE,
        REMOVED,
        FAILED,
        OFFSET_GAP_FOUND;

        /**
         * Tells whether a status denotes a host that is being connected to or is connected.
         *
         * @param routingResultStatus the status to classify; must not be null
         * @return true for {@link #IN_PROGRESS} and {@link #SUCCESS}, false for every other status
         * @throws NullPointerException if {@code routingResultStatus} is null
         */
        public static boolean isHostActive(RoutingResultStatus routingResultStatus) {
            return routingResultStatus.equals(IN_PROGRESS) || routingResultStatus.equals(SUCCESS);
        }
    }

    /**
     * Common base of the local and remote row subscribers. It holds the shared queue, the
     * per-node completion callback and the offsets tracker, requests rows one at a time and
     * validates continuation tokens against the tracked offsets.
     *
     * @param <T> the type of item delivered by the publisher
     */
    public static abstract class StreamSubscriber<T> extends BaseSubscriber<T> {
        protected final TransientQueryQueue transientQueryQueue;
        protected final CompletableFuture<Void> callback;
        protected final PushLocator.KsqlNode node;
        protected final QueryId queryId;
        protected final OffsetsTracker offsetsTracker;
        protected final PushRoutingOptions pushRoutingOptions;
        protected final String thisHostName;
        protected boolean closed;

        StreamSubscriber(Context context, TransientQueryQueue transientQueryQueue, CompletableFuture<Void> completableFuture, PushLocator.KsqlNode ksqlNode, QueryId queryId, OffsetsTracker offsetsTracker, PushRoutingOptions pushRoutingOptions, String str) {
            super(context);
            this.transientQueryQueue = transientQueryQueue;
            this.callback = completableFuture;
            this.node = ksqlNode;
            this.queryId = queryId;
            this.offsetsTracker = offsetsTracker;
            this.pushRoutingOptions = pushRoutingOptions;
            this.thisHostName = str;
        }

        /** Requests the first item as soon as the subscription is established. */
        protected void afterSubscribe(Subscription subscription) {
            makeRequest(1L);
        }

        /** Logs the completion and closes this subscriber. */
        protected void handleComplete() {
            PushRouting.LOG.info("Received complete from remote node {} for id {}", node, queryId);
            close();
        }

        /**
         * Marks the subscriber closed, completes the callback (a no-op if it is already
         * completed) and cancels the subscription on the subscriber's context.
         */
        synchronized void close() {
            closed = true;
            callback.complete(null);
            context.runOnContext(ignored -> cancel());
        }

        /**
         * Validates a continuation token against the tracked offsets and folds it into them.
         *
         * @param optional the offset range of the received token, if the item was a token
         * @param z first gap-check switch; subscribers in this file pass
         *     {@code !getHasBeenForwarded()}
         * @param z2 second gap-check switch; subscribers in this file pass {@code alosEnabled()}
         * @return {@code optional} itself when empty; an empty Optional when both switches are set
         *     and the token's start offsets are not less than or equal to the tracked offsets (the
         *     callback is then failed with {@link GapFoundException} and the subscriber closed);
         *     otherwise the token's range merged with the offsets tracked before the update
         */
        protected Optional<PushOffsetRange> handleContinuationToken(Optional<PushOffsetRange> optional, boolean z, boolean z2) {
            if (!optional.isPresent()) {
                return optional;
            }
            final PushOffsetRange received = optional.get();
            final PushOffsetVector known = offsetsTracker.getOffsets().copy();
            Preconditions.checkState(received.getStartOffsets().isPresent());
            final PushOffsetRange merged = new PushOffsetRange(
                Optional.of(known.mergeCopy((OffsetVector) received.getStartOffsets().get())),
                known.mergeCopy(received.getEndOffsets()));
            final boolean gapCheckEnabled = z2 && z;
            if (gapCheckEnabled && !((PushOffsetVector) received.getStartOffsets().get()).lessThanOrEqualTo(known)) {
                PushRouting.LOG.warn("{}: Found a gap in offsets for {} node {} and id {}: start: {}, current: {}",
                    thisHostName, name(), node, queryId, received.getStartOffsets(), offsetsTracker.getOffsets());
                callback.completeExceptionally(new GapFoundException());
                close();
                return Optional.empty();
            }
            PushRouting.LOG.debug("{}: Before update with {} current offsets {} and {}",
                thisHostName, name(), offsetsTracker.getOffsetRange(), new PushOffsetVector());
            offsetsTracker.updateFromToken(received.getEndOffsets());
            PushRouting.LOG.debug("{}: Updated {} with {} to have current offsets {}",
                thisHostName, name(), received, offsetsTracker.getOffsetRange());
            return Optional.of(merged);
        }

        /** Returns a short label for this kind of subscriber, used in log messages. */
        public abstract String name();
    }

    /**
     * Creates a router that resolves hosts through the cache built by
     * {@code createLoadingCache()}, uses a cluster check interval of 1000 and enables
     * background retries.
     */
    public PushRouting() {
        this(createLoadingCache(), 1000L, true);
    }

    /**
     * Creates a router with explicit collaborators.
     *
     * @param function maps a registry to the set of nodes a push query should connect to
     * @param j delay passed to the timer that schedules the next check for new hosts
     * @param z whether {@code handlePushQuery} starts the periodic check for new hosts
     */
    @VisibleForTesting
    public PushRouting(Function<ScalablePushRegistry, Set<PushLocator.KsqlNode>> function, long j, boolean z) {
        this.registryToNodes = function;
        this.clusterCheckInterval = j;
        this.backgroundRetries = z;
    }

    /** Does nothing; this class holds no resources of its own to release. */
    @Override // java.lang.AutoCloseable
    public void close() {
    }

    /**
     * Starts a push query on every initial host and returns a handle over the connections.
     *
     * <p>When an offset range is supplied, the handle's offsets tracker is seeded with its end
     * offsets and, unless the request has already been forwarded, all initial hosts are treated
     * as catch-up hosts. When background retries are enabled and the request has not been
     * forwarded, a periodic check for added and removed hosts is also started.
     *
     * @param serviceContext used to reach remote nodes
     * @param pushPhysicalPlanManager the plan manager of the query
     * @param configuredStatement the query statement
     * @param pushRoutingOptions routing options of this request
     * @param logicalSchema the output schema
     * @param transientQueryQueue the queue receiving rows from all hosts
     * @param optional metrics to record local/remote requests on, if any
     * @param optional2 the offset range to continue from, if any
     * @return a future completing with the connections handle
     * @throws KsqlException if no host is available for the query
     */
    public CompletableFuture<PushConnectionsHandle> handlePushQuery(ServiceContext serviceContext, PushPhysicalPlanManager pushPhysicalPlanManager, ConfiguredStatement<Query> configuredStatement, PushRoutingOptions pushRoutingOptions, LogicalSchema logicalSchema, TransientQueryQueue transientQueryQueue, Optional<ScalablePushQueryMetrics> optional, Optional<PushOffsetRange> optional2) {
        final Set<PushLocator.KsqlNode> hosts = getInitialHosts(pushPhysicalPlanManager, configuredStatement, pushRoutingOptions);
        final String thisHostName = hosts.stream()
            .filter(PushLocator.KsqlNode::isLocal)
            .map(node -> node.location().toString())
            .findFirst()
            .orElse("unknown");

        final PushConnectionsHandle handle = new PushConnectionsHandle();
        // Whatever the outcome, release the catch-up consumer once the handle is done.
        handle.onCompletionOrException((ignored, error) -> {
            pushPhysicalPlanManager.getScalablePushRegistry().cleanupCatchupConsumer(pushPhysicalPlanManager.getCatchupConsumerGroupId());
        });

        final Set<PushLocator.KsqlNode> catchupHosts;
        if (!optional2.isPresent()) {
            catchupHosts = Collections.emptySet();
        } else {
            handle.getOffsetsTracker().updateFromToken(optional2.get().getEndOffsets());
            if (pushRoutingOptions.getHasBeenForwarded()) {
                catchupHosts = Collections.emptySet();
            } else {
                catchupHosts = hosts;
            }
        }

        final CompletableFuture<PushConnectionsHandle> connected = connectToHosts(serviceContext, pushPhysicalPlanManager, configuredStatement, hosts, logicalSchema, transientQueryQueue, handle, false, optional, catchupHosts, pushRoutingOptions, thisHostName);
        if (this.backgroundRetries && !pushRoutingOptions.getHasBeenForwarded()) {
            checkForNewHostsOnContext(serviceContext, pushPhysicalPlanManager, configuredStatement, hosts, logicalSchema, transientQueryQueue, handle, optional, pushRoutingOptions, thisHostName);
        }
        return connected;
    }

    /**
     * Resolves the initial hosts for the query and discards the result; the call therefore only
     * has the effects of {@code getInitialHosts}, including its exception when no host is found.
     *
     * @throws KsqlException if no host is available for the query
     */
    public void preparePushQuery(PushPhysicalPlanManager pushPhysicalPlanManager, ConfiguredStatement<Query> configuredStatement, PushRoutingOptions pushRoutingOptions) {
        getInitialHosts(pushPhysicalPlanManager, configuredStatement, pushRoutingOptions);
    }

    /**
     * Resolves the hosts the query should initially connect to. For a request that has already
     * been forwarded only local nodes are kept.
     *
     * @return a non-empty set of hosts
     * @throws KsqlException if no host remains after filtering
     */
    private Set<PushLocator.KsqlNode> getInitialHosts(PushPhysicalPlanManager pushPhysicalPlanManager, ConfiguredStatement<Query> configuredStatement, PushRoutingOptions pushRoutingOptions) {
        final Set<PushLocator.KsqlNode> known = this.registryToNodes.apply(pushPhysicalPlanManager.getScalablePushRegistry());
        final Set<PushLocator.KsqlNode> hosts = known.stream()
            .filter(node -> !pushRoutingOptions.getHasBeenForwarded() || node.isLocal())
            .collect(Collectors.toSet());
        if (hosts.isEmpty()) {
            LOG.error("Unable to execute push query: {}. No nodes executing persistent queries", configuredStatement.getMaskedStatementText());
            throw new KsqlException(String.format("Unable to execute push query. No nodes executing persistent queries %s", configuredStatement.getMaskedStatementText()));
        }
        return hosts;
    }

    /**
     * Connects to each of the given hosts and records the outcome per host in the handle.
     *
     * <p>Every host is first registered with an {@code IN_PROGRESS} placeholder. Each host gets
     * its own completion callback: normal completion marks the host {@code COMPLETE}, a
     * {@link GapFoundException} marks it {@code OFFSET_GAP_FOUND}, and any other error fails the
     * whole handle. Once all connection attempts have finished, successful results replace the
     * placeholders and failed hosts are marked {@code FAILED}.
     *
     * @param serviceContext used to reach remote nodes
     * @param pushPhysicalPlanManager the plan manager of the query
     * @param configuredStatement the query statement
     * @param collection the hosts to connect to
     * @param logicalSchema the output schema
     * @param transientQueryQueue the queue receiving rows from all hosts
     * @param pushConnectionsHandle the handle the results are recorded in
     * @param z when true, a failed connection attempt is only logged; when false it fails the
     *     whole handle
     * @param optional metrics to record local/remote requests on, if any
     * @param set the hosts whose requests should be made in catch-up mode
     * @param pushRoutingOptions routing options of this request
     * @param str the name of this host, used in log messages
     * @return a future that always completes normally with {@code pushConnectionsHandle}
     */
    private CompletableFuture<PushConnectionsHandle> connectToHosts(ServiceContext serviceContext, PushPhysicalPlanManager pushPhysicalPlanManager, ConfiguredStatement<Query> configuredStatement, Collection<PushLocator.KsqlNode> collection, LogicalSchema logicalSchema, TransientQueryQueue transientQueryQueue, PushConnectionsHandle pushConnectionsHandle, boolean z, Optional<ScalablePushQueryMetrics> optional, Set<PushLocator.KsqlNode> set, PushRoutingOptions pushRoutingOptions, String str) {
        final Map<PushLocator.KsqlNode, CompletableFuture<RoutingResult>> futureMap = new LinkedHashMap<>();
        for (final PushLocator.KsqlNode ksqlNode : collection) {
            // Placeholder so the host counts as active while the connection is being set up.
            pushConnectionsHandle.add(ksqlNode, new RoutingResult(RoutingResultStatus.IN_PROGRESS, () -> {
            }));
            final CompletableFuture<Void> nodeCallback = new CompletableFuture<>();
            nodeCallback.handle((ignored, th) -> {
                if (th == null) {
                    pushConnectionsHandle.get(ksqlNode).ifPresent(routingResult -> {
                        routingResult.close();
                        routingResult.updateStatus(RoutingResultStatus.COMPLETE);
                    });
                    LOG.info("Host {} completed request {}.", ksqlNode, pushPhysicalPlanManager.getQueryId());
                    return null;
                }
                if (th instanceof GapFoundException) {
                    pushConnectionsHandle.get(ksqlNode).ifPresent(routingResult -> {
                        routingResult.close();
                        routingResult.updateStatus(RoutingResultStatus.OFFSET_GAP_FOUND);
                    });
                    return null;
                }
                pushConnectionsHandle.completeExceptionally(th);
                return null;
            });
            futureMap.put(ksqlNode, executeOrRouteQuery(ksqlNode, configuredStatement, serviceContext, pushPhysicalPlanManager, logicalSchema, transientQueryQueue, nodeCallback, optional, pushConnectionsHandle.getOffsetsTracker(), set.contains(ksqlNode), pushRoutingOptions, str));
        }
        return CompletableFuture.allOf(futureMap.values().toArray(new CompletableFuture[0]))
            .thenApply(ignored -> {
                for (final PushLocator.KsqlNode host : collection) {
                    pushConnectionsHandle.add(host, futureMap.get(host).join());
                }
                return pushConnectionsHandle;
            })
            .exceptionally(th -> {
                // First failed host, used for the log message only.
                final PushLocator.KsqlNode failedNode = futureMap.entrySet().stream()
                    .filter(entry -> entry.getValue().isCompletedExceptionally())
                    .map(Map.Entry::getKey)
                    .findFirst()
                    .orElse(null);
                for (final PushLocator.KsqlNode host : collection) {
                    final CompletableFuture<RoutingResult> future = futureMap.get(host);
                    if (future.isCompletedExceptionally()) {
                        pushConnectionsHandle.get(host).ifPresent(routingResult -> {
                            routingResult.updateStatus(RoutingResultStatus.FAILED);
                        });
                    } else {
                        // Register the result under the host it belongs to, not under the failed host.
                        pushConnectionsHandle.add(host, future.join());
                    }
                }
                // allOf wraps the failure; fall back to the throwable itself if there is no cause.
                final Throwable cause = th.getCause() != null ? th.getCause() : th;
                LOG.warn("Error routing query {} id {} to host {} at timestamp {} with exception {}", configuredStatement.getMaskedStatementText(), pushPhysicalPlanManager.getQueryId(), failedNode, System.currentTimeMillis(), cause);
                if (!z) {
                    pushConnectionsHandle.completeExceptionally(new KsqlException(String.format("Unable to execute push query \"%s\". %s", configuredStatement.getMaskedStatementText(), cause.getMessage())));
                }
                return pushConnectionsHandle;
            })
            .exceptionally(th -> {
                LOG.error("Unexpected error handing exception", th);
                return pushConnectionsHandle;
            });
    }

    /**
     * Runs the query on the given node: locally through the plan manager when the node is local,
     * otherwise by forwarding the statement to the node. In both cases a subscriber is attached
     * that feeds {@code transientQueryQueue} and reports through {@code completableFuture}.
     *
     * @param ksqlNode the node to run the query on
     * @param configuredStatement the query statement
     * @param serviceContext used to reach remote nodes
     * @param pushPhysicalPlanManager the plan manager of the query
     * @param logicalSchema the output schema
     * @param transientQueryQueue the queue receiving the rows
     * @param completableFuture per-node callback completed by the subscriber
     * @param optional metrics to record local/remote requests on, if any
     * @param offsetsTracker the offsets tracker of the query
     * @param z whether a forwarded request is made in catch-up mode (unused for a local node)
     * @param pushRoutingOptions routing options of this request
     * @param str the name of this host, used in log messages
     * @return a future completing with a {@code SUCCESS} result whose close action tears the
     *     connection down, or completing exceptionally with a {@link KsqlException}
     */
    @VisibleForTesting
    static CompletableFuture<RoutingResult> executeOrRouteQuery(PushLocator.KsqlNode ksqlNode, ConfiguredStatement<Query> configuredStatement, ServiceContext serviceContext, PushPhysicalPlanManager pushPhysicalPlanManager, LogicalSchema logicalSchema, TransientQueryQueue transientQueryQueue, CompletableFuture<Void> completableFuture, Optional<ScalablePushQueryMetrics> optional, OffsetsTracker offsetsTracker, boolean z, PushRoutingOptions pushRoutingOptions, String str) {
        if (!ksqlNode.isLocal()) {
            LOG.info("Query {} routed to host {} at timestamp {}.", configuredStatement.getMaskedStatementText(), ksqlNode.location(), System.currentTimeMillis());
            optional.ifPresent(metrics -> metrics.recordRemoteRequests(1.0d));
            // Remembered so the publisher can be closed if subscribing fails.
            final AtomicReference<BufferedPublisher<?>> remotePublisher = new AtomicReference<>(null);
            return forwardTo(ksqlNode, configuredStatement, serviceContext, logicalSchema, z, offsetsTracker, pushPhysicalPlanManager.getCatchupConsumerGroupId())
                .thenApply(publisher -> {
                    remotePublisher.set(publisher);
                    publisher.subscribe(new RemoteStreamSubscriber(publisher.getContext(), transientQueryQueue, completableFuture, ksqlNode, pushPhysicalPlanManager.getQueryId(), offsetsTracker, pushRoutingOptions, str));
                    return new RoutingResult(RoutingResultStatus.SUCCESS, publisher::close);
                })
                .exceptionally(th -> {
                    LOG.error("Error forwarding query {} to node {}", configuredStatement.getMaskedStatementText(), ksqlNode, th.getCause());
                    final BufferedPublisher<?> publisher = remotePublisher.get();
                    if (publisher != null) {
                        publisher.close();
                    }
                    throw new KsqlException(String.format("Error forwarding query to node %s: %s", ksqlNode.location(), th.getMessage()), th);
                });
        }
        LOG.info("Query with id {} executed locally at host {} at timestamp {}.", pushPhysicalPlanManager.getQueryId(), ksqlNode.location(), System.currentTimeMillis());
        optional.ifPresent(metrics -> metrics.recordLocalRequests(1.0d));
        final AtomicReference<Runnable> planCloseable = new AtomicReference<>();
        final AtomicReference<BufferedPublisher<?>> localPublisher = new AtomicReference<>(null);
        return CompletableFuture.completedFuture(null)
            .thenApply(ignored -> {
                if (pushPhysicalPlanManager.isClosed()) {
                    // A closed plan is reset with the currently tracked offsets before it is run again.
                    pushPhysicalPlanManager.reset(Optional.of(offsetsTracker.getOffsetRange()));
                }
                planCloseable.set(pushPhysicalPlanManager.closeable());
                return pushPhysicalPlanManager.execute();
            })
            .thenApply(publisher -> {
                localPublisher.set(publisher);
                publisher.subscribe(new LocalQueryStreamSubscriber(publisher.getContext(), transientQueryQueue, completableFuture, ksqlNode, pushPhysicalPlanManager.getQueryId(), offsetsTracker, pushRoutingOptions, str));
                return new RoutingResult(RoutingResultStatus.SUCCESS, () -> {
                    planCloseable.get().run();
                    publisher.close();
                });
            })
            .exceptionally(th -> {
                LOG.error("Error executing query {} locally at node {}", configuredStatement.getMaskedStatementText(), ksqlNode.location(), th.getCause());
                final BufferedPublisher<?> publisher = localPublisher.get();
                // The failure may have happened before the plan's closeable was obtained; guard
                // against null so the original error is reported instead of a NullPointerException.
                final Runnable closeable = planCloseable.get();
                if (closeable != null) {
                    closeable.run();
                }
                if (publisher != null) {
                    publisher.close();
                }
                throw new KsqlException(String.format("Error executing query locally at node %s: %s", ksqlNode.location(), th.getMessage()), th);
            });
    }

    /**
     * Sends the push query to another node as an internal request that must not be forwarded
     * again.
     *
     * @param ksqlNode the node to forward to
     * @param configuredStatement the query statement; its unmasked text and session overrides
     *     are sent
     * @param serviceContext provides the client used for the request
     * @param logicalSchema unused by this method
     * @param z when true, the serialized tracked offsets and the catch-up consumer group are
     *     added to the request properties
     * @param offsetsTracker source of the continuation token sent when {@code z} is true
     * @param str the catch-up consumer group id sent when {@code z} is true
     * @return a future completing with the publisher of the remote rows, or exceptionally with a
     *     {@link KsqlException} if the response is erroneous
     */
    private static CompletableFuture<BufferedPublisher<StreamedRow>> forwardTo(PushLocator.KsqlNode ksqlNode, ConfiguredStatement<Query> configuredStatement, ServiceContext serviceContext, LogicalSchema logicalSchema, boolean z, OffsetsTracker offsetsTracker, String str) {
        final ImmutableMap.Builder<String, Object> properties = ImmutableMap.<String, Object>builder()
            .put("request.ksql.query.push.skip.forwarding", true)
            .put("request.ksql.internal.request", true);
        if (z) {
            properties.put("request.ksql.query.push.continuation.token", offsetsTracker.getSerializedOffsetRange());
            properties.put("request.ksql.query.push.catchup.consumer.group", str);
        }
        final Map<String, Object> requestProperties = properties.build();
        return serviceContext.getKsqlClient().makeQueryRequestStreamed(ksqlNode.location(), configuredStatement.getUnMaskedStatementText(), configuredStatement.getSessionConfig().getOverrides(), requestProperties).thenApply(restResponse -> {
            if (restResponse.isErroneous()) {
                throw new KsqlException(String.format("Forwarding push query request [%s, %s] failed with error %s", configuredStatement.getSessionConfig().getOverrides(), requestProperties, restResponse.getErrorMessage()));
            }
            return (BufferedPublisher) restResponse.getResponse();
        });
    }

    /**
     * Schedules the first {@code checkForNewHosts} run on the plan manager's context.
     * The {@code set} parameter is not used.
     */
    private void checkForNewHostsOnContext(ServiceContext serviceContext, PushPhysicalPlanManager pushPhysicalPlanManager, ConfiguredStatement<Query> configuredStatement, Set<PushLocator.KsqlNode> set, LogicalSchema logicalSchema, TransientQueryQueue transientQueryQueue, PushConnectionsHandle pushConnectionsHandle, Optional<ScalablePushQueryMetrics> optional, PushRoutingOptions pushRoutingOptions, String str) {
        pushPhysicalPlanManager.getContext().runOnContext(r21 -> {
            checkForNewHosts(serviceContext, pushPhysicalPlanManager, configuredStatement, logicalSchema, transientQueryQueue, pushConnectionsHandle, optional, pushRoutingOptions, str);
        });
    }

    /**
     * Reconciles the handle's connections with the current set of hosts and re-schedules itself.
     *
     * <p>Hosts that are known but not active (and not currently being connected) are connected
     * to; those whose previous status was {@code OFFSET_GAP_FOUND} are passed on as catch-up
     * hosts. Active hosts that are no longer known are removed from the handle, closed and
     * marked {@code REMOVED}. Nothing happens, and no further run is scheduled, once the handle
     * is closed.
     */
    private void checkForNewHosts(ServiceContext serviceContext, PushPhysicalPlanManager pushPhysicalPlanManager, ConfiguredStatement<Query> configuredStatement, LogicalSchema logicalSchema, TransientQueryQueue transientQueryQueue, PushConnectionsHandle pushConnectionsHandle, Optional<ScalablePushQueryMetrics> optional, PushRoutingOptions pushRoutingOptions, String str) {
        VertxUtils.checkContext(pushPhysicalPlanManager.getContext());
        if (pushConnectionsHandle.isClosed()) {
            return;
        }
        final Set<PushLocator.KsqlNode> currentHosts = this.registryToNodes.apply(pushPhysicalPlanManager.getScalablePushRegistry());
        final Set<PushLocator.KsqlNode> activeHosts = pushConnectionsHandle.getActiveHosts();

        final Set<PushLocator.KsqlNode> hostsToAdd = Sets.difference(currentHosts, activeHosts).stream()
            .filter(host -> pushConnectionsHandle.get(host)
                .map(result -> result.getStatus() != RoutingResultStatus.IN_PROGRESS)
                .orElse(true))
            .collect(Collectors.toSet());
        final Set<PushLocator.KsqlNode> hostsToRemove = Sets.difference(activeHosts, currentHosts);

        if (hostsToAdd.size() > 0) {
            LOG.info("Dynamically adding new hosts {} for {}", hostsToAdd, pushPhysicalPlanManager.getQueryId());
            final Set<PushLocator.KsqlNode> catchupHosts = hostsToAdd.stream()
                .filter(host -> pushConnectionsHandle.get(host)
                    .map(result -> result.getStatus() == RoutingResultStatus.OFFSET_GAP_FOUND)
                    .orElse(false))
                .collect(Collectors.toSet());
            connectToHosts(serviceContext, pushPhysicalPlanManager, configuredStatement, hostsToAdd, logicalSchema, transientQueryQueue, pushConnectionsHandle, true, optional, catchupHosts, pushRoutingOptions, str);
        }
        if (hostsToRemove.size() > 0) {
            LOG.info("Dynamically removing hosts {} for {}", hostsToRemove, pushPhysicalPlanManager.getQueryId());
            for (final PushLocator.KsqlNode host : hostsToRemove) {
                final RoutingResult removed = pushConnectionsHandle.remove(host);
                removed.close();
                removed.updateStatus(RoutingResultStatus.REMOVED);
            }
        }
        // Run again after the configured interval.
        pushPhysicalPlanManager.getContext().owner().setTimer(this.clusterCheckInterval, timerId -> {
            checkForNewHosts(serviceContext, pushPhysicalPlanManager, configuredStatement, logicalSchema, transientQueryQueue, pushConnectionsHandle, optional, pushRoutingOptions, str);
        });
    }

    /**
     * Asks the registry's locator for the current nodes and returns them as a new mutable set.
     *
     * @param scalablePushRegistry the registry whose locator is queried
     * @return a fresh set containing the located nodes
     */
    public static Set<PushLocator.KsqlNode> loadCurrentHosts(ScalablePushRegistry scalablePushRegistry) {
        final PushLocator locator = scalablePushRegistry.getLocator();
        return new HashSet<>(locator.locate());
    }

    /**
     * Builds the default host lookup: a cache of at most 40 registries whose entries expire
     * 1000 ms after being written and are loaded through {@code loadCurrentHosts}.
     *
     * @return a function returning the (possibly cached) set of nodes for a registry; loader
     *     failures surface as unchecked exceptions via {@code LoadingCache#getUnchecked}
     */
    private static Function<ScalablePushRegistry, Set<PushLocator.KsqlNode>> createLoadingCache() {
        final LoadingCache<ScalablePushRegistry, Set<PushLocator.KsqlNode>> cache = CacheBuilder.newBuilder()
            .maximumSize(40L)
            .expireAfterWrite(1000L, TimeUnit.MILLISECONDS)
            .build(new CacheLoader<ScalablePushRegistry, Set<PushLocator.KsqlNode>>() {
                @Override
                public Set<PushLocator.KsqlNode> load(ScalablePushRegistry scalablePushRegistry) {
                    return PushRouting.loadCurrentHosts(scalablePushRegistry);
                }
            });
        return cache::getUnchecked;
    }
}
