package com.netifi.broker.rsocket;

import io.netty.util.ReferenceCounted;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* loaded from: input_file:com/netifi/broker/rsocket/DefaultBrokerSocket.class */
public class DefaultBrokerSocket implements BrokerSocket {
    private static final Logger logger = LoggerFactory.getLogger(DefaultBrokerSocket.class);
    private final Function<Payload, Payload> payloadTransformer;
    private final Supplier<RSocket> rSocketSupplier;
    private final MonoProcessor<Void> onClose = MonoProcessor.create();

    public DefaultBrokerSocket(Function<Payload, Payload> function, Supplier<RSocket> supplier) {
        this.payloadTransformer = function;
        this.rSocketSupplier = supplier;
    }

    public Mono<Void> fireAndForget(Payload payload) {
        return Mono.defer(() -> {
            Payload apply = this.payloadTransformer.apply(payload);
            if (apply != null && payload.refCnt() > 0) {
                quietRelease(payload);
            }
            return this.rSocketSupplier.get().fireAndForget(apply);
        });
    }

    public Mono<Payload> requestResponse(Payload payload) {
        return Mono.defer(() -> {
            Payload apply = this.payloadTransformer.apply(payload);
            if (apply != null) {
                quietRelease(payload);
            }
            return this.rSocketSupplier.get().requestResponse(apply);
        });
    }

    public Flux<Payload> requestStream(Payload payload) {
        return Flux.defer(() -> {
            Payload apply = this.payloadTransformer.apply(payload);
            if (apply != null) {
                quietRelease(payload);
            }
            return this.rSocketSupplier.get().requestStream(apply);
        });
    }

    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        return this.rSocketSupplier.get().requestChannel(Flux.from(publisher).map(payload -> {
            Payload apply = this.payloadTransformer.apply(payload);
            if (apply != null) {
                quietRelease(payload);
            }
            return apply;
        }));
    }

    public Mono<Void> metadataPush(Payload payload) {
        return Mono.defer(() -> {
            Payload apply = this.payloadTransformer.apply(payload);
            if (apply != null) {
                quietRelease(payload);
            }
            return this.rSocketSupplier.get().metadataPush(apply);
        });
    }

    private static void quietRelease(ReferenceCounted referenceCounted) {
        try {
            if (referenceCounted.refCnt() > 0) {
                referenceCounted.release();
            }
        } catch (Throwable th) {
            logger.trace("error releasing", th);
        }
    }

    public void dispose() {
        this.onClose.onComplete();
    }

    public boolean isDisposed() {
        return this.onClose.isDisposed();
    }

    public Mono<Void> onClose() {
        return this.onClose;
    }
}
