package io.confluent.ksql.api.server;

import io.confluent.ksql.api.impl.Utils;
import io.confluent.ksql.api.spi.Endpoints;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.WorkerExecutor;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/api/server/ServerVerticle.class */
public class ServerVerticle extends AbstractVerticle {
    private static final Logger log = LoggerFactory.getLogger(ServerVerticle.class);
    private final Endpoints endpoints;
    private final HttpServerOptions httpServerOptions;
    private final Server server;
    private final WorkerExecutor workerExecutor;
    private ConnectionQueryManager connectionQueryManager;
    private HttpServer httpServer;

    public ServerVerticle(Endpoints endpoints, HttpServerOptions httpServerOptions, Server server, WorkerExecutor workerExecutor) {
        this.endpoints = endpoints;
        this.httpServerOptions = httpServerOptions;
        this.server = server;
        this.workerExecutor = workerExecutor;
    }

    public void start(Promise<Void> promise) {
        this.connectionQueryManager = new ConnectionQueryManager(this.context, this.server);
        this.httpServer = this.vertx.createHttpServer(this.httpServerOptions).requestHandler(setupRouter()).exceptionHandler(ServerUtils::unhandledExceptonHandler);
        Future future = Promise.promise().future();
        Utils.connectPromise(future.map(httpServer -> {
            return null;
        }), promise);
        this.httpServer.listen(future);
        this.vertx.getOrCreateContext().exceptionHandler(ServerUtils::unhandledExceptonHandler);
    }

    public void stop(Promise<Void> promise) {
        if (this.httpServer == null) {
            promise.complete();
        } else {
            this.httpServer.close(promise.future());
        }
    }

    private Router setupRouter() {
        Router router = Router.router(this.vertx);
        router.route(HttpMethod.POST, "/query-stream").produces("application/vnd.ksqlapi.delimited.v1").produces("application/json").handler(BodyHandler.create()).handler(new QueryStreamHandler(this.endpoints, this.connectionQueryManager, this.context, this.workerExecutor));
        router.route(HttpMethod.POST, "/inserts-stream").produces("application/vnd.ksqlapi.delimited.v1").produces("application/json").handler(new InsertsStreamHandler(this.context, this.endpoints));
        router.route(HttpMethod.POST, "/close-query").handler(BodyHandler.create()).handler(new CloseQueryHandler(this.server));
        return router;
    }
}
