/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.grpc.stubs;

import io.grpc.stub.StreamObserver;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.operators.AbstractMulti;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

public class ManyToManyObserver<I, O>
extends AbstractMulti<O>
implements StreamObserver<O> {
    private final StreamObserver<I> processor;
    private final Multi<I> source;
    private final UpstreamSubscriber subscriber = new UpstreamSubscriber();
    private final AtomicReference<Flow.Subscription> upstream = new AtomicReference();
    private volatile MultiSubscriber<? super O> downstream;

    public ManyToManyObserver(Multi<I> source, Function<StreamObserver<O>, StreamObserver<I>> function) {
        this.processor = function.apply(this);
        this.source = source;
    }

    public void subscribe(MultiSubscriber<? super O> subscriber) {
        this.downstream = subscriber;
        this.source.subscribe((Flow.Subscriber)this.subscriber);
    }

    public void onNext(O value) {
        this.downstream.onItem(value);
    }

    public void onError(Throwable t) {
        this.cancelUpstream();
        this.downstream.onFailure(t);
    }

    public void onCompleted() {
        this.cancelUpstream();
        this.downstream.onComplete();
    }

    private void cancelUpstream() {
        Flow.Subscription subscription = this.upstream.getAndSet((Flow.Subscription)Subscriptions.CANCELLED);
        if (subscription != null) {
            subscription.cancel();
        }
    }

    class UpstreamSubscriber
    implements Flow.Subscriber<I>,
    Flow.Subscription {
        UpstreamSubscriber() {
        }

        @Override
        public void onSubscribe(Flow.Subscription items) {
            if (!ManyToManyObserver.this.upstream.compareAndSet(null, items)) {
                items.cancel();
            } else {
                ManyToManyObserver.this.downstream.onSubscribe((Flow.Subscription)this);
                items.request(Long.MAX_VALUE);
            }
        }

        @Override
        public void onNext(I item) {
            ManyToManyObserver.this.processor.onNext(item);
        }

        @Override
        public void onError(Throwable t) {
            ManyToManyObserver.this.processor.onError(t);
        }

        @Override
        public void onComplete() {
            ManyToManyObserver.this.processor.onCompleted();
        }

        @Override
        public void request(long n) {
        }

        @Override
        public void cancel() {
            ManyToManyObserver.this.cancelUpstream();
        }
    }
}

