package io.confluent.ksql.physical.scalablepush;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.physical.common.operators.AbstractPhysicalOperator;
import io.confluent.ksql.physical.pull.PullPhysicalPlan;
import io.confluent.ksql.physical.scalablepush.operators.PushDataSourceOperator;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.reactive.BufferedPublisher;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.VertxUtils;
import io.vertx.core.Context;
import java.util.List;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/physical/scalablepush/PushPhysicalPlan.class */
public class PushPhysicalPlan {
    private static final int CAPACITY = Integer.MAX_VALUE;
    private static final Logger LOGGER = LoggerFactory.getLogger(PullPhysicalPlan.class);
    private final AbstractPhysicalOperator root;
    private final LogicalSchema schema;
    private final QueryId queryId;
    private final ScalablePushRegistry scalablePushRegistry;
    private final PushDataSourceOperator dataSourceOperator;
    private final Context context;
    private volatile boolean closed = false;
    private long timer = -1;

    /* loaded from: input_file:io/confluent/ksql/physical/scalablepush/PushPhysicalPlan$Publisher.class */
    public static class Publisher extends BufferedPublisher<List<?>> {
        public Publisher(Context context) {
            super(context, PushPhysicalPlan.CAPACITY);
        }

        public void reportDroppedRows() {
            sendError(new RuntimeException("Dropped rows"));
        }
    }

    @SuppressFBWarnings({"EI_EXPOSE_REP"})
    public PushPhysicalPlan(AbstractPhysicalOperator abstractPhysicalOperator, LogicalSchema logicalSchema, QueryId queryId, ScalablePushRegistry scalablePushRegistry, PushDataSourceOperator pushDataSourceOperator, Context context) {
        this.root = (AbstractPhysicalOperator) Objects.requireNonNull(abstractPhysicalOperator, "root");
        this.schema = (LogicalSchema) Objects.requireNonNull(logicalSchema, "schema");
        this.queryId = (QueryId) Objects.requireNonNull(queryId, "queryId");
        this.scalablePushRegistry = (ScalablePushRegistry) Objects.requireNonNull(scalablePushRegistry, "scalablePushRegistry");
        this.dataSourceOperator = pushDataSourceOperator;
        this.context = context;
    }

    public BufferedPublisher<List<?>> execute() {
        Publisher publisher = new Publisher(this.context);
        this.context.runOnContext(r5 -> {
            open(publisher);
        });
        return publisher;
    }

    public boolean isClosed() {
        return this.closed;
    }

    private void maybeNext(Publisher publisher) {
        while (true) {
            List list = (List) next();
            if (list == null) {
                break;
            }
            if (this.dataSourceOperator.droppedRows()) {
                closeInternal();
                publisher.reportDroppedRows();
                break;
            }
            publisher.accept(list);
        }
        if (this.closed) {
            publisher.close();
            return;
        }
        if (this.timer >= 0) {
            this.context.owner().cancelTimer(this.timer);
        }
        this.timer = this.context.owner().setTimer(100L, l -> {
            this.context.runOnContext(r5 -> {
                maybeNext(publisher);
            });
        });
    }

    private void open(Publisher publisher) {
        VertxUtils.checkContext(this.context);
        this.dataSourceOperator.setNewRowCallback(() -> {
            this.context.runOnContext(r5 -> {
                maybeNext(publisher);
            });
        });
        this.root.open();
        maybeNext(publisher);
    }

    private Object next() {
        VertxUtils.checkContext(this.context);
        return this.root.next();
    }

    public void close() {
        this.context.runOnContext(r3 -> {
            closeInternal();
        });
    }

    private void closeInternal() {
        VertxUtils.checkContext(this.context);
        this.closed = true;
        this.root.close();
    }

    @SuppressFBWarnings({"EI_EXPOSE_REP"})
    public AbstractPhysicalOperator getRoot() {
        return this.root;
    }

    public QueryId getQueryId() {
        return this.queryId;
    }

    public LogicalSchema getOutputSchema() {
        return this.schema;
    }

    public ScalablePushRegistry getScalablePushRegistry() {
        return this.scalablePushRegistry;
    }
}
