/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.AcksPublisher;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.exception.KsqlClientException;
import io.confluent.ksql.api.client.impl.AcksPublisherImpl;
import io.confluent.ksql.api.client.impl.InsertAckImpl;
import io.confluent.ksql.api.client.impl.ResponseHandler;
import io.confluent.ksql.api.client.impl.StreamInsertsSubscriber;
import io.vertx.core.Context;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.parsetools.RecordParser;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

/**
 * Response handler for the {@code /inserts-stream} endpoint.
 *
 * <p>On construction it subscribes a {@link StreamInsertsSubscriber} to the supplied inserts
 * publisher and immediately completes the supplied future with an {@link AcksPublisher}. Every
 * response record is then parsed as a JSON object carrying a {@code "seq"} and a {@code "status"}
 * field and is turned into either an ack or an error on that publisher.
 */
public class StreamInsertsResponseHandler
extends ResponseHandler<CompletableFuture<AcksPublisher>> {
    private static final Logger log = LoggerFactory.getLogger(StreamInsertsResponseHandler.class);

    /** Publisher that receives the acks and errors parsed from the response body. */
    private final AcksPublisherImpl acksPublisher;

    /** True while the record parser is paused and a drain handler is registered. */
    private boolean paused;

    /**
     * Creates the handler, wires the inserts publisher to the request and completes {@code cf}.
     *
     * @param context          passed to the superclass, the inserts subscriber and the acks publisher
     * @param recordParser     passed to the superclass; paused and resumed by this handler
     * @param cf               future completed with the acks publisher before this constructor returns
     * @param request          request handed to the {@link StreamInsertsSubscriber}
     * @param insertsPublisher publisher of the rows to insert
     * @throws NullPointerException if {@code request} or {@code insertsPublisher} is {@code null}
     */
    StreamInsertsResponseHandler(Context context, RecordParser recordParser, CompletableFuture<AcksPublisher> cf, HttpClientRequest request, Publisher<KsqlObject> insertsPublisher) {
        super(context, recordParser, cf);
        Objects.requireNonNull(request, "request");
        Objects.requireNonNull(insertsPublisher, "insertsPublisher");
        insertsPublisher.subscribe((Subscriber)new StreamInsertsSubscriber(context, request));
        this.acksPublisher = new AcksPublisherImpl(context);
        cf.complete(this.acksPublisher);
    }

    /**
     * Parses one response record and forwards it to the acks publisher.
     *
     * <p>A record with status {@code "ok"} becomes an {@link InsertAckImpl}; a record with status
     * {@code "error"} is reported through {@code handleError} as a {@link KsqlClientException}.
     *
     * @param buff buffer holding a single JSON object
     * @throws IllegalStateException if the status is neither {@code "ok"} nor {@code "error"}, or
     *                               if an {@code "ok"} record carries no sequence number
     */
    @Override
    protected void doHandleBodyBuffer(Buffer buff) {
        JsonObject jsonObject = new JsonObject(buff);
        // Keep the boxed value: unboxing straight into a long would throw a bare
        // NullPointerException when "seq" is absent and hide the actual status/error message.
        Long seqNum = jsonObject.getLong("seq");
        String status = jsonObject.getString("status");
        if ("ok".equals(status)) {
            if (seqNum == null) {
                throw new IllegalStateException("Received ack without sequence number from /inserts-stream");
            }
            InsertAckImpl ack = new InsertAckImpl(seqNum);
            boolean full = this.acksPublisher.accept(ack);
            if (full && !this.paused) {
                // Stop reading further records until the publisher invokes the drain handler.
                this.recordParser.pause();
                this.acksPublisher.drainHandler(this::publisherReceptive);
                this.paused = true;
            }
        } else if ("error".equals(status)) {
            // %d renders a null argument as "null", so a missing seq or error_code is still reported.
            this.acksPublisher.handleError(new KsqlClientException(String.format("Received error from /inserts-stream. Inserts sequence number: %d. Error code: %d. Message: %s", seqNum, jsonObject.getInteger("error_code"), jsonObject.getString("message"))));
        } else {
            throw new IllegalStateException("Unrecognized status response from /inserts-stream: " + status);
        }
    }

    /**
     * Logs the failure and reports it to the acks publisher wrapped in an {@link Exception}.
     *
     * @param t the failure to report
     */
    @Override
    protected void doHandleException(Throwable t) {
        // Pass the throwable as the cause argument so the stack trace is logged, not just toString().
        log.error("Exception in StreamInsertsResponseHandler", t);
        this.acksPublisher.handleError(new Exception(t));
    }

    /** Completes the acks publisher once the response body has ended. */
    @Override
    protected void doHandleBodyEnd() {
        this.acksPublisher.complete();
    }

    /**
     * Drain handler registered on the acks publisher: clears the paused flag and resumes the
     * record parser after the inherited context check.
     */
    private void publisherReceptive() {
        this.checkContext();
        this.paused = false;
        this.recordParser.resume();
    }
}

