package io.confluent.ksql.rest.client;

import io.confluent.ksql.reactive.BufferedPublisher;
import io.vertx.core.Context;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.parsetools.RecordParser;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/* loaded from: input_file:io/confluent/ksql/rest/client/StreamPublisher.class */
/**
 * Publisher that splits an HTTP response body into newline-delimited records, strips the
 * JSON-array framing from each record, maps it with a caller-supplied function and hands the
 * result to {@link BufferedPublisher#accept}.
 *
 * <p>When {@code accept} returns {@code false} the record parser is paused and resumed again
 * from the drain handler (presumably this signals a full downstream buffer — confirm against
 * {@code BufferedPublisher}).
 *
 * @param <T> type of the elements produced by the mapping function
 */
public class StreamPublisher<T> extends BufferedPublisher<T> {
    private static final byte ARRAY_OPEN = '[';
    private static final byte ARRAY_CLOSE = ']';
    private static final byte ELEMENT_SEPARATOR = ',';

    private final HttpClientResponse response;
    // True while a drain handler is registered, so that at most one is installed at a time.
    private boolean drainHandlerSet;

    /**
     * Strips JSON-array framing from a single response line: one leading {@code '['}, one
     * trailing {@code ']'} and then one trailing {@code ','}, each only if present.
     *
     * <p>Lines that consist solely of framing bytes (for example {@code "]"}) and empty
     * buffers yield an empty buffer instead of failing with an index-out-of-bounds error.
     *
     * @param buffer the raw line, without its line delimiter
     * @return the given buffer if it is empty, otherwise a slice of it without the framing bytes
     */
    public static Buffer toJsonMsg(Buffer buffer) {
        int start = 0;
        int end = buffer.length() - 1; // inclusive index of the last byte still kept
        if (end < 0) {
            // Nothing to strip; avoids reading byte 0 of an empty buffer.
            return buffer;
        }
        if (buffer.getByte(0) == ARRAY_OPEN) {
            start = 1;
        }
        // The end >= start guards keep the reads in bounds once everything has been stripped.
        if (end >= start && buffer.getByte(end) == ARRAY_CLOSE) {
            end--;
        }
        if (end >= start && buffer.getByte(end) == ELEMENT_SEPARATOR) {
            end--;
        }
        return buffer.slice(start, end + 1);
    }

    /**
     * Wires a newline-delimited record parser onto the response and starts feeding this
     * publisher.
     *
     * <p>NOTE: the decompiler reports that this constructor was originally package-private.
     *
     * @param context the context passed to the {@link BufferedPublisher} constructor
     * @param httpClientResponse the response whose body is parsed
     * @param function maps each stripped line to an element of type {@code T}
     * @param completableFuture completed exceptionally when the parser reports an error;
     *     must not be {@code null}
     */
    public StreamPublisher(Context context, HttpClientResponse httpClientResponse, Function<Buffer, T> function, CompletableFuture<ResponseWithBody> completableFuture) {
        super(context);
        this.response = httpClientResponse;
        RecordParser parser = RecordParser.newDelimited("\n", httpClientResponse);
        // The bound method reference null-checks completableFuture when it is evaluated,
        // so no separate explicit null check is needed here.
        parser.exceptionHandler(completableFuture::completeExceptionally).handler(line -> {
            if (line.length() == 0) {
                // Blank line: nothing to publish.
                return;
            }
            boolean accepted = accept(function.apply(toJsonMsg(line)));
            if (accepted || this.drainHandlerSet) {
                return;
            }
            // Not accepted and no drain handler pending: stop reading until drained.
            parser.pause();
            this.drainHandlerSet = true;
            drainHandler(() -> {
                this.drainHandlerSet = false;
                parser.resume();
            });
        }).endHandler(ignored -> complete());
    }

    /** Closes the connection that the underlying request was made on. */
    public void close() {
        this.response.request().connection().close();
    }
}
