package io.confluent.ksql.rest.server.resources.streaming;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import io.confluent.ksql.rest.server.resources.streaming.Flow;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/PollingSubscription.class */
/**
 * Base class for a {@link Flow.Subscription} that obtains items by calling {@link #poll()} on an
 * executor and hands each non-null result to a single subscriber.
 *
 * <p>Every {@link #request(long)} submits one poll attempt. When {@link #poll()} yields
 * {@code null} and the subscription is not finishing, another attempt is scheduled after
 * {@link #BACKOFF_DELAY_MS} milliseconds. Once {@link #setDone()} or {@link #setError(Throwable)}
 * has been called, the next poll attempt closes the subscription and sends the terminal signal
 * ({@code onError} if an exception was recorded, otherwise {@code onComplete}).
 *
 * <p>NOTE(review): {@code needsSchema} and {@code draining} are plain (non-volatile) fields that
 * are touched both by callers of {@link #request(long)} and by executor tasks; this presumably
 * relies on how the supplied executor and subscriber are threaded -- confirm against callers.
 *
 * @param <T> type of the value produced by {@link #poll()} and passed to the subscriber
 */
public abstract class PollingSubscription<T> implements Flow.Subscription {
    /** Delay, in milliseconds, before polling again after {@link #poll()} returned {@code null}. */
    private static final int BACKOFF_DELAY_MS = 100;

    private final Flow.Subscriber<T> subscriber;
    private final ListeningScheduledExecutorService exec;
    /** Schema announced on the first request; when {@code null} nothing is announced. */
    private final LogicalSchema schema;
    /** True until the first {@link #request(long)} has dealt with the schema announcement. */
    private boolean needsSchema = true;
    /** Set by {@link #setDone()} / {@link #setError(Throwable)}; read by the polling task. */
    private volatile boolean done = false;
    /** Failure recorded by {@link #setError(Throwable)}, or {@code null} if none was recorded. */
    private Throwable exception = null;
    /** Set by the polling task once it observes {@code done}; stops further polls being submitted. */
    private boolean draining = false;
    /** Most recently submitted or scheduled task, kept so that {@link #cancel()} can cancel it. */
    private volatile ListenableFuture<?> future;

    /**
     * Creates a subscription that polls on the given executor.
     *
     * @param listeningScheduledExecutorService executor used for polling, back-off and closing
     * @param subscriber receiver of the schema, items and terminal signal
     * @param logicalSchema schema to announce on the first request; may be {@code null}
     * @throws NullPointerException if the executor or the subscriber is {@code null}
     */
    public PollingSubscription(ListeningScheduledExecutorService listeningScheduledExecutorService, Flow.Subscriber<T> subscriber, LogicalSchema logicalSchema) {
        // requireNonNull already returns the argument's static type, so no cast is needed.
        this.exec = Objects.requireNonNull(listeningScheduledExecutorService, "exec");
        this.subscriber = Objects.requireNonNull(subscriber, "subscriber");
        this.schema = logicalSchema;
    }

    /**
     * Cancels the most recent pending task, if any, without interrupting it, and submits
     * {@link #close()} to the executor.
     */
    @Override
    public void cancel() {
        // Read the volatile field once so the null check and the call see the same future.
        final ListenableFuture<?> pending = this.future;
        if (pending != null) {
            pending.cancel(false);
        }
        this.exec.submit(this::close);
    }

    /**
     * Requests the next item. On the first call the schema, if one was supplied, is passed to the
     * subscriber before anything else. Unless the subscription is already draining, one poll
     * attempt is then submitted to the executor.
     *
     * @param j number of items requested; must be exactly 1
     * @throws IllegalArgumentException if {@code j} is not 1
     */
    @Override
    public void request(long j) {
        Preconditions.checkArgument(j == 1, "number of requested items must be 1");
        if (this.needsSchema) {
            if (this.schema != null) {
                this.subscriber.onSchema(this.schema);
            }
            this.needsSchema = false;
        }
        // Once draining has started no further poll is submitted from here.
        if (this.draining) {
            return;
        }
        this.future = this.exec.submit(this::pollOnce);
    }

    /**
     * Body of a single poll attempt, run on the executor: delivers one item if available,
     * otherwise schedules a retry, and finishes the subscription when it is draining.
     */
    private void pollOnce() {
        if (this.done) {
            this.draining = true;
        }
        final T item = poll();
        if (item != null) {
            this.subscriber.onNext(item);
        } else if (!this.draining) {
            // Nothing available yet: try again after the back-off delay.
            this.future = this.exec.schedule(() -> request(1L), BACKOFF_DELAY_MS, TimeUnit.MILLISECONDS);
        }
        if (this.draining) {
            close();
            if (this.exception != null) {
                this.subscriber.onError(this.exception);
            } else {
                this.subscriber.onComplete();
            }
        }
    }

    /**
     * Records a failure and marks the subscription as done. The exception is assigned before the
     * volatile {@code done} flag, so a reader that sees {@code done == true} also sees the
     * exception.
     *
     * <p>The decompiler notes that this method was originally {@code protected}.
     *
     * @param th the failure to report to the subscriber through {@code onError}
     */
    public void setError(Throwable th) {
        this.exception = th;
        this.done = true;
    }

    /**
     * Marks the subscription as done so that the next poll attempt finishes it.
     *
     * <p>The decompiler notes that this method was originally {@code protected}.
     */
    public void setDone() {
        this.done = true;
    }

    /**
     * Fetches the next value to deliver.
     *
     * @return the next value, or {@code null} if nothing is currently available
     */
    abstract T poll();

    /** Invoked when the subscription is cancelled or finishes; implemented by subclasses. */
    abstract void close();
}
