package io.confluent.ksql.api.server;

import io.confluent.ksql.api.auth.ApiSecurityContext;
import io.confluent.ksql.api.auth.DefaultApiSecurityContext;
import io.confluent.ksql.rest.EndpointResponse;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics;
import io.confluent.ksql.rest.server.resources.KsqlRestException;
import io.confluent.ksql.util.VertxCompletableFuture;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.http.HttpVersion;
import io.vertx.ext.web.RoutingContext;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiFunction;
import org.apache.kafka.common.utils.Time;

/* loaded from: input_file:io/confluent/ksql/api/server/OldApiUtils.class */
public final class OldApiUtils {
    private static final String CONTENT_TYPE_HEADER = HttpHeaders.CONTENT_TYPE.toString();
    private static final String JSON_CONTENT_TYPE = "application/json";
    private static final String CHUNKED_ENCODING = "chunked";

    private OldApiUtils() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Multi-variable type inference failed */
    public static <T> void handleOldApiRequest(Server server, RoutingContext routingContext, Class<T> cls, Optional<PullQueryExecutorMetrics> optional, BiFunction<T, ApiSecurityContext, CompletableFuture<EndpointResponse>> biFunction) {
        T t;
        long nanoseconds = Time.SYSTEM.nanoseconds();
        if (cls != null) {
            Optional deserialiseObject = ServerUtils.deserialiseObject(routingContext.getBody(), routingContext, cls);
            if (!deserialiseObject.isPresent()) {
                return;
            } else {
                t = deserialiseObject.get();
            }
        } else {
            t = null;
        }
        optional.ifPresent(pullQueryExecutorMetrics -> {
            pullQueryExecutorMetrics.recordRequestSize(routingContext.request().bytesRead());
        });
        biFunction.apply(t, DefaultApiSecurityContext.create(routingContext)).thenAccept(endpointResponse -> {
            handleOldApiResponse(server, routingContext, endpointResponse, optional, nanoseconds);
        }).exceptionally(th -> {
            if (th instanceof CompletionException) {
                th = th.getCause();
            }
            handleOldApiResponse(server, routingContext, mapException(th), optional, nanoseconds);
            return null;
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void handleOldApiResponse(Server server, RoutingContext routingContext, EndpointResponse endpointResponse, Optional<PullQueryExecutorMetrics> optional, long j) {
        HttpServerResponse response = routingContext.response();
        response.putHeader(CONTENT_TYPE_HEADER, JSON_CONTENT_TYPE);
        response.setStatusCode(endpointResponse.getStatus());
        if (endpointResponse.getEntity() instanceof StreamingOutput) {
            StreamingOutput streamingOutput = (StreamingOutput) endpointResponse.getEntity();
            if (routingContext.request().version() == HttpVersion.HTTP_2) {
                routingContext.response().setStatusCode(HttpResponseStatus.METHOD_NOT_ALLOWED.code()).setStatusMessage("The /query endpoint is not available using HTTP2").end();
                streamingOutput.close();
                return;
            } else {
                response.putHeader("Transfer-Encoding", CHUNKED_ENCODING);
                streamEndpointResponse(server, routingContext, streamingOutput);
            }
        } else if (endpointResponse.getEntity() == null) {
            response.end();
        } else {
            response.end(endpointResponse.getEntity() instanceof String ? Buffer.buffer((String) endpointResponse.getEntity()) : ServerUtils.serializeObject(endpointResponse.getEntity()));
        }
        optional.ifPresent(pullQueryExecutorMetrics -> {
            pullQueryExecutorMetrics.recordResponseSize(routingContext.response().bytesWritten());
        });
        optional.ifPresent(pullQueryExecutorMetrics2 -> {
            pullQueryExecutorMetrics2.recordLatency(j);
        });
    }

    private static void streamEndpointResponse(Server server, RoutingContext routingContext, StreamingOutput streamingOutput) {
        server.getWorkerExecutor().executeBlocking(promise -> {
            ResponseOutputStream responseOutputStream = new ResponseOutputStream(routingContext.response());
            routingContext.request().connection().closeHandler(r5 -> {
                try {
                    responseOutputStream.close();
                } catch (IOException e) {
                    promise.fail(e);
                }
            });
            try {
                try {
                    streamingOutput.write(new BufferedOutputStream(responseOutputStream));
                    promise.complete();
                } catch (Exception e) {
                    promise.fail(e);
                    try {
                        responseOutputStream.close();
                    } catch (IOException e2) {
                    }
                }
            } finally {
                try {
                    responseOutputStream.close();
                } catch (IOException e3) {
                }
            }
        }, new VertxCompletableFuture());
    }

    public static EndpointResponse mapException(Throwable th) {
        return th instanceof KsqlRestException ? ((KsqlRestException) th).getResponse() : EndpointResponse.create().status(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).type(JSON_CONTENT_TYPE).entity(new KsqlErrorMessage(Errors.ERROR_CODE_SERVER_ERROR, th)).build();
    }
}
