package io.confluent.ksql.rest.server;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.api.auth.ApiSecurityContext;
import io.confluent.ksql.api.impl.InsertsStreamEndpoint;
import io.confluent.ksql.api.impl.KsqlSecurityContextProvider;
import io.confluent.ksql.api.impl.QueryEndpoint;
import io.confluent.ksql.api.server.InsertResult;
import io.confluent.ksql.api.server.InsertsStreamSubscriber;
import io.confluent.ksql.api.server.MetricsCallbackHolder;
import io.confluent.ksql.api.spi.Endpoints;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.reactive.BasePublisher;
import io.confluent.ksql.rest.EndpointResponse;
import io.confluent.ksql.rest.entity.ClusterTerminateRequest;
import io.confluent.ksql.rest.entity.HeartbeatMessage;
import io.confluent.ksql.rest.entity.KsqlMediaType;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.entity.LagReportingMessage;
import io.confluent.ksql.rest.server.query.QueryExecutor;
import io.confluent.ksql.rest.server.resources.ClusterStatusResource;
import io.confluent.ksql.rest.server.resources.HealthCheckResource;
import io.confluent.ksql.rest.server.resources.HeartbeatResource;
import io.confluent.ksql.rest.server.resources.KsqlResource;
import io.confluent.ksql.rest.server.resources.LagReportingResource;
import io.confluent.ksql.rest.server.resources.ServerInfoResource;
import io.confluent.ksql.rest.server.resources.ServerMetadataResource;
import io.confluent.ksql.rest.server.resources.StatusResource;
import io.confluent.ksql.rest.server.resources.streaming.StreamedQueryResource;
import io.confluent.ksql.rest.server.resources.streaming.WSQueryEndpoint;
import io.confluent.ksql.rest.util.AuthenticationUtil;
import io.confluent.ksql.security.KsqlAuthTokenProvider;
import io.confluent.ksql.security.KsqlSecurityContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.ReservedInternalTopics;
import io.confluent.ksql.util.VertxCompletableFuture;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Context;
import io.vertx.core.MultiMap;
import io.vertx.core.WorkerExecutor;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.json.JsonObject;
import java.time.Clock;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

/* loaded from: input_file:io/confluent/ksql/rest/server/KsqlServerEndpoints.class */
/**
 * Server-side implementation of {@link Endpoints} that routes each API call to the matching
 * resource object or endpoint implementation.
 *
 * <p>Three execution styles are used:
 * <ul>
 *   <li>work dispatched to the caller-supplied {@link WorkerExecutor}
 *       (see {@link #executeOnWorker});</li>
 *   <li>"old API" resource calls dispatched to the worker
 *       (see {@link #executeOldApiEndpointOnWorker});</li>
 *   <li>"old API" resource calls executed synchronously on the calling thread
 *       (see {@link #executeOldApiEndpoint(ApiSecurityContext, Function)}).</li>
 * </ul>
 *
 * <p>Wherever this class creates a {@link KsqlSecurityContext} for the duration of a single call,
 * the context's service context is closed in a {@code finally} block so it is released exactly
 * once, whether the call succeeds or throws.
 */
public class KsqlServerEndpoints implements Endpoints {
    private final KsqlEngine ksqlEngine;
    private final KsqlConfig ksqlConfig;
    private final ReservedInternalTopics reservedInternalTopics;
    private final KsqlSecurityContextProvider ksqlSecurityContextProvider;
    private final KsqlResource ksqlResource;
    private final StreamedQueryResource streamedQueryResource;
    private final ServerInfoResource serverInfoResource;
    private final Optional<HeartbeatResource> heartbeatResource;
    private final Optional<ClusterStatusResource> clusterStatusResource;
    private final StatusResource statusResource;
    private final Optional<LagReportingResource> lagReportingResource;
    private final HealthCheckResource healthCheckResource;
    private final ServerMetadataResource serverMetadataResource;
    private final WSQueryEndpoint wsQueryEndpoint;
    private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
    private final QueryExecutor queryExecutor;
    private final Optional<KsqlAuthTokenProvider> authTokenProvider;

    /**
     * Creates the endpoint dispatcher.
     *
     * @param optional  heartbeat resource; when empty, {@link #executeHeartbeat} answers 404
     * @param optional2 cluster status resource; when empty, {@link #executeClusterStatus}
     *                  answers 404
     * @param optional3 lag reporting resource; when empty, {@link #executeLagReport} answers 404
     * @param optional4 pull query metrics, handed to each {@link QueryEndpoint}
     * @param optional5 auth token provider, used when computing the websocket token timeout
     * @throws NullPointerException if any argument other than {@code queryExecutor} or
     *                              {@code optional5} is {@code null}
     */
    @SuppressFBWarnings({"EI_EXPOSE_REP2"})
    public KsqlServerEndpoints(
            KsqlEngine ksqlEngine,
            KsqlConfig ksqlConfig,
            KsqlSecurityContextProvider ksqlSecurityContextProvider,
            KsqlResource ksqlResource,
            StreamedQueryResource streamedQueryResource,
            ServerInfoResource serverInfoResource,
            Optional<HeartbeatResource> optional,
            Optional<ClusterStatusResource> optional2,
            StatusResource statusResource,
            Optional<LagReportingResource> optional3,
            HealthCheckResource healthCheckResource,
            ServerMetadataResource serverMetadataResource,
            WSQueryEndpoint wSQueryEndpoint,
            Optional<PullQueryExecutorMetrics> optional4,
            QueryExecutor queryExecutor,
            Optional<KsqlAuthTokenProvider> optional5) {
        this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine");
        this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig");
        this.reservedInternalTopics = new ReservedInternalTopics(ksqlConfig);
        this.ksqlSecurityContextProvider =
                Objects.requireNonNull(ksqlSecurityContextProvider, "ksqlSecurityContextProvider");
        this.ksqlResource = Objects.requireNonNull(ksqlResource, "ksqlResource");
        this.streamedQueryResource =
                Objects.requireNonNull(streamedQueryResource, "streamedQueryResource");
        this.serverInfoResource = Objects.requireNonNull(serverInfoResource, "serverInfoResource");
        this.heartbeatResource = Objects.requireNonNull(optional, "heartbeatResource");
        this.clusterStatusResource = Objects.requireNonNull(optional2, "clusterStatusResource");
        this.statusResource = Objects.requireNonNull(statusResource, "statusResource");
        this.lagReportingResource = Objects.requireNonNull(optional3, "lagReportingResource");
        this.healthCheckResource =
                Objects.requireNonNull(healthCheckResource, "healthCheckResource");
        this.serverMetadataResource =
                Objects.requireNonNull(serverMetadataResource, "serverMetadataResource");
        this.wsQueryEndpoint = Objects.requireNonNull(wSQueryEndpoint, "wsQueryEndpoint");
        this.pullQueryMetrics = Objects.requireNonNull(optional4, "pullQueryMetrics");
        // Unlike the other dependencies, these two are stored without a null check; that is
        // kept as-is so existing callers that pass null keep working.
        this.queryExecutor = queryExecutor;
        this.authTokenProvider = optional5;
    }

    /**
     * Builds a query publisher on the worker executor.
     *
     * <p>The security context is created on the calling thread; the publisher itself is built on
     * the worker by a fresh {@link QueryEndpoint}, to which all request arguments are forwarded
     * unchanged. The service context is closed once the publisher has been created (or creation
     * has failed).
     */
    @Override
    public CompletableFuture<Publisher<?>> createQueryPublisher(String str, Map<String, Object> map, Map<String, Object> map2, Map<String, Object> map3, Context context, WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext, MetricsCallbackHolder metricsCallbackHolder, Optional<Boolean> optional) {
        final KsqlSecurityContext securityContext =
                this.ksqlSecurityContextProvider.provide(apiSecurityContext);
        return executeOnWorker(() -> {
            try {
                return new QueryEndpoint(
                        this.ksqlEngine, this.ksqlConfig, this.pullQueryMetrics, this.queryExecutor)
                        .createQueryPublisher(
                                str,
                                map,
                                map2,
                                map3,
                                context,
                                workerExecutor,
                                securityContext.getServiceContext(),
                                metricsCallbackHolder,
                                optional);
            } finally {
                securityContext.getServiceContext().close();
            }
        }, workerExecutor);
    }

    /**
     * Builds an inserts-stream subscriber on the worker executor.
     *
     * <p>The service context obtained here is passed to the created subscriber and is not closed
     * by this method.
     */
    @Override
    public CompletableFuture<InsertsStreamSubscriber> createInsertsSubscriber(String str, JsonObject jsonObject, Subscriber<InsertResult> subscriber, Context context, WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext) {
        return executeOnWorker(
                () -> new InsertsStreamEndpoint(
                        this.ksqlEngine, this.ksqlConfig, this.reservedInternalTopics)
                        .createInsertsSubscriber(
                                str,
                                jsonObject,
                                subscriber,
                                context,
                                workerExecutor,
                                this.ksqlSecurityContextProvider
                                        .provide(apiSecurityContext)
                                        .getServiceContext()),
                workerExecutor);
    }

    /** Handles a KSQL statement request on the worker via {@link KsqlResource}. */
    @Override
    public CompletableFuture<EndpointResponse> executeKsqlRequest(KsqlRequest ksqlRequest, WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext) {
        return executeOldApiEndpointOnWorker(
                apiSecurityContext,
                securityContext -> this.ksqlResource.handleKsqlStatements(securityContext, ksqlRequest),
                workerExecutor);
    }

    /** Handles a query request on the worker via {@link StreamedQueryResource#streamQuery}. */
    @Override
    public CompletableFuture<EndpointResponse> executeQueryRequest(KsqlRequest ksqlRequest, WorkerExecutor workerExecutor, CompletableFuture<Void> completableFuture, ApiSecurityContext apiSecurityContext, Optional<Boolean> optional, KsqlMediaType ksqlMediaType, MetricsCallbackHolder metricsCallbackHolder, Context context) {
        // ksqlMediaType is part of the Endpoints contract but is not consulted here.
        return executeOldApiEndpointOnWorker(
                apiSecurityContext,
                securityContext -> this.streamedQueryResource.streamQuery(
                        securityContext,
                        ksqlRequest,
                        completableFuture,
                        optional,
                        metricsCallbackHolder,
                        context),
                workerExecutor);
    }

    /** Handles a cluster terminate request on the worker via {@link KsqlResource}. */
    @Override
    public CompletableFuture<EndpointResponse> executeTerminate(ClusterTerminateRequest clusterTerminateRequest, WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext) {
        return executeOldApiEndpointOnWorker(
                apiSecurityContext,
                securityContext ->
                        this.ksqlResource.terminateCluster(securityContext, clusterTerminateRequest),
                workerExecutor);
    }

    /** Returns the server info response, computed on the calling thread. */
    @Override
    public CompletableFuture<EndpointResponse> executeInfo(ApiSecurityContext apiSecurityContext) {
        return executeOldApiEndpoint(apiSecurityContext, unused -> this.serverInfoResource.get());
    }

    /**
     * Registers a heartbeat, or answers 404 (NOT_FOUND) when no heartbeat resource was supplied
     * at construction time.
     */
    @Override
    public CompletableFuture<EndpointResponse> executeHeartbeat(HeartbeatMessage heartbeatMessage, ApiSecurityContext apiSecurityContext) {
        return this.heartbeatResource
                .map(resource -> executeOldApiEndpoint(
                        apiSecurityContext, unused -> resource.registerHeartbeat(heartbeatMessage)))
                .orElseGet(KsqlServerEndpoints::notFoundResponse);
    }

    /**
     * Reports cluster status, or answers 404 (NOT_FOUND) when no cluster status resource was
     * supplied at construction time.
     */
    @Override
    public CompletableFuture<EndpointResponse> executeClusterStatus(ApiSecurityContext apiSecurityContext) {
        return this.clusterStatusResource
                .map(resource -> executeOldApiEndpoint(
                        apiSecurityContext, unused -> resource.checkClusterStatus()))
                .orElseGet(KsqlServerEndpoints::notFoundResponse);
    }

    /** Looks up a single status via {@link StatusResource#getStatus}, on the calling thread. */
    @Override
    public CompletableFuture<EndpointResponse> executeStatus(String str, String str2, String str3, ApiSecurityContext apiSecurityContext) {
        return executeOldApiEndpoint(
                apiSecurityContext, unused -> this.statusResource.getStatus(str, str2, str3));
    }

    /** Checks a property via {@link KsqlResource#isValidProperty} on the worker. */
    @Override
    public CompletableFuture<EndpointResponse> executeIsValidProperty(String str, WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext) {
        return executeOldApiEndpointOnWorker(
                apiSecurityContext, unused -> this.ksqlResource.isValidProperty(str), workerExecutor);
    }

    /** Returns all statuses via {@link StatusResource#getAllStatuses}, on the calling thread. */
    @Override
    public CompletableFuture<EndpointResponse> executeAllStatuses(ApiSecurityContext apiSecurityContext) {
        return executeOldApiEndpoint(apiSecurityContext, unused -> this.statusResource.getAllStatuses());
    }

    /**
     * Accepts a lag report, or answers 404 (NOT_FOUND) when no lag reporting resource was
     * supplied at construction time.
     */
    @Override
    public CompletableFuture<EndpointResponse> executeLagReport(LagReportingMessage lagReportingMessage, ApiSecurityContext apiSecurityContext) {
        return this.lagReportingResource
                .map(resource -> executeOldApiEndpoint(
                        apiSecurityContext, unused -> resource.receiveHostLag(lagReportingMessage)))
                .orElseGet(KsqlServerEndpoints::notFoundResponse);
    }

    /** Runs the health check via {@link HealthCheckResource}, on the calling thread. */
    @Override
    public CompletableFuture<EndpointResponse> executeCheckHealth(ApiSecurityContext apiSecurityContext) {
        return executeOldApiEndpoint(apiSecurityContext, unused -> this.healthCheckResource.checkHealth());
    }

    /** Returns server metadata via {@link ServerMetadataResource}, on the calling thread. */
    @Override
    public CompletableFuture<EndpointResponse> executeServerMetadata(ApiSecurityContext apiSecurityContext) {
        return executeOldApiEndpoint(
                apiSecurityContext, unused -> this.serverMetadataResource.getServerMetadata());
    }

    /** Returns the server cluster id via {@link ServerMetadataResource}, on the calling thread. */
    @Override
    public CompletableFuture<EndpointResponse> executeServerMetadataClusterId(ApiSecurityContext apiSecurityContext) {
        return executeOldApiEndpoint(
                apiSecurityContext, unused -> this.serverMetadataResource.getServerClusterId());
    }

    /**
     * Hands a websocket query to {@link WSQueryEndpoint#executeStreamQuery} on the worker.
     *
     * <p>The security context is created on the worker and its service context is closed once
     * {@code executeStreamQuery} returns or throws. The future produced by the worker dispatch is
     * not returned or observed by this method.
     */
    @Override
    public void executeWebsocketStream(ServerWebSocket serverWebSocket, MultiMap multiMap, WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext, Context context) {
        executeOnWorker(() -> {
            final KsqlSecurityContext securityContext =
                    this.ksqlSecurityContextProvider.provide(apiSecurityContext);
            try {
                // The token timeout is computed inside the try so that a failure here still
                // releases the service context.
                this.wsQueryEndpoint.executeStreamQuery(
                        serverWebSocket,
                        multiMap,
                        securityContext,
                        context,
                        new AuthenticationUtil(Clock.systemUTC()).getTokenTimeout(
                                apiSecurityContext.getAuthHeader(),
                                this.ksqlConfig,
                                this.authTokenProvider));
            } finally {
                securityContext.getServiceContext().close();
            }
            return null;
        }, workerExecutor);
    }

    /** Runs a test via {@link KsqlResource#runTest}, on the calling thread. */
    @Override
    public CompletableFuture<EndpointResponse> executeTest(String str, ApiSecurityContext apiSecurityContext) {
        return executeOldApiEndpoint(apiSecurityContext, unused -> this.ksqlResource.runTest(str));
    }

    /** Returns an already-completed future carrying a failed response with the 404 status code. */
    private static CompletableFuture<EndpointResponse> notFoundResponse() {
        return CompletableFuture.completedFuture(
                EndpointResponse.failed(HttpResponseStatus.NOT_FOUND.code()));
    }

    /**
     * Runs {@code supplier} through {@link WorkerExecutor#executeBlocking}, passing {@code false}
     * for its ordering flag, and exposes the outcome as a {@link CompletableFuture}.
     *
     * @param supplier       the work to run
     * @param workerExecutor the executor to run it on
     * @param <R>            the result type
     * @return a future that receives the result handed to the blocking call's result handler
     */
    private <R> CompletableFuture<R> executeOnWorker(Supplier<R> supplier, WorkerExecutor workerExecutor) {
        final VertxCompletableFuture<R> future = new VertxCompletableFuture<>();
        workerExecutor.executeBlocking(promise -> promise.complete(supplier.get()), false, future);
        return future;
    }

    /**
     * Applies {@code function} on the worker executor with a security context that is created on
     * the calling thread; the service context is closed on the worker once the function returns
     * or throws.
     */
    private CompletableFuture<EndpointResponse> executeOldApiEndpointOnWorker(ApiSecurityContext apiSecurityContext, Function<KsqlSecurityContext, EndpointResponse> function, WorkerExecutor workerExecutor) {
        final KsqlSecurityContext securityContext =
                this.ksqlSecurityContextProvider.provide(apiSecurityContext);
        return executeOnWorker(() -> {
            try {
                return function.apply(securityContext);
            } finally {
                securityContext.getServiceContext().close();
            }
        }, workerExecutor);
    }

    /**
     * Applies {@code function} synchronously on the calling thread and wraps its result in an
     * already-completed future.
     *
     * <p>An exception thrown by {@code function} propagates to the caller rather than being
     * captured in the returned future. The service context is closed in either case.
     */
    private CompletableFuture<EndpointResponse> executeOldApiEndpoint(ApiSecurityContext apiSecurityContext, Function<KsqlSecurityContext, EndpointResponse> function) {
        final KsqlSecurityContext securityContext =
                this.ksqlSecurityContextProvider.provide(apiSecurityContext);
        try {
            return CompletableFuture.completedFuture(function.apply(securityContext));
        } finally {
            securityContext.getServiceContext().close();
        }
    }
}
