package io.confluent.ksql.api.server;

import io.vertx.core.Context;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/api/server/BufferedPublisher.class */
/**
 * A publisher that queues elements in an in-memory FIFO buffer and hands them to the subscriber
 * as demand allows.
 *
 * <p>Producers push elements with {@link #accept(Object)}. A {@code true} return value signals
 * that the buffer has reached its configured maximum size; the producer may then register a
 * one-shot callback via {@link #drainHandler(Runnable)} which is scheduled once the buffer has
 * been emptied and demand is still outstanding. {@link #complete()} marks the end of the stream.
 *
 * <p>NOTE(review): {@code checkContext()}, {@code isCancelled()}, {@code getDemand()},
 * {@code getSubscriber()}, {@code doOnNext()}, {@code sendComplete()} and the {@code ctx} field
 * are inherited from {@link BasePublisher}, which is not visible from this file. The comments
 * below assume they do what their names suggest (e.g. that {@code checkContext()} enforces
 * calling on the publisher's context) - confirm against that class.
 *
 * @param <T> the type of element published
 */
public class BufferedPublisher<T> extends BasePublisher<T> {
    private static final Logger log = LoggerFactory.getLogger(BufferedPublisher.class);

    /** Maximum number of buffered elements sent in one {@link #maybeSend()} pass. */
    public static final int SEND_MAX_BATCH_SIZE = 10;

    /** Buffer size threshold used by the constructors that do not take an explicit size. */
    public static final int DEFAULT_BUFFER_MAX_SIZE = 100;

    /** Elements accepted but not yet handed to the subscriber, in arrival order. */
    private final Queue<T> buffer;

    /**
     * Size at which {@link #accept(Object)} starts returning {@code true}. This is a signal to
     * the producer only; {@code accept} keeps buffering elements beyond it.
     */
    private final int bufferMaxSize;

    /** One-shot callback scheduled when the buffer empties; {@code null} when none is set. */
    private Runnable drainHandler;

    /** Set when completion was requested but has to wait for a later {@link #maybeSend()}. */
    private boolean complete;

    /** Set as soon as {@link #complete()} takes effect; further {@code accept} calls fail. */
    private boolean completing;

    /**
     * Creates a publisher with an empty buffer and the default buffer size threshold.
     *
     * @param context the context passed to {@link BasePublisher}
     */
    public BufferedPublisher(Context context) {
        this(context, Collections.emptySet(), DEFAULT_BUFFER_MAX_SIZE);
    }

    /**
     * Creates a publisher pre-loaded with elements and the default buffer size threshold.
     *
     * @param context the context passed to {@link BasePublisher}
     * @param initialBuffer elements to place in the buffer, in iteration order
     */
    public BufferedPublisher(Context context, Collection<T> initialBuffer) {
        this(context, initialBuffer, DEFAULT_BUFFER_MAX_SIZE);
    }

    /**
     * Creates a publisher with an empty buffer and the given buffer size threshold.
     *
     * @param context the context passed to {@link BasePublisher}
     * @param bufferMaxSize buffer size at which {@link #accept(Object)} returns {@code true}
     */
    public BufferedPublisher(Context context, int bufferMaxSize) {
        this(context, Collections.emptySet(), bufferMaxSize);
    }

    /**
     * Creates a publisher pre-loaded with elements and the given buffer size threshold.
     *
     * @param context the context passed to {@link BasePublisher}
     * @param initialBuffer elements to place in the buffer, in iteration order
     * @param bufferMaxSize buffer size at which {@link #accept(Object)} returns {@code true}
     * @throws NullPointerException if {@code initialBuffer} or any of its elements is null
     */
    public BufferedPublisher(Context context, Collection<T> initialBuffer, int bufferMaxSize) {
        super(context);
        this.buffer = new ArrayDeque<>(initialBuffer);
        this.bufferMaxSize = bufferMaxSize;
    }

    /**
     * Offers an element to the publisher.
     *
     * <p>If the publisher is cancelled the element is dropped. Otherwise it is sent to the
     * subscriber straight away when there is demand and nothing is queued ahead of it, and
     * buffered in all other cases.
     *
     * @param t the element to publish
     * @return {@code true} if the buffer now holds at least the configured maximum number of
     *     elements, i.e. the producer should stop and register a drain handler
     * @throws IllegalStateException if {@link #complete()} has already taken effect
     */
    public boolean accept(T t) {
        checkContext();
        if (this.completing) {
            throw new IllegalStateException("Cannot call accept after complete is called");
        }
        if (!isCancelled()) {
            // Send directly only when nothing is queued. The buffer can be non-empty while
            // demand is positive (maybeSend defers the remainder after a full batch); sending
            // now would let this element overtake the queued ones. A buffered element is
            // delivered by the same maybeSend pass the elements ahead of it are waiting for.
            if (getDemand() == 0 || !this.buffer.isEmpty()) {
                this.buffer.add(t);
            } else {
                doOnNext(t);
            }
        }
        return this.buffer.size() >= this.bufferMaxSize;
    }

    /**
     * Registers a one-shot callback that {@link #maybeSend()} schedules once the buffer is
     * empty, demand is outstanding and the stream has not been completed. The handler is
     * cleared when it is scheduled, so it must be registered again to be notified again.
     *
     * @param handler the callback to run
     * @throws IllegalStateException if a drain handler is already registered
     * @throws NullPointerException if {@code handler} is null
     */
    public void drainHandler(Runnable handler) {
        checkContext();
        if (this.drainHandler != null) {
            throw new IllegalStateException("drainHandler already set");
        }
        this.drainHandler = Objects.requireNonNull(handler);
    }

    /**
     * Marks the stream as finished. Does nothing if the publisher is cancelled or completion
     * was already requested.
     *
     * <p>Completion is sent immediately when the buffer is empty and a subscriber is present;
     * otherwise it is recorded and sent by a later {@link #maybeSend()} once the buffer has
     * been emptied.
     */
    public void complete() {
        checkContext();
        if (isCancelled() || this.completing) {
            return;
        }
        this.completing = true;
        if (this.buffer.isEmpty() && getSubscriber() != null) {
            sendComplete();
        } else {
            // Buffered elements (or the absence of a subscriber) delay completion.
            this.complete = true;
        }
    }

    /**
     * Sends buffered elements while there is demand, at most {@link #SEND_MAX_BATCH_SIZE} per
     * invocation; when that limit is hit the rest is deferred to another invocation scheduled
     * through {@code ctx.runOnContext}. Once the buffer is empty this either sends the pending
     * completion or schedules the registered drain handler.
     */
    @Override // io.confluent.ksql.api.server.BasePublisher
    protected void maybeSend() {
        int numSent = 0;
        while (!isCancelled() && getDemand() > 0 && !this.buffer.isEmpty()) {
            if (numSent >= SEND_MAX_BATCH_SIZE) {
                // Batch limit reached: continue in a separately scheduled pass.
                this.ctx.runOnContext(v -> maybeSend());
                break;
            }
            doOnNext(this.buffer.poll());
            numSent++;
        }

        if (!this.buffer.isEmpty() || isCancelled()) {
            return;
        }
        if (this.complete) {
            sendComplete();
        } else if (getDemand() > 0 && this.drainHandler != null) {
            // Capture the handler in a local so the field can be cleared (one-shot semantics).
            final Runnable handler = this.drainHandler;
            this.ctx.runOnContext(v -> handler.run());
            this.drainHandler = null;
        }
    }
}
