/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging;

import io.smallrye.reactive.messaging.AbstractMediator;
import io.smallrye.reactive.messaging.MediatorConfiguration;
import io.smallrye.reactive.messaging.Shape;
import io.smallrye.reactive.messaging.SubscriberWrapper;
import io.smallrye.reactive.messaging.WeavingException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.commons.lang3.ClassUtils;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Mediator for methods whose configuration has the {@link Shape#SUBSCRIBER} shape.
 *
 * <p>{@link #initialize(Object)} builds a {@link Subscriber} around the configured method
 * according to its consumption type, and {@link #run()} subscribes it to the upstream
 * publisher previously handed over through {@link #connectToUpstream(Publisher)}.
 *
 * <p>NOTE(review): this file is decompiler output. The stream-builder chains are kept as
 * decompiled; generic type witnesses may have been dropped from them - confirm against the
 * original source before tightening the remaining raw types.
 */
public class SubscriberMediator extends AbstractMediator {
    /** Upstream publisher; {@code null} until {@link #connectToUpstream(Publisher)} is called. */
    private Publisher<? extends Message> source;

    /** Subscriber wrapping the mediated method; assigned by {@link #initialize(Object)}. */
    private Subscriber subscriber;

    /** Subscription received from the upstream publisher once {@link #run()} has subscribed. */
    private final AtomicReference<Subscription> subscription = new AtomicReference<>();

    /**
     * Creates the mediator.
     *
     * @param configuration the mediator configuration
     * @throws IllegalArgumentException if the configuration shape is not {@link Shape#SUBSCRIBER}
     */
    public SubscriberMediator(MediatorConfiguration configuration) {
        super(configuration);
        if (configuration.shape() != Shape.SUBSCRIBER) {
            throw new IllegalArgumentException("Expected a Subscriber shape, received a " + configuration.shape());
        }
    }

    /**
     * Initializes the mediator and builds the subscriber matching the configured consumption type.
     *
     * @param bean the bean instance, forwarded to the parent initialization
     * @throws IllegalArgumentException if the consumption type is not handled by this mediator
     */
    @Override
    public void initialize(Object bean) {
        super.initialize(bean);
        switch (this.configuration.consumption()) {
            case STREAM_OF_MESSAGE:
            case STREAM_OF_PAYLOAD: {
                this.processMethodReturningASubscriber();
                break;
            }
            case MESSAGE:
            case PAYLOAD: {
                // Per-item methods either return a CompletionStage or are treated as returning void.
                if (ClassUtils.isAssignable(this.configuration.getMethod().getReturnType(), CompletionStage.class)) {
                    this.processMethodReturningACompletionStage();
                } else {
                    this.processMethodReturningVoid();
                }
                break;
            }
            default: {
                throw new IllegalArgumentException("Unexpected consumption type: " + this.configuration.consumption());
            }
        }
        assert (this.subscriber != null);
    }

    /**
     * @return the subscriber built by {@link #initialize(Object)}, or {@code null} if the
     *         mediator has not been initialized yet
     */
    @Override
    public Subscriber<Message> getComputedSubscriber() {
        return this.subscriber;
    }

    /**
     * @return {@code true} once an upstream publisher has been set
     */
    @Override
    public boolean isConnected() {
        return this.source != null;
    }

    /**
     * Stores the upstream publisher that {@link #run()} will subscribe to.
     *
     * @param publisher the upstream publisher
     */
    @Override
    public void connectToUpstream(Publisher<? extends Message> publisher) {
        this.source = publisher;
    }

    /**
     * Subscribes to the upstream publisher through a delegating subscriber that records the
     * subscription, logs errors, and forwards every signal to the computed subscriber.
     *
     * @throws WeavingException if an error was signalled before {@code subscribe} returned
     */
    @Override
    public void run() {
        // Only checked when assertions are enabled.
        assert (this.source != null);
        assert (this.subscriber != null);
        final Logger logger = LoggerFactory.getLogger((String)this.configuration.methodAsString());
        // Captures an error signalled while subscribe(...) is still executing.
        final AtomicReference<Throwable> syncErrorCatcher = new AtomicReference<>();
        Subscriber<Object> delegating = new Subscriber<Object>() {

            @Override
            public void onSubscribe(Subscription s) {
                SubscriberMediator.this.subscription.set(s);
                SubscriberMediator.this.subscriber.onSubscribe(s);
            }

            @Override
            public void onNext(Object o) {
                SubscriberMediator.this.subscriber.onNext(o);
            }

            @Override
            public void onError(Throwable t) {
                logger.error("Error caught during the stream processing", t);
                syncErrorCatcher.set(t);
                SubscriberMediator.this.subscriber.onError(t);
            }

            @Override
            public void onComplete() {
                SubscriberMediator.this.subscriber.onComplete();
            }
        };
        this.source.subscribe(delegating);
        Throwable failure = syncErrorCatcher.get();
        if (failure != null) {
            throw new WeavingException(this.configuration.getIncoming(), failure);
        }
    }

    /**
     * Builds the subscriber for a method that consumes a payload and is not declared as
     * returning a {@link CompletionStage}: the method is invoked with each message payload
     * between the inherited pre- and post-processing ack stages.
     *
     * @throws IllegalArgumentException if the consumption type is not {@code PAYLOAD}; failing
     *         here avoids leaving the subscriber unset, which would otherwise only surface
     *         later as a {@code NullPointerException} when assertions are disabled
     */
    private void processMethodReturningVoid() {
        if (this.configuration.consumption() != MediatorConfiguration.Consumption.PAYLOAD) {
            throw new IllegalArgumentException("Unsupported consumption type for a method not returning a CompletionStage: "
                    + this.configuration.consumption() + " (" + this.configuration.methodAsString() + ")");
        }
        this.subscriber = ReactiveStreams.builder().flatMapCompletionStage(this.managePreProcessingAck()).map(message -> {
            this.invoke(message.getPayload());
            return message;
        }).flatMapCompletionStage(this.managePostProcessingAck()).ignore().build();
    }

    /**
     * Builds the subscriber for a method returning a {@link CompletionStage}. The method is
     * invoked with the payload ({@code PAYLOAD} consumption) or with the message itself
     * (otherwise); the original message is passed on once the returned stage completes.
     */
    private void processMethodReturningACompletionStage() {
        if (this.configuration.consumption() == MediatorConfiguration.Consumption.PAYLOAD) {
            this.subscriber = ReactiveStreams.builder().flatMapCompletionStage(this.managePreProcessingAck()).flatMapCompletionStage(message -> {
                CompletionStage stage = (CompletionStage)this.invoke(message.getPayload());
                return stage.thenApply(x -> message);
            }).flatMapCompletionStage(this.managePostProcessingAck()).ignore().build();
        } else {
            this.subscriber = ReactiveStreams.builder().flatMapCompletionStage(this.managePreProcessingAck()).flatMapCompletionStage(message -> {
                CompletionStage completion = (CompletionStage)this.invoke(message);
                return completion.thenApply(x -> message);
            }).flatMapCompletionStage(this.managePostProcessingAck()).ignore().build();
        }
    }

    /**
     * Builds the subscriber for a method that itself returns a {@link Subscriber}. The method
     * is invoked once without arguments and the returned subscriber is wrapped so that it
     * receives either payloads ({@code STREAM_OF_PAYLOAD}) or the messages unchanged.
     *
     * @throws IllegalStateException if the method does not return a {@link Subscriber}
     */
    private void processMethodReturningASubscriber() {
        Object result = this.invoke(new Object[0]);
        if (!(result instanceof Subscriber)) {
            throw new IllegalStateException("Invalid return type: " + result + " - expected a Subscriber");
        }
        Subscriber sub = (Subscriber)result;
        if (this.configuration.consumption() == MediatorConfiguration.Consumption.STREAM_OF_PAYLOAD) {
            // Unwrap each message so the user subscriber only sees the payload.
            SubscriberWrapper<Object, Object> wrapper = new SubscriberWrapper<Object, Object>(sub, x -> ((Message)x).getPayload());
            this.subscriber = ReactiveStreams.builder().flatMapCompletionStage(this.managePreProcessingAck()).via(wrapper).flatMapCompletionStage(this.managePostProcessingAck()).ignore().build();
        } else {
            this.subscriber = ReactiveStreams.builder().flatMapCompletionStage(this.managePreProcessingAck()).via(new SubscriberWrapper(sub, Function.identity())).flatMapCompletionStage(this.managePostProcessingAck()).ignore().build();
        }
    }
}

