package io.confluent.ksql.rest.server.execution;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.rest.SessionProperties;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.server.ServerUtil;
import io.confluent.ksql.rest.util.DiscoverRemoteHostsUtil;
import io.confluent.ksql.services.SimpleKsqlClient;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.Pair;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.state.HostInfo;

/* loaded from: input_file:io/confluent/ksql/rest/server/execution/RemoteHostExecutor.class */
/**
 * Forwards a statement to the remote hosts reported by
 * {@link DiscoverRemoteHostsUtil#getRemoteHosts} and gathers one entity per host.
 *
 * <p>Instances are obtained through {@link #create}; the constructor is private.
 */
public final class RemoteHostExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteHostExecutor.class);

    /** Request property attached to every statement forwarded to another host. */
    private static final String INTERNAL_REQUEST_PROPERTY = "request.ksql.internal.request";

    /** Config key read for the per-host wait limit; the value is interpreted as seconds. */
    private static final String FETCH_TIMEOUT_SECONDS_CONFIG =
            "ksql.fetch.remote.hosts.max.timeout.seconds";

    private final ConfiguredStatement<?> statement;
    private final SessionProperties sessionProperties;
    private final KsqlExecutionContext executionContext;
    private final SimpleKsqlClient ksqlClient;

    private RemoteHostExecutor(ConfiguredStatement<?> configuredStatement, SessionProperties sessionProperties, KsqlExecutionContext ksqlExecutionContext, SimpleKsqlClient simpleKsqlClient) {
        this.statement = Objects.requireNonNull(configuredStatement, "configuredStatement");
        this.sessionProperties = Objects.requireNonNull(sessionProperties, "sessionProperties");
        this.executionContext = Objects.requireNonNull(ksqlExecutionContext, "ksqlExecutionContext");
        this.ksqlClient = Objects.requireNonNull(simpleKsqlClient, "simpleKsqlClient");
    }

    /**
     * Creates an executor for the given statement.
     *
     * @param configuredStatement the statement to forward
     * @param sessionProperties supplies the local URL, the local host info and the
     *     internal-request flag
     * @param ksqlExecutionContext supplies the persistent queries and the config
     * @param simpleKsqlClient client used to issue the remote requests
     * @return a new executor
     * @throws NullPointerException if any argument is {@code null}
     */
    public static RemoteHostExecutor create(ConfiguredStatement<?> configuredStatement, SessionProperties sessionProperties, KsqlExecutionContext ksqlExecutionContext, SimpleKsqlClient simpleKsqlClient) {
        return new RemoteHostExecutor(configuredStatement, sessionProperties, ksqlExecutionContext, simpleKsqlClient);
    }

    /** Sends {@code str} to {@code hostInfo}, flagged with the internal-request property. */
    private RestResponse<KsqlEntityList> makeKsqlRequest(HostInfo hostInfo, String str) {
        return this.ksqlClient.makeKsqlRequest(
                ServerUtil.buildRemoteUri(this.sessionProperties.getLocalUrl(), hostInfo.host(), hostInfo.port()),
                str,
                Collections.singletonMap(INTERNAL_REQUEST_PROPERTY, true));
    }

    /** Schedules {@link #makeKsqlRequest} for one host on {@code executor}. */
    private CompletableFuture<RestResponse<KsqlEntityList>> fetchRemoteData(String str, HostInfo hostInfo, Executor executor) {
        return CompletableFuture.supplyAsync(() -> makeKsqlRequest(hostInfo, str), executor);
    }

    /**
     * Sends the statement to every remote host concurrently and collects the outcome.
     *
     * <p>Returns a pair of empty collections without contacting anyone when there are no remote
     * hosts or when the session is itself flagged as an internal request.
     *
     * @return a pair whose left side maps each host that answered successfully to the first
     *     entity of its response, and whose right side holds the hosts that returned an error
     *     response or whose request failed or timed out
     */
    public Pair<Map<HostInfo, KsqlEntity>, Set<HostInfo>> fetchAllRemoteResults() {
        final Set<HostInfo> remoteHosts = DiscoverRemoteHostsUtil.getRemoteHosts(
                this.executionContext.getPersistentQueries(),
                this.sessionProperties.getKsqlHostInfo());
        if (remoteHosts.isEmpty() || this.sessionProperties.getInternalRequest()) {
            return new Pair<>(ImmutableMap.of(), ImmutableSet.of());
        }

        final Set<HostInfo> unresponsiveHosts = new HashSet<>();
        // One thread per host so that all requests are in flight at the same time.
        final ExecutorService executorService = Executors.newFixedThreadPool(remoteHosts.size());
        try {
            final String statementText = this.statement.getUnMaskedStatementText();
            final Map<HostInfo, CompletableFuture<RestResponse<KsqlEntityList>>> pending = new HashMap<>();
            for (final HostInfo host : remoteHosts) {
                pending.put(host, fetchRemoteData(statementText, host, executorService));
            }

            final ImmutableMap.Builder<HostInfo, KsqlEntity> results = ImmutableMap.builder();
            for (final Map.Entry<HostInfo, CompletableFuture<RestResponse<KsqlEntityList>>> entry : pending.entrySet()) {
                final HostInfo host = entry.getKey();
                try {
                    // The timeout lookup stays inside this try so that a failing lookup is
                    // handled like any other per-host failure instead of propagating.
                    final RestResponse<KsqlEntityList> response = entry.getValue().get(
                            this.executionContext.getKsqlConfig().getLong(FETCH_TIMEOUT_SECONDS_CONFIG),
                            TimeUnit.SECONDS);
                    if (response.isErroneous()) {
                        LOG.warn("Error response from host. host: {}, cause: {}",
                                host, response.getErrorMessage().getMessage());
                        unresponsiveHosts.add(host);
                    } else {
                        results.put(host, response.getResponse().get(0));
                    }
                } catch (final Exception e) {
                    if (e instanceof InterruptedException) {
                        // Catching InterruptedException clears the flag; restore it so the
                        // caller can observe the interruption and pending waits end promptly.
                        Thread.currentThread().interrupt();
                    }
                    // Only the masked statement text is written to the log.
                    LOG.warn("Failed to retrieve info from host: {}, statement: {}, cause: {}",
                            host, this.statement.getMaskedStatementText(), e);
                    unresponsiveHosts.add(host);
                }
            }
            return new Pair<>(results.build(), unresponsiveHosts);
        } finally {
            executorService.shutdown();
        }
    }
}
