/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging;

import io.smallrye.reactive.converters.ReactiveTypeConverter;
import io.smallrye.reactive.converters.Registry;
import io.smallrye.reactive.messaging.AbstractMediator;
import io.smallrye.reactive.messaging.MediatorConfiguration;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Publisher;

/**
 * Mediator for methods that transform a whole stream: the user method receives the
 * upstream as a {@link Publisher} or {@link PublisherBuilder} (of messages or of
 * payloads, depending on the configured consumption type) and returns the resulting
 * stream.
 *
 * <p>Lifecycle, as enforced by the null checks below: {@link #initialize(Object)} builds
 * the transformation function, {@link #connectToUpstream(Publisher)} applies it to the
 * upstream, and {@link #getComputedPublisher()} exposes the result.
 */
public class StreamTransformerMediator extends AbstractMediator {

    /** Stream transformation selected by {@link #initialize(Object)}; {@code null} until then. */
    Function<Publisher<Message>, Publisher<Message>> function;

    /** Result of applying {@link #function} to the upstream; {@code null} until connected. */
    private Publisher<Message> publisher;

    public StreamTransformerMediator(MediatorConfiguration configuration) {
        super(configuration);
    }

    /**
     * Applies the transformation function to the given upstream and stores the
     * (decorated) result as this mediator's computed publisher.
     *
     * @param publisher the upstream stream of messages
     * @throws NullPointerException if no transformation function has been set yet
     */
    @Override
    public void connectToUpstream(Publisher<? extends Message> publisher) {
        Objects.requireNonNull(this.function,
                "No stream transformation has been set; initialize(Object) must run before connectToUpstream");
        // The function is declared on Publisher<Message>; every element of the upstream is
        // a Message, so widening the element type is safe for a read-only source.
        @SuppressWarnings("unchecked")
        Publisher<Message> upstream = (Publisher<Message>) publisher;
        this.publisher = this.decorate(this.function.apply(upstream));
    }

    /**
     * @return the publisher computed by {@link #connectToUpstream(Publisher)}
     * @throws NullPointerException if the mediator has not been connected yet
     */
    @Override
    public Publisher<Message> getComputedPublisher() {
        Objects.requireNonNull(this.publisher,
                "No publisher has been computed; connectToUpstream must run before getComputedPublisher");
        return this.publisher;
    }

    /** @return {@code true} once {@link #connectToUpstream(Publisher)} has produced a publisher */
    @Override
    public boolean isConnected() {
        return this.publisher != null;
    }

    /**
     * Selects the transformation function matching the configured consumption type and
     * whether the method uses builder types.
     *
     * @param bean forwarded to {@code super.initialize(bean)}
     * @throws IllegalArgumentException if the consumption type is neither a stream of
     *         messages nor a stream of payloads
     */
    @Override
    public void initialize(Object bean) {
        super.initialize(bean);
        switch (this.configuration.consumption()) {
            case STREAM_OF_MESSAGE: {
                if (this.configuration.usesBuilderTypes()) {
                    this.processMethodConsumingAPublisherBuilderOfMessages();
                } else {
                    this.processMethodConsumingAPublisherOfMessages();
                }
                break;
            }
            case STREAM_OF_PAYLOAD: {
                if (this.configuration.usesBuilderTypes()) {
                    this.processMethodConsumingAPublisherBuilderOfPayload();
                } else {
                    this.processMethodConsumingAPublisherOfPayload();
                }
                break;
            }
            default: {
                throw new IllegalArgumentException("Unexpected consumption type: " + this.configuration.consumption());
            }
        }
        assert this.function != null;
    }

    /** Method shape: {@code PublisherBuilder<Message> -> PublisherBuilder<Message>}. */
    private void processMethodConsumingAPublisherBuilderOfMessages() {
        this.function = upstream -> {
            PublisherBuilder<Message> transformed = this.invokeNonNull(this.withPreProcessingAck(upstream));
            return transformed.buildRs();
        };
    }

    /** Method shape: publisher of messages in (possibly a converted reactive type), publisher of messages out. */
    private void processMethodConsumingAPublisherOfMessages() {
        this.function = upstream -> {
            Publisher<?> argument = this.adaptToParameterType(this.withPreProcessingAck(upstream).buildRs());
            Publisher<Message> transformed = this.invokeNonNull(argument);
            return transformed;
        };
    }

    /** Method shape: {@code PublisherBuilder} of payloads in and out; results are re-wrapped as messages. */
    private void processMethodConsumingAPublisherBuilderOfPayload() {
        this.function = upstream -> {
            PublisherBuilder<?> payloads = this.withPreProcessingAck(upstream).map(Message::getPayload);
            PublisherBuilder<?> transformed = this.invokeNonNull(payloads);
            return wrapInMessages(transformed);
        };
    }

    /** Method shape: publisher of payloads in (possibly converted) and out; results are re-wrapped as messages. */
    private void processMethodConsumingAPublisherOfPayload() {
        this.function = upstream -> {
            Publisher<?> payloads = this.withPreProcessingAck(upstream).map(Message::getPayload).buildRs();
            Publisher<?> transformed = this.invokeNonNull(this.adaptToParameterType(payloads));
            return wrapInMessages(ReactiveStreams.fromPublisher(transformed));
        };
    }

    /**
     * Routes every upstream message through the completion-stage function returned by
     * {@code managePreProcessingAck()} (inherited; its exact semantics are defined in
     * {@code AbstractMediator}) before the user method sees it.
     */
    private PublisherBuilder<Message> withPreProcessingAck(Publisher<Message> upstream) {
        return ReactiveStreams.fromPublisher(upstream).flatMapCompletionStage(this.managePreProcessingAck());
    }

    /**
     * If the converter registry knows the method's first parameter type, converts the
     * stream with that converter; otherwise returns the stream unchanged.
     */
    private Publisher<?> adaptToParameterType(Publisher<?> stream) {
        Class<?> parameterType = this.configuration.getMethod().getParameterTypes()[0];
        Optional<?> converter = Registry.lookup(parameterType);
        if (!converter.isPresent()) {
            return stream;
        }
        // The converted value is handed on as a Publisher, exactly as before: a converter
        // yielding a non-Publisher type fails here with a ClassCastException.
        return (Publisher<?>) ((ReactiveTypeConverter<?>) converter.get()).fromPublisher(stream);
    }

    /**
     * Invokes the user method with the given stream and rejects a {@code null} result.
     * The result is cast to the type expected by the caller, so a method returning an
     * unexpected type fails with a {@link ClassCastException} at the call site.
     *
     * @throws NullPointerException if the method returned {@code null}
     */
    @SuppressWarnings("unchecked")
    private <R> R invokeNonNull(Object argument) {
        R result = (R) this.invoke(argument);
        return Objects.requireNonNull(result,
                "The method " + this.configuration.methodAsString() + " has returned an invalid value: `null`");
    }

    /** Wraps each emitted payload with {@code Message.of} and builds the resulting publisher. */
    private static Publisher<Message> wrapInMessages(PublisherBuilder<?> payloads) {
        // Explicit type witness: the element type must be the raw Message used by the
        // mediator API rather than the inferred Message<capture>.
        return payloads.<Message>map(payload -> Message.of(payload)).buildRs();
    }
}

