package io.confluent.ksql.api.server;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.api.auth.DefaultApiSecurityContext;
import io.confluent.ksql.api.spi.Endpoints;
import io.confluent.ksql.rest.entity.ClusterTerminateRequest;
import io.confluent.ksql.rest.entity.HeartbeatMessage;
import io.confluent.ksql.rest.entity.KsqlMediaType;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.entity.LagReportingMessage;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.netty.handler.codec.haproxy.HAProxyProtocolException;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import java.nio.channels.ClosedChannelException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/confluent/ksql/api/server/ServerVerticle.class */
/**
 * Verticle that creates the HTTP server for the KSQL API and wires every route to its handler.
 *
 * <p>The routes are registered in {@link #setupRouter()}; the actual request processing is
 * delegated to the supplied {@link Endpoints} implementation and to the handler classes
 * instantiated while building the router.
 */
public class ServerVerticle extends AbstractVerticle {
    private static final Logger log = LogManager.getLogger(ServerVerticle.class);
    private static final String JSON_CONTENT_TYPE = "application/json";
    private static final String DELIMITED_CONTENT_TYPE = "application/vnd.ksqlapi.delimited.v1";
    private final Endpoints endpoints;
    private final HttpServerOptions httpServerOptions;
    private final Server server;
    // Created in start(), because it needs this verticle's context.
    private ConnectionQueryManager connectionQueryManager;
    // Null until start() has been called; stop() and actualPort() depend on it.
    private HttpServer httpServer;
    private final Optional<Boolean> isInternalListener;
    private final LoggingRateLimiter loggingRateLimiter;

    /**
     * Creates the verticle. Every argument is required.
     *
     * @param endpoints          implementation the route handlers delegate to
     * @param httpServerOptions  options used to create the HTTP server in {@link #start(Promise)}
     * @param server             owning server; supplies config, worker executor and server state
     * @param optional           whether this listener is the internal one; when present an
     *                           {@code InternalEndpointHandler} is installed with its value
     * @param loggingRateLimiter rate limiter handed to the {@code LoggingHandler}
     * @throws NullPointerException if any argument is {@code null}
     */
    @SuppressFBWarnings({"EI_EXPOSE_REP2"})
    public ServerVerticle(Endpoints endpoints, HttpServerOptions httpServerOptions, Server server, Optional<Boolean> optional, LoggingRateLimiter loggingRateLimiter) {
        this.endpoints = Objects.requireNonNull(endpoints, "endpoints");
        this.httpServerOptions = Objects.requireNonNull(httpServerOptions, "httpServerOptions");
        this.server = Objects.requireNonNull(server, "server");
        this.isInternalListener = Objects.requireNonNull(optional, "isInternalListener");
        this.loggingRateLimiter = Objects.requireNonNull(loggingRateLimiter, "loggingRateLimiter");
    }

    /**
     * Creates the connection query manager and the HTTP server, then starts listening.
     *
     * @param promise completed once the server is listening, or failed with the listen error
     */
    public void start(Promise<Void> promise) {
        this.connectionQueryManager = new ConnectionQueryManager(this.context, this.server);
        this.httpServer = this.vertx
            .createHttpServer(this.httpServerOptions)
            .requestHandler(setupRouter())
            .exceptionHandler(ServerVerticle::unhandledExceptionHandler);
        this.httpServer.listen(listenResult -> {
            if (listenResult.succeeded()) {
                promise.complete();
            } else {
                promise.fail(listenResult.cause());
            }
        });
    }

    /**
     * Closes the HTTP server if one was created.
     *
     * <p>Shutdown is best-effort: the promise is completed even when closing fails, but the
     * failure is logged rather than silently dropped.
     *
     * @param promise completed once the close attempt has finished
     */
    public void stop(Promise<Void> promise) {
        if (this.httpServer == null) {
            promise.complete();
            return;
        }
        this.httpServer.close(closeResult -> {
            if (closeResult.failed()) {
                log.warn("Failed to close HTTP server cleanly", closeResult.cause());
            }
            promise.complete();
        });
    }

    /**
     * Returns the port reported by the underlying HTTP server.
     *
     * <p>Must only be called after {@link #start(Promise)}; before that the server reference
     * is {@code null}.
     *
     * @return the value of {@code HttpServer.actualPort()}
     */
    public int actualPort() {
        return this.httpServer.actualPort();
    }

    /**
     * Builds the router with all handlers and routes.
     *
     * <p>Registration order is significant and must be preserved: several paths (for example
     * {@code /query}) are registered more than once with different produced content types.
     *
     * @return the fully configured router
     */
    private Router setupRouter() {
        Router router = Router.router(this.vertx);

        // Handlers applied to every request, in registration order.
        router.route().handler(new LoggingHandler(this.server, this.loggingRateLimiter));
        if (this.server.getConfig().getBoolean(KsqlRestConfig.KSQL_SERVER_SNI_CHECK_ENABLE)) {
            router.route().handler(new SniHandler());
        }
        KsqlCorsHandler.setupCorsHandler(this.server.getConfig(), router);

        // The /chc routes are registered before the auth handlers are set up below.
        router.route(HttpMethod.GET, "/chc/ready").handler(ServerVerticle::chcHandler);
        router.route(HttpMethod.GET, "/chc/live").handler(ServerVerticle::chcHandler);

        router.route().failureHandler(new FailureHandler());
        this.isInternalListener.ifPresent(
            isInternal -> router.route().handler(new InternalEndpointHandler(isInternal)));
        AuthHandlers.setupAuthHandlers(this.server, router, this.isInternalListener.orElse(false));
        router.route().handler(new ServerStateHandler(this.server.getServerState()));

        // Streaming endpoints.
        router.route(HttpMethod.POST, "/query-stream")
            .produces(DELIMITED_CONTENT_TYPE)
            .produces(JSON_CONTENT_TYPE)
            .produces(KsqlMediaType.KSQL_V1_JSON.mediaType())
            .handler(BodyHandler.create(false))
            .handler(new QueryStreamHandler(this.endpoints, this.connectionQueryManager, this.context, this.server, false));
        router.route(HttpMethod.POST, "/query-stream")
            .produces(KsqlMediaType.KSQL_V1_PROTOBUF.mediaType())
            .handler(BodyHandler.create(false))
            .handler(new QueryStreamHandler(this.endpoints, this.connectionQueryManager, this.context, this.server, false));
        // Note: no BodyHandler is installed on this route.
        router.route(HttpMethod.POST, "/inserts-stream")
            .produces(DELIMITED_CONTENT_TYPE)
            .produces(JSON_CONTENT_TYPE)
            .handler(new InsertsStreamHandler(this.context, this.endpoints, this.server));
        router.route(HttpMethod.POST, "/close-query")
            .handler(BodyHandler.create(false))
            .handler(new CloseQueryHandler(this.server));

        // Remaining endpoints, delegated to the handle* methods of this class.
        router.route(HttpMethod.GET, "/").handler(ServerVerticle::handleInfoRedirect);
        router.route(HttpMethod.POST, "/ksql")
            .handler(BodyHandler.create(false))
            .produces(KsqlMediaType.KSQL_V1_JSON.mediaType())
            .produces(JSON_CONTENT_TYPE)
            .handler(this::handleKsqlRequest);
        router.route(HttpMethod.POST, "/ksql/terminate")
            .handler(BodyHandler.create(false))
            .produces(KsqlMediaType.KSQL_V1_JSON.mediaType())
            .produces(JSON_CONTENT_TYPE)
            .handler(this::handleTerminateRequest);

        // /query is registered three times; keep this order.
        router.route(HttpMethod.POST, "/query")
            .handler(BodyHandler.create(false))
            .produces(KsqlMediaType.KSQL_V1_JSON.mediaType())
            .produces(JSON_CONTENT_TYPE)
            .handler(this::handleQueryRequest);
        router.route(HttpMethod.POST, "/query")
            .handler(BodyHandler.create(false))
            .produces(DELIMITED_CONTENT_TYPE)
            .produces(KsqlMediaType.KSQL_V1_JSON.mediaType())
            .produces(JSON_CONTENT_TYPE)
            .handler(new QueryStreamHandler(this.endpoints, this.connectionQueryManager, this.context, this.server, true));
        router.route(HttpMethod.POST, "/query")
            .handler(BodyHandler.create(false))
            .produces(KsqlMediaType.KSQL_V1_PROTOBUF.mediaType())
            .handler(new QueryStreamHandler(this.endpoints, this.connectionQueryManager, this.context, this.server, true));

        router.route(HttpMethod.GET, "/info")
            .produces(KsqlMediaType.KSQL_V1_JSON.mediaType())
            .produces(JSON_CONTENT_TYPE)
            .handler(this::handleInfoRequest);
        router.route(HttpMethod.POST, "/heartbeat")
            .handler(BodyHandler.create(false))
            .produces(KsqlMediaType.KSQL_V1_JSON.mediaType())
            .produces(JSON_CONTENT_TYPE)
            .handler(this::handleHeartbeatRequest);
        router.route(HttpMethod.GET, "/clusterStatus")
            .produces(KsqlMediaType.KSQL_V1_JSON.mediaType())
            .produces(JSON_CONTENT_TYPE)
            .handler(this::handleClusterStatusRequest);
        router.route(HttpMethod.GET, "/status/:type/:entity/:action")
            .produces(KsqlMediaType.KSQL_V1_JSON.mediaType())
            .produces(JSON_CONTENT_TYPE)
            .handler(this::handleStatusRequest);
        router.route(HttpMethod.GET, "/status")
            .produces(KsqlMediaType.KSQL_V1_JSON.mediaType())
            .produces(JSON_CONTENT_TYPE)
            .handler(this::handleAllStatusesRequest);
        router.route(HttpMethod.POST, "/lag")
            .handler(BodyHandler.create(false))
            .produces(KsqlMediaType.KSQL_V1_JSON.mediaType())
            .produces(JSON_CONTENT_TYPE)
            .handler(this::handleLagReportRequest);
        router.route(HttpMethod.GET, "/healthcheck")
            .produces(KsqlMediaType.KSQL_V1_JSON.mediaType())
            .produces(JSON_CONTENT_TYPE)
            .handler(this::handleHealthcheckRequest);
        router.route(HttpMethod.GET, "/v1/metadata")
            .produces(KsqlMediaType.KSQL_V1_JSON.mediaType())
            .produces(JSON_CONTENT_TYPE)
            .handler(this::handleServerMetadataRequest);
        router.route(HttpMethod.GET, "/v1/metadata/id")
            .produces(KsqlMediaType.KSQL_V1_JSON.mediaType())
            .produces(JSON_CONTENT_TYPE)
            .handler(this::handleServerMetadataClusterIdRequest);
        router.route(HttpMethod.GET, "/ws/query")
            .produces(KsqlMediaType.KSQL_V1_JSON.mediaType())
            .produces(JSON_CONTENT_TYPE)
            .handler(this::handleWebsocket);
        router.route(HttpMethod.GET, "/is_valid_property/:property")
            .produces(KsqlMediaType.KSQL_V1_JSON.mediaType())
            .produces(JSON_CONTENT_TYPE)
            .handler(this::handleIsValidPropertyRequest);
        router.route(HttpMethod.POST, "/test")
            .handler(BodyHandler.create(false))
            .produces(KsqlMediaType.KSQL_V1_JSON.mediaType())
            .handler(this::handleTest);
        return router;
    }

    /** Handles {@code POST /ksql} by delegating to {@code Endpoints.executeKsqlRequest}. */
    private void handleKsqlRequest(RoutingContext routingContext) {
        OldApiUtils.handleOldApiRequest(this.server, routingContext, KsqlRequest.class, Optional.empty(),
            (ksqlRequest, apiSecurityContext) -> this.endpoints.executeKsqlRequest(
                ksqlRequest,
                this.server.getWorkerExecutor(),
                DefaultApiSecurityContext.create(routingContext, this.server)));
    }

    /** Handles {@code POST /ksql/terminate} by delegating to {@code Endpoints.executeTerminate}. */
    private void handleTerminateRequest(RoutingContext routingContext) {
        OldApiUtils.handleOldApiRequest(this.server, routingContext, ClusterTerminateRequest.class, Optional.empty(),
            (terminateRequest, apiSecurityContext) -> this.endpoints.executeTerminate(
                terminateRequest,
                this.server.getWorkerExecutor(),
                DefaultApiSecurityContext.create(routingContext, this.server)));
    }

    /**
     * Handles {@code POST /query} by delegating to {@code Endpoints.executeQueryRequest}.
     *
     * <p>A future that completes when the underlying connection closes is passed to the
     * endpoint, together with a metrics callback holder that is also handed to
     * {@code OldApiUtils}.
     */
    private void handleQueryRequest(RoutingContext routingContext) {
        CompletableFuture<Void> connectionClosedFuture = new CompletableFuture<>();
        routingContext.request().connection().closeHandler(ignored -> connectionClosedFuture.complete(null));
        MetricsCallbackHolder metricsCallbackHolder = new MetricsCallbackHolder();
        OldApiUtils.handleOldApiRequest(this.server, routingContext, KsqlRequest.class, Optional.of(metricsCallbackHolder),
            (ksqlRequest, apiSecurityContext) -> this.endpoints.executeQueryRequest(
                ksqlRequest,
                this.server.getWorkerExecutor(),
                connectionClosedFuture,
                DefaultApiSecurityContext.create(routingContext, this.server),
                isInternalRequest(routingContext),
                getContentType(routingContext),
                metricsCallbackHolder,
                this.context));
    }

    /** Handles {@code GET /info} by delegating to {@code Endpoints.executeInfo}. */
    private void handleInfoRequest(RoutingContext routingContext) {
        OldApiUtils.handleOldApiRequest(this.server, routingContext, null, Optional.empty(),
            (unused, apiSecurityContext) -> this.endpoints.executeInfo(
                DefaultApiSecurityContext.create(routingContext, this.server)));
    }

    /** Handles {@code GET /clusterStatus} by delegating to {@code Endpoints.executeClusterStatus}. */
    private void handleClusterStatusRequest(RoutingContext routingContext) {
        OldApiUtils.handleOldApiRequest(this.server, routingContext, null, Optional.empty(),
            (unused, apiSecurityContext) -> this.endpoints.executeClusterStatus(
                DefaultApiSecurityContext.create(routingContext, this.server)));
    }

    /** Handles {@code POST /heartbeat} by delegating to {@code Endpoints.executeHeartbeat}. */
    private void handleHeartbeatRequest(RoutingContext routingContext) {
        OldApiUtils.handleOldApiRequest(this.server, routingContext, HeartbeatMessage.class, Optional.empty(),
            (heartbeat, apiSecurityContext) -> this.endpoints.executeHeartbeat(
                heartbeat,
                DefaultApiSecurityContext.create(routingContext, this.server)));
    }

    /**
     * Handles {@code GET /status/:type/:entity/:action}; the three path parameters are read
     * from the request and forwarded to {@code Endpoints.executeStatus}.
     */
    private void handleStatusRequest(RoutingContext routingContext) {
        HttpServerRequest request = routingContext.request();
        String type = request.getParam("type");
        String entity = request.getParam("entity");
        String action = request.getParam("action");
        OldApiUtils.handleOldApiRequest(this.server, routingContext, null, Optional.empty(),
            (unused, apiSecurityContext) -> this.endpoints.executeStatus(
                type,
                entity,
                action,
                DefaultApiSecurityContext.create(routingContext, this.server)));
    }

    /**
     * Handles {@code GET /is_valid_property/:property}; the path parameter is forwarded to
     * {@code Endpoints.executeIsValidProperty}.
     */
    private void handleIsValidPropertyRequest(RoutingContext routingContext) {
        String property = routingContext.request().getParam("property");
        OldApiUtils.handleOldApiRequest(this.server, routingContext, null, Optional.empty(),
            (unused, apiSecurityContext) -> this.endpoints.executeIsValidProperty(
                property,
                this.server.getWorkerExecutor(),
                DefaultApiSecurityContext.create(routingContext, this.server)));
    }

    /** Handles {@code GET /status} by delegating to {@code Endpoints.executeAllStatuses}. */
    private void handleAllStatusesRequest(RoutingContext routingContext) {
        OldApiUtils.handleOldApiRequest(this.server, routingContext, null, Optional.empty(),
            (unused, apiSecurityContext) -> this.endpoints.executeAllStatuses(
                DefaultApiSecurityContext.create(routingContext, this.server)));
    }

    /** Handles {@code POST /lag} by delegating to {@code Endpoints.executeLagReport}. */
    private void handleLagReportRequest(RoutingContext routingContext) {
        OldApiUtils.handleOldApiRequest(this.server, routingContext, LagReportingMessage.class, Optional.empty(),
            (lagReport, apiSecurityContext) -> this.endpoints.executeLagReport(
                lagReport,
                DefaultApiSecurityContext.create(routingContext, this.server)));
    }

    /** Handles {@code GET /healthcheck} by delegating to {@code Endpoints.executeCheckHealth}. */
    private void handleHealthcheckRequest(RoutingContext routingContext) {
        OldApiUtils.handleOldApiRequest(this.server, routingContext, null, Optional.empty(),
            (unused, apiSecurityContext) -> this.endpoints.executeCheckHealth(
                DefaultApiSecurityContext.create(routingContext, this.server)));
    }

    /** Handles {@code GET /v1/metadata} by delegating to {@code Endpoints.executeServerMetadata}. */
    private void handleServerMetadataRequest(RoutingContext routingContext) {
        OldApiUtils.handleOldApiRequest(this.server, routingContext, null, Optional.empty(),
            (unused, apiSecurityContext) -> this.endpoints.executeServerMetadata(
                DefaultApiSecurityContext.create(routingContext, this.server)));
    }

    /**
     * Handles {@code GET /v1/metadata/id} by delegating to
     * {@code Endpoints.executeServerMetadataClusterId}.
     */
    private void handleServerMetadataClusterIdRequest(RoutingContext routingContext) {
        OldApiUtils.handleOldApiRequest(this.server, routingContext, null, Optional.empty(),
            (unused, apiSecurityContext) -> this.endpoints.executeServerMetadataClusterId(
                DefaultApiSecurityContext.create(routingContext, this.server)));
    }

    /** Handles {@code GET /} by answering with a temporary redirect to {@code /info}. */
    private static void handleInfoRedirect(RoutingContext routingContext) {
        routingContext.response()
            .putHeader("location", "/info")
            .setStatusCode(HttpResponseStatus.TEMPORARY_REDIRECT.code())
            .end();
    }

    /**
     * Handles {@code GET /ws/query}: upgrades the request to a websocket and hands it to
     * {@code Endpoints.executeWebsocketStream}. If the upgrade fails the routing context is
     * failed with the cause.
     */
    private void handleWebsocket(RoutingContext routingContext) {
        // Created before the upgrade is attempted, as in the original flow.
        DefaultApiSecurityContext securityContext = DefaultApiSecurityContext.create(routingContext, this.server);
        routingContext.request().toWebSocket(upgradeResult -> {
            if (upgradeResult.failed()) {
                routingContext.fail(upgradeResult.cause());
                return;
            }
            ServerWebSocket webSocket = upgradeResult.result();
            this.endpoints.executeWebsocketStream(
                webSocket,
                routingContext.request().params(),
                this.server.getWorkerExecutor(),
                securityContext,
                this.context);
        });
    }

    /** Handles {@code POST /test} by delegating to {@code Endpoints.executeTest}. */
    private void handleTest(RoutingContext routingContext) {
        OldApiUtils.handleOldApiRequest(this.server, routingContext, String.class, Optional.empty(),
            (body, apiSecurityContext) -> this.endpoints.executeTest(
                body,
                DefaultApiSecurityContext.create(routingContext, this.server)));
    }

    /** Answers the {@code /chc/ready} and {@code /chc/live} routes with an empty JSON object. */
    private static void chcHandler(RoutingContext routingContext) {
        routingContext.response()
            .putHeader(HttpHeaders.CONTENT_TYPE.toString(), JSON_CONTENT_TYPE)
            .end(new JsonObject().toBuffer());
    }

    /**
     * Maps the acceptable content type negotiated for the request to a {@link KsqlMediaType}.
     *
     * @return {@code KsqlMediaType.LATEST_FORMAT} when no acceptable content type is set or it
     *         is plain {@code application/json}; otherwise the result of parsing it
     */
    private static KsqlMediaType getContentType(RoutingContext routingContext) {
        String acceptable = routingContext.getAcceptableContentType();
        if (acceptable == null || JSON_CONTENT_TYPE.equals(acceptable)) {
            return KsqlMediaType.LATEST_FORMAT;
        }
        return KsqlMediaType.parse(acceptable);
    }

    /**
     * Logs exceptions reported to the HTTP server's exception handler.
     *
     * <p>A {@link ClosedChannelException} is logged at debug level; everything else is logged
     * as an error, with a dedicated message for proxy protocol decoding failures.
     */
    private static void unhandledExceptionHandler(Throwable th) {
        if (th instanceof ClosedChannelException) {
            log.debug("Unhandled ClosedChannelException (connection likely closed early)", th);
            return;
        }
        if (th instanceof HAProxyProtocolException) {
            log.error("Failed to decode proxy protocol header", th);
            return;
        }
        log.error("Unhandled exception", th);
    }

    /**
     * Reads the "is internal" flag stored on the routing context under
     * {@code InternalEndpointHandler.CONTEXT_DATA_IS_INTERNAL}.
     *
     * @return the stored flag, or an empty optional when nothing is stored under that key
     */
    public static Optional<Boolean> isInternalRequest(RoutingContext routingContext) {
        Boolean isInternal = routingContext.get(InternalEndpointHandler.CONTEXT_DATA_IS_INTERNAL);
        return Optional.ofNullable(isInternal);
    }
}
