package com.netifi.broker.rsocket.transport;

import com.netifi.broker.info.Broker;
import com.netifi.common.stats.Ewma;
import io.rsocket.Closeable;
import io.rsocket.transport.ClientTransport;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* loaded from: input_file:com/netifi/broker/rsocket/transport/WeightedClientTransportSupplier.class */
/**
 * Supplies {@link ClientTransport}s for one broker address and keeps the statistics used to weigh
 * that broker against others: a selection counter and a moving average of connection outcomes.
 *
 * <p>Disposing the supplier completes {@link #onClose()}, which in turn disposes every connection
 * opened through it that is still open.
 */
public class WeightedClientTransportSupplier implements Supplier<ClientTransport>, Closeable {
    private static final Logger logger = LoggerFactory.getLogger(WeightedClientTransportSupplier.class);

    /** Builds the actual transport for {@link #socketAddress} each time a connection is requested. */
    private final Function<SocketAddress, ClientTransport> clientTransportFunction;

    /** Address resolved once, at construction time, from the broker. */
    private final SocketAddress socketAddress;

    private final Broker broker;

    /**
     * Moving average of connection outcomes.
     *
     * <p>NOTE(review): despite the name, {@code 1.0} is inserted when a connect succeeds and
     * {@code 0.0} when it fails, so the value behaves like a success ratio. The constructor
     * arguments are presumably (window, unit, initial value) - confirm against {@code Ewma}.
     */
    private final Ewma errorPercentage = new Ewma(5, TimeUnit.SECONDS, 1.0d);

    /** Incremented by {@link #select()}, decremented whenever an opened connection closes. */
    private final AtomicInteger selectCount = new AtomicInteger();

    /** Completed by {@link #dispose()}; its termination tears down the tracked connections. */
    private final MonoProcessor<Void> onClose = MonoProcessor.create();

    /**
     * Creates a supplier bound to the address that {@code function} resolves for {@code broker}.
     *
     * @param broker the broker this supplier connects to
     * @param function maps the broker to the socket address to connect to; applied exactly once, here
     * @param function2 creates a {@link ClientTransport} for a socket address; applied on every
     *     connection attempt
     */
    public WeightedClientTransportSupplier(Broker broker, Function<Broker, InetSocketAddress> function, Function<SocketAddress, ClientTransport> function2) {
        this.broker = broker;
        this.clientTransportFunction = function2;
        this.socketAddress = function.apply(broker);
    }

    /** Records that this supplier was chosen by bumping the selection counter. */
    public void select() {
        selectCount.incrementAndGet();
    }

    /**
     * Returns a transport that connects to this supplier's address and keeps the statistics up to
     * date: a successful connect inserts {@code 1.0} into the moving average, a failed one inserts
     * {@code 0.0}, and the selection counter is decremented when an opened connection closes.
     *
     * <p>NOTE(review): a failed connect does not decrement the selection counter here - confirm
     * whether callers compensate for that.
     *
     * @return a transport wrapping the one produced by the configured transport function
     * @throws IllegalStateException if this supplier has already been disposed
     */
    @Override
    public ClientTransport get() {
        if (onClose.isDisposed()) {
            throw new IllegalStateException("WeightedClientTransportSupplier is closed");
        }
        // The int argument is forwarded untouched; presumably the MTU of
        // ClientTransport.connect(int) - confirm against the RSocket version in use.
        return mtu ->
            clientTransportFunction
                .apply(socketAddress)
                .connect(mtu)
                .doOnNext(duplexConnection -> {
                    // Read the counter now rather than when get() was called: the returned
                    // transport may connect much later (or repeatedly), so a snapshot taken in
                    // get() would be stale by the time it is logged.
                    logger.debug("opened connection to {} - active connections {}", socketAddress, selectCount.get());

                    // Closing the supplier closes the connection ...
                    Disposable closeWithSupplier =
                        onClose.doFinally(signalType -> duplexConnection.dispose()).subscribe();

                    // ... and a connection that closes first drops that hook again and gives
                    // its slot back to the selection counter.
                    duplexConnection
                        .onClose()
                        .doFinally(signalType -> {
                            int remaining = selectCount.decrementAndGet();
                            logger.debug("closed connection {} - active connections {}", socketAddress, remaining);
                            closeWithSupplier.dispose();
                        })
                        .subscribe();

                    errorPercentage.insert(1.0d);
                })
                .doOnError(th -> errorPercentage.insert(0.0d));
    }

    /** Current value of the connection-outcome moving average. */
    private double errorPercentage() {
        return errorPercentage.value();
    }

    /**
     * Current value of the selection counter. It is raised by {@link #select()} and lowered only
     * when an opened connection closes.
     */
    int activeConnections() {
        return selectCount.get();
    }

    /**
     * Computes this supplier's weight from the moving average {@code p} and the active connection
     * count {@code n}: {@code n} when {@code p == 1.0}, otherwise {@code exp(1 / (1 - p)) * n}.
     *
     * <p>With no active connections the weight is always {@code 0.0}. For {@code p} just below
     * {@code 1.0} the exponential overflows to infinity, and multiplying that by zero would
     * otherwise produce {@code NaN}, which cannot be ordered against other weights.
     *
     * <p>NOTE(review): since {@code p} is fed {@code 1.0} on success, the factor grows as the
     * success ratio approaches (but does not reach) one - confirm this is the intended shape.
     *
     * @return the weight; never {@code NaN} because of an idle supplier
     */
    public double weight() {
        double ratio = errorPercentage();
        int connections = activeConnections();
        if (connections == 0) {
            // Every finite factor yields 0 here; make the overflow case agree instead of NaN.
            return 0.0d;
        }
        if (ratio == 1.0d) {
            return connections;
        }
        return Math.exp(1.0d / (1.0d - ratio)) * connections;
    }

    /** Returns the address this supplier connects to. */
    public SocketAddress getSocketAddress() {
        return socketAddress;
    }

    /**
     * Completes {@link #onClose()}. Connections opened through this supplier that are still open
     * are disposed as a result, and {@link #get()} throws from then on.
     */
    public void dispose() {
        onClose.onComplete();
    }

    /** Returns whether the close signal reports itself as disposed. */
    public boolean isDisposed() {
        return onClose.isDisposed();
    }

    /** Returns the signal that is completed by {@link #dispose()}. */
    public Mono<Void> onClose() {
        return onClose;
    }

    /** Returns the broker this supplier was created for. */
    public Broker getBroker() {
        return broker;
    }

    /** Two suppliers are equal when they are of the same class and target the same socket address. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        WeightedClientTransportSupplier other = (WeightedClientTransportSupplier) obj;
        return socketAddress.equals(other.socketAddress);
    }

    /** Hash of the socket address, consistent with {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        return socketAddress.hashCode();
    }

    @Override
    public String toString() {
        return "WeightedClientTransportSupplier{errorPercentage=" + errorPercentage + ", socketAddress=" + socketAddress + ", selectCount=" + selectCount + '}';
    }
}
