package io.smallrye.reactive.messaging.providers.helpers;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.operators.AbstractMulti;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.reactive.messaging.MediatorConfiguration;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.function.Supplier;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Message;

/* loaded from: input_file:io/smallrye/reactive/messaging/providers/helpers/MultiUtils.class */
public class MultiUtils {

    /* loaded from: input_file:io/smallrye/reactive/messaging/providers/helpers/MultiUtils$NoopProcessor.class */
    /**
     * A pass-through {@link Flow.Processor}: every signal received from the upstream
     * subscription is handed unchanged to the single downstream subscriber.
     * <p>
     * The instance is also the {@link Flow.Subscription} given to the downstream, so
     * {@link #request(long)} and {@link #cancel()} below are invoked by the downstream.
     * <p>
     * Only one downstream is accepted; a second {@code subscribe} call is failed with an
     * {@link IllegalStateException}.
     *
     * @param <T> the type of the items flowing through the processor
     */
    public static class NoopProcessor<T> extends AbstractMulti<T> implements Flow.Processor<T, T>, Flow.Subscription {
        // Set once a terminal signal (onError/onComplete) has been received from upstream.
        private volatile boolean done = false;
        // Set once the downstream has called cancel().
        private volatile boolean cancelled = false;
        // Subscription received through onSubscribe; written only through UPSTREAM_UPDATER.
        private volatile Flow.Subscription upstream = null;
        // The single downstream subscriber; written only through DOWNSTREAM_UPDATER.
        private volatile Flow.Subscriber<? super T> downstream = null;
        private static final AtomicReferenceFieldUpdater<NoopProcessor, Flow.Subscription> UPSTREAM_UPDATER = AtomicReferenceFieldUpdater.newUpdater(NoopProcessor.class, Flow.Subscription.class, "upstream");
        private static final AtomicReferenceFieldUpdater<NoopProcessor, Flow.Subscriber> DOWNSTREAM_UPDATER = AtomicReferenceFieldUpdater.newUpdater(NoopProcessor.class, Flow.Subscriber.class, "downstream");

        /**
         * Creates a new, unconnected processor.
         *
         * @param <I> the item type
         * @return a fresh processor instance
         */
        public static <I> NoopProcessor<I> create() {
            return new NoopProcessor<>();
        }

        // Instances are obtained through create().
        private NoopProcessor() {
        }

        /**
         * Registers the downstream subscriber.
         * <p>
         * If a downstream is already registered, the new subscriber is failed with
         * {@code IllegalStateException("Already subscribed")}. Otherwise, if the upstream
         * subscription is already known, the subscriber immediately receives this processor
         * as its subscription; if not, {@link #onSubscribe(Flow.Subscription)} does that later.
         *
         * @param multiSubscriber the downstream subscriber, validated with
         *        {@code ParameterValidation.nonNull}
         */
        public void subscribe(MultiSubscriber<? super T> multiSubscriber) {
            ParameterValidation.nonNull(multiSubscriber, "downstream");
            if (!DOWNSTREAM_UPDATER.compareAndSet(this, null, multiSubscriber)) {
                Subscriptions.fail(multiSubscriber, new IllegalStateException("Already subscribed"));
            } else if (this.upstream != null) {
                multiSubscriber.onSubscribe(this);
            }
        }

        /**
         * Receives the upstream subscription.
         * <p>
         * The subscription is cancelled right away when this processor is already done or
         * cancelled, or when an upstream subscription has already been stored. Otherwise it
         * is stored and, if a downstream is already registered, that downstream receives this
         * processor as its subscription.
         *
         * @param subscription the upstream subscription
         */
        @Override // java.util.concurrent.Flow.Subscriber
        public void onSubscribe(Flow.Subscription subscription) {
            if (isDoneOrCancelled() || !UPSTREAM_UPDATER.compareAndSet(this, null, subscription)) {
                subscription.cancel();
                return;
            }
            Flow.Subscriber<? super T> subscriber = this.downstream;
            if (subscriber != null) {
                subscriber.onSubscribe(this);
            }
        }

        /**
         * Forwards an item to the downstream. The item is silently dropped when the processor
         * is done or cancelled, or when no downstream is registered.
         *
         * @param t the item received from upstream
         */
        @Override // java.util.concurrent.Flow.Subscriber
        public void onNext(T t) {
            Flow.Subscriber<? super T> subscriber;
            if (isDoneOrCancelled() || (subscriber = this.downstream) == null) {
                return;
            }
            subscriber.onNext(t);
        }

        // True once a terminal signal was received or the downstream cancelled.
        private boolean isDoneOrCancelled() {
            return this.done || this.cancelled;
        }

        /**
         * Marks the processor as done and forwards the failure to the downstream, if one is
         * registered. Ignored when the processor is already done or cancelled.
         *
         * @param th the failure; must not be {@code null}
         * @throws NullPointerException if {@code th} is {@code null}
         */
        @Override // java.util.concurrent.Flow.Subscriber
        public void onError(Throwable th) {
            Objects.requireNonNull(th);
            if (isDoneOrCancelled()) {
                return;
            }
            this.done = true;
            Flow.Subscriber<? super T> subscriber = this.downstream;
            if (subscriber != null) {
                subscriber.onError(th);
            }
        }

        /**
         * Marks the processor as done and forwards the completion to the downstream, if one is
         * registered. Ignored when the processor is already done or cancelled.
         */
        @Override // java.util.concurrent.Flow.Subscriber
        public void onComplete() {
            if (isDoneOrCancelled()) {
                return;
            }
            this.done = true;
            Flow.Subscriber<? super T> subscriber = this.downstream;
            if (subscriber != null) {
                subscriber.onComplete();
            }
        }

        /**
         * Forwards a positive demand to the upstream subscription. Non-positive values are
         * ignored without signalling anything.
         *
         * @param j the number of items requested by the downstream
         */
        @Override // java.util.concurrent.Flow.Subscription
        public void request(long j) {
            if (j > 0) {
                // NOTE(review): no null check on the upstream here. In this class the downstream
                // only receives this subscription after the upstream field was observed non-null,
                // which presumably makes this safe - confirm no other caller invokes request().
                UPSTREAM_UPDATER.get(this).request(j);
            }
        }

        /**
         * Marks the processor as cancelled and drops the reference to the downstream.
         * Subsequent calls are no-ops.
         */
        @Override // java.util.concurrent.Flow.Subscription
        public void cancel() {
            if (this.cancelled) {
                return;
            }
            this.cancelled = true;
            // NOTE(review): the upstream subscription is not cancelled here; only the downstream
            // reference is cleared. A later onSubscribe does cancel its argument because the
            // cancelled flag is set - confirm that leaving an existing upstream active is intended.
            DOWNSTREAM_UPDATER.getAndSet(this, null);
        }
    }

    /**
     * Builds a {@link Multi} backed by a stateless generator.
     * <p>
     * The generator state starts as {@code null} and is returned untouched by each
     * generator call; every call emits exactly one item obtained from {@code supplier.get()}.
     *
     * @param supplier provides the item to emit on each generator call
     * @param <T> the type of the emitted items
     * @return the generated stream
     */
    public static <T> Multi<T> createFromGenerator(Supplier<T> supplier) {
        return Multi.createFrom().generator(
                () -> null,
                (state, emitter) -> {
                    emitter.emit(supplier.get());
                    return state;
                });
    }

    /**
     * Exposes a {@link Flow.Publisher} as a {@link Multi}.
     * <p>
     * A publisher that already is a {@code Multi} is returned as-is; any other publisher is
     * wrapped with {@code Multi.createFrom().safePublisher(...)}.
     *
     * @param publisher the publisher to adapt; validated with {@code ParameterValidation.nonNull}
     * @param <T> the type of the emitted items
     * @return the same instance when it is a {@code Multi}, otherwise a wrapping {@code Multi}
     */
    public static <T> Multi<T> publisher(Flow.Publisher<T> publisher) {
        // Keep the validated reference typed as a Publisher: it is not necessarily a Multi,
        // so the down-cast may only happen once the instanceof check has succeeded.
        Flow.Publisher<T> actual = ParameterValidation.nonNull(publisher, "publisher");
        if (actual instanceof Multi) {
            return (Multi<T>) actual;
        }
        return Multi.createFrom().safePublisher(actual);
    }

    /**
     * Applies pre-processing acknowledgement to a stream of messages when the mediator is
     * configured for it.
     * <p>
     * When the configured strategy is not {@code PRE_PROCESSING} the stream is returned
     * unchanged. Otherwise each message is re-emitted once the stage returned by its
     * {@code ack()} call has completed; the per-message results are concatenated with the
     * {@code concatenate(true)} variant.
     *
     * @param multi the incoming stream of messages
     * @param mediatorConfiguration the configuration providing the acknowledgement strategy
     * @return the original stream, or a stream acknowledging each message before emitting it
     */
    public static Multi<? extends Message<?>> handlePreProcessingAcknowledgement(Multi<? extends Message<?>> multi, MediatorConfiguration mediatorConfiguration) {
        if (mediatorConfiguration.getAcknowledgment() != Acknowledgment.Strategy.PRE_PROCESSING) {
            return multi;
        }
        return multi.plug(stream -> stream.onItem()
                .transformToUni(message -> Uni.createFrom().completionStage(message.ack()).map(ignored -> message))
                .concatenate(true));
    }

    /**
     * Routes a stream through a {@link Flow.Processor} and returns the processor's output.
     * <p>
     * The wiring is deferred: for each subscription to the returned stream, the processor is
     * first exposed as a {@link Multi} and then subscribed to {@code multi}.
     *
     * @param multi the stream feeding the processor
     * @param processor the processor consuming {@code multi} and producing the result
     * @param <T> the type of the items emitted by {@code multi}
     * @param <R> the type of the items of the returned stream
     * @return a stream emitting what the processor publishes
     */
    @SuppressWarnings("unchecked")
    public static <T, R> Multi<R> via(Multi<T> multi, Flow.Processor<? super T, ? super R> processor) {
        return multi.plug(upstream -> Multi.createFrom().deferred(() -> {
            // The processor's output type is only known as "? super R", so viewing it as
            // Multi<R> is inherently unchecked; the cast makes that explicit instead of
            // hiding it behind a raw Multi.
            Multi<R> output = (Multi<R>) publisher(processor);
            upstream.subscribe(processor);
            return output;
        }));
    }

    /**
     * Creates a subscriber that feeds a processor and consumes the processor's output
     * through the given transformation.
     * <p>
     * On subscription, the upstream subscription is handed to the processor, then the
     * processor's output is transformed with {@code function} and subscribed to with a
     * consumer that discards the items. Items, failures and completion received by the
     * returned subscriber are delegated to the processor.
     *
     * @param processor the processor receiving every signal of the returned subscriber
     * @param function the transformation plugged onto the processor's output
     * @param <T> the type of the items accepted by the subscriber
     * @param <R> the type of the items published by the processor
     * @param <P> the type of the items produced by {@code function}
     * @return the subscriber delegating to the processor
     */
    public static <T, R, P> Flow.Subscriber<T> via(final Flow.Processor<T, R> processor, final Function<Multi<R>, Multi<P>> function) {
        return new MultiSubscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                // Connect the processor to upstream first, then start consuming its output.
                processor.onSubscribe(subscription);
                Multi<R> processed = MultiUtils.publisher(processor);
                processed.plug(function).subscribe().with(ignored -> {
                    // Items are intentionally discarded.
                });
            }

            @Override
            public void onItem(T item) {
                processor.onNext(item);
            }

            @Override
            public void onFailure(Throwable failure) {
                processor.onError(failure);
            }

            @Override
            public void onCompletion() {
                processor.onComplete();
            }
        };
    }

    /**
     * Creates a subscriber whose received items are passed, unchanged, through a fresh
     * {@link NoopProcessor} and then through the given transformation.
     *
     * @param function the transformation applied to the stream of received items
     * @param <T> the type of the items accepted by the subscriber
     * @param <R> the type of the items produced by {@code function}
     * @return the subscriber
     */
    public static <T, R> Flow.Subscriber<T> via(Function<Multi<T>, Multi<R>> function) {
        NoopProcessor<T> passthrough = NoopProcessor.create();
        return via(passthrough, function);
    }
}
