package io.confluent.ksql.api.server;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.api.auth.DefaultApiSecurityContext;
import io.confluent.ksql.api.spi.Endpoints;
import io.confluent.ksql.reactive.BufferedPublisher;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.InsertError;
import io.confluent.ksql.rest.entity.InsertsStreamArgs;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.WorkerExecutor;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonObject;
import io.vertx.core.parsetools.RecordParser;
import io.vertx.ext.web.RoutingContext;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/confluent/ksql/api/server/InsertsStreamHandler.class */
public class InsertsStreamHandler implements Handler<RoutingContext> {
    private static final Logger LOG = LogManager.getLogger(InsertsStreamHandler.class);
    private final Context ctx;
    private final Endpoints endpoints;
    private final Server server;
    private final WorkerExecutor workerExecutor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/api/server/InsertsStreamHandler$RequestHandler.class */
    public class RequestHandler {
        private final RoutingContext routingContext;
        private final RecordParser recordParser;
        private final InsertsStreamResponseWriter insertsStreamResponseWriter;
        private final UUID uuid = UUID.randomUUID();
        private boolean hasReadArguments;
        private BufferedPublisher<JsonObject> publisher;
        private long rowsReceived;
        private AcksSubscriber acksSubscriber;
        private boolean paused;
        private boolean responseEnded;
        private long sendSequence;
        private InsertsStreamSubscriber insertsSubscriber;

        /**
         * Creates the per-request state and selects the response writer from the acceptable
         * content type of the request.
         *
         * <p>The delimited writer is used when the acceptable content type is absent or equals
         * {@code application/vnd.ksqlapi.delimited.v1}; every other value selects the JSON writer.
         *
         * @param routingContext the routing context of the request being served
         * @param recordParser the parser that delivers body records to this handler
         * @throws NullPointerException if either argument is {@code null}
         */
        RequestHandler(RoutingContext routingContext, RecordParser recordParser) {
            this.routingContext = Objects.requireNonNull(routingContext);
            this.recordParser = Objects.requireNonNull(recordParser);
            final String contentType = routingContext.getAcceptableContentType();
            final boolean useDelimited =
                    contentType == null || "application/vnd.ksqlapi.delimited.v1".equals(contentType);
            this.insertsStreamResponseWriter = useDelimited
                    ? new DelimitedInsertsStreamResponseWriter(routingContext.response(), this.uuid)
                    : new JsonInsertsStreamResponseWriter(routingContext.response(), this.uuid);
        }

        /**
         * Dispatches one record of the request body.
         *
         * <p>Records arriving after the response has ended are dropped. The first record is
         * treated as the stream arguments; later records are treated as rows, but only once the
         * publisher has been created.
         *
         * @param buffer the raw bytes of a single record
         */
        private void handleBodyBuffer(Buffer buffer) {
            if (responseEnded) {
                return;
            }
            if (!hasReadArguments) {
                handleArgs(buffer);
                return;
            }
            if (publisher != null) {
                handleRow(buffer);
            }
        }

        /**
         * Handles the first record of the body, which carries the inserts-stream arguments.
         *
         * <p>When the arguments deserialise, this registers the response end handler, creates the
         * acks subscriber, pauses the record parser and asks the endpoints to create the inserts
         * subscriber. Once that completes, the publisher is created and subscribed and the parser
         * is resumed; a failure is passed to {@code ServerUtils.handleEndpointException}.
         *
         * @param buffer the raw bytes of the arguments record
         */
        private void handleArgs(Buffer buffer) {
            hasReadArguments = true;
            final Optional<InsertsStreamArgs> parsedArgs =
                    ServerUtils.deserialiseObject(buffer, routingContext, InsertsStreamArgs.class);
            if (!parsedArgs.isPresent()) {
                // NOTE(review): presumably deserialiseObject has already reported the problem on
                // the response -- confirm against ServerUtils.
                return;
            }
            final InsertsStreamArgs args = parsedArgs.get();
            LOG.debug("({}) Processed insert stream args: {}", uuid, args);
            routingContext.response().endHandler(ignored -> handleResponseEnd());
            acksSubscriber = new AcksSubscriber(ctx, routingContext.response(), insertsStreamResponseWriter);
            // No rows may be parsed until the subscriber exists; resumed in the callback below.
            recordParser.pause();
            endpoints
                    .createInsertsSubscriber(
                            args.target,
                            args.properties,
                            acksSubscriber,
                            ctx,
                            workerExecutor,
                            DefaultApiSecurityContext.create(routingContext, server))
                    .thenAccept(subscriber -> {
                        publisher = new BufferedPublisher<>(ctx);
                        LOG.debug("({}) Acknowledging insert stream in subscriber after creating publisher.", uuid);
                        routingContext.response().write("");
                        publisher.subscribe(subscriber);
                        recordParser.resume();
                        insertsSubscriber = subscriber;
                    })
                    .exceptionally(failure ->
                            ServerUtils.handleEndpointException(failure, routingContext, "Failed to execute inserts"));
        }

        /**
         * Handles one row record of the inserts stream.
         *
         * <p>Reconstructed from the bytecode listing the decompiler emitted for this method; the
         * previous body only threw {@link UnsupportedOperationException}.
         *
         * <p>The row is parsed as a JSON object and handed to the publisher. If parsing fails, an
         * {@link InsertError} carrying this row's sequence number is written, the response writer
         * is ended, the acks subscriber is cancelled, and the row is not counted. If the publisher
         * reports that its buffer is full and the parser is not already paused, the parser is
         * paused and {@link #publisherReceptive()} is registered as the drain handler.
         *
         * @param buffer the raw bytes of a single row record
         */
        private void handleRow(Buffer buffer) {
            // Sequence number of this row; the counter itself moves on to the next row.
            final long sequence = sendSequence++;

            final JsonObject row;
            try {
                row = new JsonObject(buffer);
                LOG.debug("({}) Handling insert stream row: {}", uuid, row);
            } catch (DecodeException e) {
                // NOTE(review): the decompiler listing does not show the caught type;
                // DecodeException is what JsonObject(Buffer) is documented to throw -- confirm
                // against the exception table of the class file.
                final InsertError error = new InsertError(
                        sequence, Errors.ERROR_CODE_BAD_REQUEST, "Invalid JSON in inserts stream");
                // The log reads the already-incremented counter, exactly as the compiled code does.
                LOG.warn("({}) Failed to process row at sequence {} ({})",
                        uuid, sendSequence, buffer.toString(), e);
                insertsStreamResponseWriter.writeError(error).end();
                acksSubscriber.cancel();
                return;
            }

            final boolean bufferFull = publisher.accept(row);
            if (bufferFull && !paused) {
                LOG.debug("({}) Buffer is full after processing {} records. Pausing the parser",
                        uuid, sendSequence);
                recordParser.pause();
                publisher.drainHandler(this::publisherReceptive);
                paused = true;
            }
            rowsReceived++;
        }

        /**
         * Clears the paused flag and resumes the record parser.
         *
         * <p>The flag is cleared before the parser is resumed, so rows delivered as a result of
         * the resume see {@code paused == false}.
         */
        private void publisherReceptive() {
            LOG.debug("({}) Resuming record parser after draining publisher.", uuid);
            paused = false;
            recordParser.resume();
        }

        /**
         * Runs when the request body has been read completely.
         *
         * <p>Does nothing beyond logging when no publisher exists. Otherwise the publisher is
         * completed; then the response is ended directly if there is no acks subscriber, or the
         * acks subscriber is told how many rows were received.
         *
         * @param unused the end-of-stream event value; not read
         */
        private void handleBodyEnd(Void unused) {
            final boolean completingPublisher = publisher != null;
            final boolean closingPublisher = acksSubscriber == null;
            LOG.debug("({}) Completed reading the request, ending the response. Completing Publisher: {}, Closing Publisher: {}", uuid, Boolean.valueOf(completingPublisher), Boolean.valueOf(closingPublisher));
            if (publisher == null) {
                return;
            }
            publisher.complete();
            // Re-read the field after completing the publisher, as the original code does.
            if (acksSubscriber != null) {
                acksSubscriber.insertsSent(rowsReceived);
            } else {
                routingContext.response().end();
            }
        }

        /**
         * Marks the response as ended and closes the inserts subscriber if one has been set.
         *
         * <p>Once the flag is set, {@code handleBodyBuffer} ignores any further body records.
         */
        private void handleResponseEnd() {
            responseEnded = true;
            final InsertsStreamSubscriber subscriber = insertsSubscriber;
            if (subscriber == null) {
                return;
            }
            subscriber.close();
        }
    }

    /**
     * Creates the handler and captures the worker executor exposed by the server.
     *
     * @param context the Vert.x context handed to the subscribers and publishers created per request
     * @param endpoints the endpoints used to create inserts subscribers
     * @param server the server that supplies the worker executor and is used to build the security context
     * @throws NullPointerException if any argument, or the server's worker executor, is {@code null}
     */
    @SuppressFBWarnings({"EI_EXPOSE_REP2"})
    public InsertsStreamHandler(Context context, Endpoints endpoints, Server server) {
        ctx = Objects.requireNonNull(context);
        this.endpoints = Objects.requireNonNull(endpoints);
        this.server = Objects.requireNonNull(server);
        workerExecutor = Objects.requireNonNull(server.getWorkerExecutor());
    }

    /**
     * Handles an inserts-stream request by wiring a newline-delimited record parser over the
     * request body to a fresh {@link RequestHandler}.
     *
     * <p>Nothing is wired when {@code ServerUtils.checkHttp2} returns {@code false}.
     * NOTE(review): presumably that helper reports the rejection on the response itself --
     * confirm against ServerUtils.
     *
     * @param routingContext the routing context of the incoming request
     */
    @Override
    public void handle(RoutingContext routingContext) {
        if (!ServerUtils.checkHttp2(routingContext)) {
            return;
        }
        final RecordParser recordParser = RecordParser.newDelimited("\n", routingContext.request());
        final RequestHandler requestHandler = new RequestHandler(routingContext, recordParser);
        // Each delimited record goes to handleBodyBuffer; end of body goes to handleBodyEnd.
        recordParser.handler(requestHandler::handleBodyBuffer);
        recordParser.endHandler(requestHandler::handleBodyEnd);
    }
}
