package io.confluent.ksql.api.server;

import io.confluent.ksql.api.server.protocol.ErrorResponse;
import io.confluent.ksql.api.server.protocol.InsertsStreamArgs;
import io.confluent.ksql.api.spi.Endpoints;
import io.confluent.ksql.api.spi.InsertsSubscriber;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonObject;
import io.vertx.core.parsetools.RecordParser;
import io.vertx.ext.web.RoutingContext;
import java.util.Objects;
import java.util.Optional;

/* loaded from: input_file:io/confluent/ksql/api/server/InsertsStreamHandler.class */
public class InsertsStreamHandler implements Handler<RoutingContext> {
    private final Context ctx;
    private final Endpoints endpoints;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/api/server/InsertsStreamHandler$RequestHandler.class */
    public class RequestHandler {
        private final RoutingContext routingContext;
        private final RecordParser recordParser;
        private final InsertsStreamResponseWriter insertsStreamResponseWriter;
        private boolean hasReadArguments;
        private BufferedPublisher<JsonObject> publisher;
        private long rowsReceived;
        private AcksSubscriber acksSubscriber;

        RequestHandler(RoutingContext routingContext, RecordParser recordParser) {
            this.routingContext = (RoutingContext) Objects.requireNonNull(routingContext);
            this.recordParser = (RecordParser) Objects.requireNonNull(recordParser);
            String acceptableContentType = routingContext.getAcceptableContentType();
            if ("application/vnd.ksqlapi.delimited.v1".equals(acceptableContentType) || acceptableContentType == null) {
                this.insertsStreamResponseWriter = new DelimitedInsertsStreamResponseWriter(routingContext.response());
            } else {
                this.insertsStreamResponseWriter = new JsonInsertsStreamResponseWriter(routingContext.response());
            }
        }

        public void handleBodyBuffer(Buffer buffer) {
            if (!this.hasReadArguments) {
                handleArgs(buffer);
            } else if (this.publisher != null) {
                handleRow(buffer);
            }
        }

        private void handleArgs(Buffer buffer) {
            this.hasReadArguments = true;
            Optional deserialiseObject = ServerUtils.deserialiseObject(buffer, this.routingContext.response(), InsertsStreamArgs.class);
            if (deserialiseObject.isPresent()) {
                this.acksSubscriber = ((InsertsStreamArgs) deserialiseObject.get()).requiresAcks ? new AcksSubscriber(InsertsStreamHandler.this.ctx, this.routingContext.response(), this.insertsStreamResponseWriter) : null;
                InsertsSubscriber createInsertsSubscriber = InsertsStreamHandler.this.endpoints.createInsertsSubscriber(((InsertsStreamArgs) deserialiseObject.get()).target, ((InsertsStreamArgs) deserialiseObject.get()).properties, this.acksSubscriber);
                this.publisher = new BufferedPublisher<>(InsertsStreamHandler.this.ctx);
                this.routingContext.response().write("");
                this.publisher.subscribe(createInsertsSubscriber);
            }
        }

        private void handleRow(Buffer buffer) {
            try {
                if (this.publisher.accept(new JsonObject(buffer))) {
                    this.recordParser.pause();
                    BufferedPublisher<JsonObject> bufferedPublisher = this.publisher;
                    RecordParser recordParser = this.recordParser;
                    recordParser.getClass();
                    bufferedPublisher.drainHandler(recordParser::resume);
                }
                this.rowsReceived++;
            } catch (DecodeException e) {
                this.insertsStreamResponseWriter.writeError(new ErrorResponse(4, "Invalid JSON in inserts stream")).end();
                this.acksSubscriber.cancel();
            }
        }

        public void handleBodyEnd(Void r5) {
            if (this.publisher != null) {
                this.publisher.complete();
                if (this.acksSubscriber == null) {
                    this.routingContext.response().end();
                } else {
                    this.acksSubscriber.insertsSent(this.rowsReceived);
                }
            }
        }
    }

    public InsertsStreamHandler(Context context, Endpoints endpoints) {
        this.ctx = (Context) Objects.requireNonNull(context);
        this.endpoints = (Endpoints) Objects.requireNonNull(endpoints);
    }

    public void handle(RoutingContext routingContext) {
        RecordParser newDelimited = RecordParser.newDelimited("\n", routingContext.request());
        RequestHandler requestHandler = new RequestHandler(routingContext, newDelimited);
        requestHandler.getClass();
        newDelimited.handler(requestHandler::handleBodyBuffer);
        requestHandler.getClass();
        newDelimited.endHandler(requestHandler::handleBodyEnd);
    }
}
