package com.netifi.broker.rsocket;

import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import java.util.concurrent.CancellationException;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* loaded from: input_file:com/netifi/broker/rsocket/ErrorOnDisconnectRSocket.class */
public class ErrorOnDisconnectRSocket extends AbstractRSocket {
    private static final CancellationException CANCELLATION_EXCEPTION = new CancellationException("Connection has closed");
    private final RSocket delegate;
    private final MonoProcessor<Boolean> onCancelHook = MonoProcessor.create();

    public ErrorOnDisconnectRSocket(RSocket rSocket) {
        this.delegate = rSocket;
    }

    public Mono<Void> onClose() {
        return this.delegate.onClose();
    }

    public void dispose() {
        this.onCancelHook.onNext(true);
        this.onCancelHook.onComplete();
        this.delegate.dispose();
    }

    public Mono<Payload> requestResponse(Payload payload) {
        try {
            return wrapMono(this.delegate.requestResponse(payload));
        } catch (Throwable th) {
            payload.release();
            return Mono.error(th);
        }
    }

    public Mono<Void> fireAndForget(Payload payload) {
        try {
            return this.delegate.fireAndForget(payload);
        } catch (Throwable th) {
            payload.release();
            return Mono.error(th);
        }
    }

    public Flux<Payload> requestStream(Payload payload) {
        try {
            return wrap(this.delegate.requestStream(payload));
        } catch (Throwable th) {
            payload.release();
            return Flux.error(th);
        }
    }

    public Mono<Void> metadataPush(Payload payload) {
        try {
            return this.delegate.metadataPush(payload);
        } catch (Throwable th) {
            payload.release();
            return Mono.error(th);
        }
    }

    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        try {
            return wrap(this.delegate.requestChannel(publisher));
        } catch (Throwable th) {
            return Flux.error(th);
        }
    }

    private <T> Mono<T> wrapMono(Mono<T> mono) {
        return Mono.from(wrap(Flux.from(mono)));
    }

    private <T> Flux<T> wrap(final Flux<T> flux) {
        return Flux.from(new Publisher<T>() { // from class: com.netifi.broker.rsocket.ErrorOnDisconnectRSocket.1
            Flux<T> delegate;

            {
                this.delegate = flux;
            }

            public void subscribe(Subscriber<? super T> subscriber) {
                this.delegate.subscribe(wrapSubscriber(subscriber, ErrorOnDisconnectRSocket.this.onCancelHook.subscribe(bool -> {
                    subscriber.onError(ErrorOnDisconnectRSocket.CANCELLATION_EXCEPTION);
                })));
            }

            private Subscriber<? super T> wrapSubscriber(final Subscriber<? super T> subscriber, final Disposable disposable) {
                return new Subscriber<T>() { // from class: com.netifi.broker.rsocket.ErrorOnDisconnectRSocket.1.1
                    public void onSubscribe(Subscription subscription) {
                        subscriber.onSubscribe(wrapSubscription(subscription, disposable));
                    }

                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }

                    public void onError(Throwable th) {
                        if (!disposable.isDisposed()) {
                            disposable.dispose();
                        }
                        subscriber.onError(th);
                    }

                    public void onComplete() {
                        if (!disposable.isDisposed()) {
                            disposable.dispose();
                        }
                        subscriber.onComplete();
                    }
                };
            }

            /* JADX INFO: Access modifiers changed from: private */
            public Subscription wrapSubscription(final Subscription subscription, final Disposable disposable) {
                return new Subscription() { // from class: com.netifi.broker.rsocket.ErrorOnDisconnectRSocket.1.2
                    public void request(long j) {
                        subscription.request(j);
                    }

                    public void cancel() {
                        if (!disposable.isDisposed()) {
                            disposable.dispose();
                        }
                        subscription.cancel();
                    }
                };
            }
        });
    }
}
