/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.extension;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.BackPressureStrategy;
import io.smallrye.mutiny.subscription.MultiEmitter;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import org.reactivestreams.Publisher;

/**
 * {@link Emitter} implementation that pushes messages into a Mutiny {@link Multi}.
 *
 * <p>The {@link Multi} is built in the constructor with the requested overflow strategy. The
 * underlying {@link MultiEmitter} is only captured once the callback handed to
 * {@code Multi.createFrom().emitter(...)} runs; until then {@link #isSubscribed()} returns
 * {@code false} and the emitting methods fail with an {@link IllegalStateException}.
 *
 * <p>All emitting methods are {@code synchronized} on this instance.
 *
 * @param <T> the payload type of the emitted messages
 */
public class EmitterImpl<T> implements Emitter<T> {

    /** The Mutiny emitter, set once by the deferred callback; {@code null} until then. */
    private final AtomicReference<MultiEmitter<? super Message<? extends T>>> internal = new AtomicReference<>();

    /** The stream exposed through {@link #getPublisher()}. */
    private final Multi<Message<? extends T>> publisher;

    /** Channel name, only used in error messages. */
    private final String name;

    /** Failure recorded by the buffering stage, rethrown to the caller of the emitting methods. */
    private final AtomicReference<Throwable> synchronousFailure = new AtomicReference<>();

    /**
     * Creates the emitter and its publisher.
     *
     * @param name the channel name
     * @param overFlowStrategy the name of an {@link OnOverflow.Strategy} constant, or {@code null}
     *        to buffer using {@code defaultBufferSize}
     * @param bufferSize the buffer size used by the {@code BUFFER} strategy when strictly positive
     * @param defaultBufferSize the fallback buffer size, must be strictly positive
     * @throws IllegalArgumentException if {@code defaultBufferSize} is not strictly positive or the
     *         strategy name is not a valid {@link OnOverflow.Strategy}
     */
    EmitterImpl(String name, String overFlowStrategy, long bufferSize, long defaultBufferSize) {
        this.name = name;
        if (defaultBufferSize <= 0L) {
            throw new IllegalArgumentException("The default buffer size must be strictly positive");
        }

        // Only the first emitter handed to this callback is kept; any later one is failed right away.
        Consumer<MultiEmitter<? super Message<? extends T>>> deferred = fe -> {
            if (!this.internal.compareAndSet(null, fe)) {
                fe.fail(new Exception("Emitter already created"));
            }
        };

        if (overFlowStrategy == null) {
            Multi<Message<? extends T>> multi = Multi.createFrom().emitter(deferred, BackPressureStrategy.BUFFER);
            this.publisher = getPublisherUsingBufferStrategy(defaultBufferSize, multi);
        } else {
            this.publisher = getPublisherForStrategy(overFlowStrategy, bufferSize, defaultBufferSize, deferred);
        }
    }

    /**
     * Builds the stream matching the given overflow strategy.
     *
     * @param overFlowStrategy the name of an {@link OnOverflow.Strategy} constant
     * @param bufferSize the buffer size for {@code BUFFER}; ignored unless strictly positive
     * @param defaultBufferSize the buffer size for {@code BUFFER} when {@code bufferSize} is not positive
     * @param deferred the callback receiving the Mutiny emitter
     * @return the stream to expose as publisher
     * @throws IllegalArgumentException if the name does not match a handled strategy
     */
    Multi<Message<? extends T>> getPublisherForStrategy(String overFlowStrategy, long bufferSize,
            long defaultBufferSize, Consumer<MultiEmitter<? super Message<? extends T>>> deferred) {
        OnOverflow.Strategy strategy = OnOverflow.Strategy.valueOf(overFlowStrategy);
        switch (strategy) {
            case BUFFER: {
                Multi<Message<? extends T>> multi = Multi.createFrom().emitter(deferred, BackPressureStrategy.BUFFER);
                long effectiveSize = bufferSize > 0L ? bufferSize : defaultBufferSize;
                return getPublisherUsingBufferStrategy(effectiveSize, multi);
            }
            case UNBOUNDED_BUFFER:
                return Multi.createFrom().emitter(deferred, BackPressureStrategy.BUFFER);
            case DROP:
                return Multi.createFrom().emitter(deferred, BackPressureStrategy.DROP);
            case FAIL:
                return Multi.createFrom().emitter(deferred, BackPressureStrategy.ERROR);
            case LATEST:
                return Multi.createFrom().emitter(deferred, BackPressureStrategy.LATEST);
            case NONE:
                return Multi.createFrom().emitter(deferred, BackPressureStrategy.IGNORE);
            default:
                throw new IllegalArgumentException("Invalid back-pressure strategy: " + overFlowStrategy);
        }
    }

    /**
     * Adds a bounded overflow buffer to the stream and records any failure flowing through it so
     * that the emitting methods can report it to their caller.
     *
     * @param defaultBufferSize the configured buffer size
     * @param stream the stream to decorate
     * @return the decorated stream
     */
    Multi<Message<? extends T>> getPublisherUsingBufferStrategy(long defaultBufferSize,
            Multi<Message<? extends T>> stream) {
        // NOTE(review): the long is narrowed to int without a range check, and the "- 2" yields a
        // non-positive size for configured sizes of 1 or 2 - kept as-is, confirm both are intended.
        int size = (int) defaultBufferSize;
        return stream.on().overflow().buffer(size - 2)
                .onFailure().invoke(failure -> this.synchronousFailure.set(failure));
    }

    /**
     * @return the stream fed by this emitter
     */
    public Publisher<Message<? extends T>> getPublisher() {
        return this.publisher;
    }

    /**
     * @return {@code true} once the Mutiny emitter has been captured
     */
    boolean isSubscribed() {
        return this.internal.get() != null;
    }

    /**
     * Wraps the payload into a {@link Message} and emits it.
     *
     * @param msg the payload, must not be {@code null}
     * @return a stage completed when the supplier passed to {@code Message.of} is invoked
     *         (presumably the message acknowledgement - confirm against {@code Message.of})
     * @throws IllegalArgumentException if {@code msg} is {@code null}
     * @throws IllegalStateException if there is no subscriber, the subscription is cancelled, or a
     *         failure has been recorded
     */
    public synchronized CompletionStage<Void> send(T msg) {
        if (msg == null) {
            throw new IllegalArgumentException("`null` is not a valid value");
        }
        CompletableFuture<Void> future = new CompletableFuture<>();
        emit(Message.of(msg, () -> {
            future.complete(null);
            return future;
        }));
        return future;
    }

    /**
     * Pushes the message to the Mutiny emitter, surfacing any failure recorded before or during
     * the emission as an {@link IllegalStateException}.
     */
    private synchronized void emit(Message<? extends T> message) {
        MultiEmitter<? super Message<? extends T>> emitter = verify(this.internal, this.name);

        // Read the failure once so the checked value and the reported cause are the same object.
        Throwable previousFailure = this.synchronousFailure.get();
        if (previousFailure != null) {
            throw new IllegalStateException("The emitter encountered a failure", previousFailure);
        }
        if (emitter.isCancelled()) {
            throw new IllegalStateException("The downstream has cancelled the consumption");
        }

        emitter.emit(message);

        Throwable emissionFailure = this.synchronousFailure.get();
        if (emissionFailure != null) {
            throw new IllegalStateException("The emitter encountered a failure while emitting", emissionFailure);
        }
    }

    /**
     * Emits an already built message.
     *
     * @param msg the message, must not be {@code null}
     * @param <M> the message type
     * @throws IllegalArgumentException if {@code msg} is {@code null}
     * @throws IllegalStateException if there is no subscriber, the subscription is cancelled, or a
     *         failure has been recorded
     */
    public synchronized <M extends Message<? extends T>> void send(M msg) {
        if (msg == null) {
            throw new IllegalArgumentException("`null` is not a valid value");
        }
        emit(msg);
    }

    /**
     * Returns the emitter held by the reference after checking it is usable.
     *
     * @param reference the holder of the Mutiny emitter
     * @param name the channel name, used in error messages
     * @param <T> the payload type
     * @return the non-null, non-cancelled emitter
     * @throws IllegalStateException if no emitter is set or it has been cancelled
     */
    static <T> MultiEmitter<? super Message<? extends T>> verify(
            AtomicReference<MultiEmitter<? super Message<? extends T>>> reference, String name) {
        MultiEmitter<? super Message<? extends T>> emitter = reference.get();
        if (emitter == null) {
            throw new IllegalStateException("No subscriber found for the channel " + name);
        }
        if (emitter.isCancelled()) {
            throw new IllegalStateException("The subscription to " + name + " has been cancelled");
        }
        return emitter;
    }

    /**
     * Completes the stream.
     *
     * @throws IllegalStateException if there is no subscriber or the subscription is cancelled
     */
    public synchronized void complete() {
        verify(this.internal, this.name).complete();
    }

    /**
     * Fails the stream with the given exception.
     *
     * @param e the failure, must not be {@code null}
     * @throws IllegalArgumentException if {@code e} is {@code null}
     * @throws IllegalStateException if there is no subscriber or the subscription is cancelled
     */
    public synchronized void error(Exception e) {
        if (e == null) {
            throw new IllegalArgumentException("`null` is not a valid exception");
        }
        verify(this.internal, this.name).fail(e);
    }

    /**
     * @return {@code true} if no emitter has been captured yet or the captured one is cancelled
     */
    public synchronized boolean isCancelled() {
        MultiEmitter<? super Message<? extends T>> emitter = this.internal.get();
        return emitter == null || emitter.isCancelled();
    }

    /**
     * @return {@code true} if an emitter is captured, not cancelled, and has outstanding requests
     */
    public boolean isRequested() {
        // Work on a single snapshot: reading the reference twice could observe null first and a
        // live emitter second, which would then dereference the stale null.
        MultiEmitter<? super Message<? extends T>> emitter = this.internal.get();
        return emitter != null && !emitter.isCancelled() && emitter.requested() > 0L;
    }
}

