package io.helidon.common.reactive;

import io.helidon.common.reactive.Flow;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:io/helidon/common/reactive/SubmissionPublisher.class */
/**
 * A {@link Flow.Publisher} that is fed imperatively via {@link #submit(Object)}.
 *
 * <p>Submitted items are pushed into the {@link FluxSink} of a {@link UnicastProcessor}; subscribers
 * are attached to a {@link Flux} derived from that processor with
 * {@code publish(capacity).autoConnect()} and delivered on the configured {@link Scheduler}.
 *
 * <p>The publisher keeps a count of attached subscribers. The count is incremented by
 * {@link #subscribe(Flow.Subscriber)} and decremented when a subscriber cancels its subscription.
 * NOTE(review): terminal signals ({@code onError}/{@code onComplete}) do not decrement the count, so
 * {@link #hasSubscribers()} may still report {@code true} after {@link #close()} - confirm whether
 * callers rely on that.
 *
 * @param <T> the type of the published items
 */
public class SubmissionPublisher<T> implements Flow.Publisher<T>, AutoCloseable {
    private final Flux<T> flux;
    private final FluxSink<T> sink;
    private final AtomicInteger numberOfSubscribers;

    /**
     * Reactive Streams subscriber that forwards every signal to an adapted {@link Flow.Subscriber}
     * and reports the first cancellation of its subscription to a callback.
     *
     * @param <T> the type of the received items
     */
    private static class OnCancelSubscriber<T> implements Subscriber<T> {
        private final Subscriber<T> delegate;
        private final Consumer<Subscription> onCancel;

        /**
         * @param subscriber the flow subscriber to adapt and delegate to
         * @param consumer callback invoked with the upstream subscription when it is cancelled
         */
        OnCancelSubscriber(Flow.Subscriber<T> subscriber, Consumer<Subscription> consumer) {
            this.delegate = ReactiveStreamsAdapter.subscriberFromFlow(subscriber);
            this.onCancel = consumer;
        }

        /**
         * Hands the delegate a wrapping subscription so that cancellation can be observed.
         *
         * @param upstream the subscription supplied by the upstream flux
         */
        @Override
        public void onSubscribe(final Subscription upstream) {
            // A subscriber is allowed to call cancel() repeatedly; only the first call may
            // reach the callback, otherwise the subscriber count would be decremented twice.
            final AtomicBoolean cancelled = new AtomicBoolean();
            this.delegate.onSubscribe(new Subscription() {
                @Override
                public void request(long n) {
                    upstream.request(n);
                }

                @Override
                public void cancel() {
                    if (cancelled.compareAndSet(false, true)) {
                        upstream.cancel();
                        OnCancelSubscriber.this.onCancel.accept(upstream);
                    }
                }
            });
        }

        @Override
        public void onNext(T t) {
            this.delegate.onNext(t);
        }

        @Override
        public void onError(Throwable th) {
            this.delegate.onError(th);
        }

        @Override
        public void onComplete() {
            this.delegate.onComplete();
        }
    }

    /**
     * Creates a publisher that delivers items using a scheduler built from the given executor.
     *
     * @param executor the executor wrapped via {@code Schedulers.fromExecutor}
     * @param i the buffer capacity; must be positive
     * @throws IllegalArgumentException if {@code i} is not positive
     */
    public SubmissionPublisher(Executor executor, int i) {
        this(Schedulers.fromExecutor(executor), i);
    }

    /**
     * Creates a publisher that delivers items on the immediate scheduler.
     *
     * @param i the buffer capacity; must be positive
     * @throws IllegalArgumentException if {@code i} is not positive
     */
    public SubmissionPublisher(int i) {
        this(Schedulers.immediate(), i);
    }

    /**
     * Creates a publisher that delivers items on the immediate scheduler, using
     * {@code Flow.defaultBufferSize()} as the buffer capacity.
     */
    public SubmissionPublisher() {
        this(Schedulers.immediate(), Flow.defaultBufferSize());
    }

    /**
     * Wires the sink and the shared flux.
     *
     * @param scheduler the scheduler items are published on
     * @param capacity the value passed to {@code publish(int)}; must be positive
     * @throws NullPointerException if {@code scheduler} is {@code null}
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    private SubmissionPublisher(Scheduler scheduler, int capacity) {
        Objects.requireNonNull(scheduler, "scheduler");
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        UnicastProcessor<T> processor = UnicastProcessor.create();
        this.sink = processor.sink();
        this.flux = processor
                .publish(capacity)
                .autoConnect()
                .subscribeOn(Schedulers.immediate())
                .publishOn(scheduler);
        this.numberOfSubscribers = new AtomicInteger(0);
    }

    /**
     * Attaches the subscriber to the shared flux and counts it as a current subscriber.
     *
     * @param subscriber the subscriber to attach
     * @throws NullPointerException if {@code subscriber} is {@code null}
     */
    @Override // io.helidon.common.reactive.Flow.Publisher
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        Objects.requireNonNull(subscriber, "subscriber");
        // Count the subscriber before attaching it: the subscriber may receive onSubscribe (and
        // even cancel) during the subscribe call, and the count must already include it then.
        this.numberOfSubscribers.incrementAndGet();
        try {
            this.flux.subscribe(new OnCancelSubscriber<>(subscriber, this::onCancel));
        } catch (RuntimeException | Error e) {
            // The subscriber was never attached; undo the optimistic increment.
            this.numberOfSubscribers.decrementAndGet();
            throw e;
        }
    }

    /** Cancellation callback: removes one subscriber from the count. */
    private void onCancel(Subscription subscription) {
        this.numberOfSubscribers.decrementAndGet();
    }

    /**
     * Emits the item through the sink.
     *
     * @param t the item to publish
     * @throws NullPointerException if {@code t} is {@code null}
     */
    public void submit(T t) {
        Objects.requireNonNull(t, "item");
        this.sink.next(t);
    }

    /**
     * Publishes the item exactly like {@link #submit(Object)}.
     *
     * @param t the item to publish
     * @param biPredicate ignored by this implementation
     * @throws NullPointerException if {@code t} is {@code null}
     */
    public void offer(T t, BiPredicate<Flow.Subscriber<? super T>, ? super T> biPredicate) {
        submit(t);
    }

    /**
     * Signals the given error through the sink.
     *
     * @param th the error to signal
     * @throws NullPointerException if {@code th} is {@code null}
     */
    public void closeExceptionally(Throwable th) {
        Objects.requireNonNull(th, "error");
        this.sink.error(th);
    }

    /** Signals completion through the sink. */
    @Override // java.lang.AutoCloseable
    public void close() {
        this.sink.complete();
    }

    /**
     * Returns the current subscriber count.
     *
     * @return the number of subscribers attached and not yet cancelled
     */
    public int getNumberOfSubscribers() {
        return this.numberOfSubscribers.get();
    }

    /**
     * Tells whether at least one subscriber is currently counted.
     *
     * @return {@code true} if {@link #getNumberOfSubscribers()} is greater than zero
     */
    public boolean hasSubscribers() {
        return getNumberOfSubscribers() > 0;
    }
}
