package com.netifi.broker.rsocket;

import com.netifi.broker.rsocket.transport.WeightedClientTransportSupplier;
import com.netifi.common.stats.Ewma;
import com.netifi.common.stats.Median;
import com.netifi.common.stats.Quantile;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.rpc.exception.TimeoutException;
import io.rsocket.util.Clock;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.SignalType;

/* loaded from: input_file:com/netifi/broker/rsocket/WeightedReconnectingRSocket.class */
public class WeightedReconnectingRSocket implements WeightedRSocket {
    // Latency weight returned by predictedLatency() while no median estimate exists yet
    // but requests are already pending. Numerically 2^51 - 1.
    private static final double STARTUP_PENALTY = 2.251799813685247E15d;
    // Latency quantile estimators supplied by the caller; fed by record() and exposed via
    // lowerQuantileLatency() / higherQuantileLatency().
    private final Quantile lowerQuantile;
    private final Quantile higherQuantile;
    // Multiplier applied to the inter-arrival time in predictedLatency() to decide whether the
    // socket has been idle long enough for the median to be decayed.
    private final long inactivityFactor;
    // Idle threshold, in Clock units, used by availability(); derived from (long) (5 / ln 2) seconds.
    private final long tau;
    // Moving average fed with 1.0 on success and 0.0 on failure (see recordError call sites), so
    // despite the name a higher value means fewer errors.
    private final Ewma errorPercentage;
    // Completed by dispose(); its disposed state is what isDisposed() reports.
    private final MonoProcessor<Void> onClose;
    // Provides the setup payload for every new connection attempt.
    private final Supplier<Payload> setupPayloadSupplier;
    // Stored from the constructor; within this class it is only read by toString().
    private final BooleanSupplier running;
    // Keep-alive configuration applied in getClientFactory().
    private final boolean keepalive;
    private final long tickPeriodSeconds;
    private final long ackTimeoutSeconds;
    private final int missedAcks;
    // Handler returned from the connection acceptor; EMPTY_SOCKET is used when this is null.
    private final RSocket requestHandlingRSocket;
    // Access key is logged when a connection closes quickly; the token is only read by toString().
    private final long accessKey;
    private final ByteBuf accessToken;
    // Yields the transport supplier used for each connection attempt.
    private final Supplier<WeightedClientTransportSupplier> transportSupplier;
    // Number of in-flight request/response style calls; mutated in the synchronized start()/stop().
    private volatile int pending;
    // Clock time of the last recordError() call; only read by toString() in this class.
    private long errorStamp;
    // Clock time of the last start().
    private long stamp;
    // Clock time of the last start() or stop().
    private long stamp0;
    // Running accumulator maintained by start()/stop() and read by instantaneous().
    private long duration;
    // Median latency estimator fed by record().
    private Median median;
    // Moving average of the time between consecutive start() calls.
    private Ewma interArrivalTime;
    // Number of stream/channel requests currently open.
    private AtomicLong pendingStreams;
    // Connection attempt counter that drives the retry delay in calculateRetryDuration().
    private long attempts;
    // Processor that hands the currently connected RSocket to callers; replaced by resetMono().
    private MonoProcessor<RSocket> currentSink;
    private static final Logger logger = LoggerFactory.getLogger(WeightedReconnectingRSocket.class);
    // Fallback acceptor socket used when no request handler was supplied.
    private static final RSocket EMPTY_SOCKET = new AbstractRSocket() { // from class: com.netifi.broker.rsocket.WeightedReconnectingRSocket.1
    };
    // One second expressed in Clock units; initial value for the inter-arrival average.
    private static final long DEFAULT_INITIAL_INTER_ARRIVAL_TIME = Clock.unit().convert(1, TimeUnit.SECONDS);
    // True while a connection attempt is in progress; read and written under synchronized (this).
    boolean connecting = false;
    // 1.0 while a connection is established, 0.0 otherwise (set in connect()).
    private double availability = 0.0d;
    // One minute in milliseconds. Named like a constant but declared as an instance field.
    private long CONNECTION_ATTEMPT_RESET_TS = Duration.ofMinutes(1).toMillis();
    // Wall-clock time (ms) of the most recent calculateRetryDuration() call.
    private long lastConnectionAttemptTs = System.currentTimeMillis();

    /**
     * Stores the supplied collaborators and initialises the load statistics. The constructor
     * only assigns fields; it does not create the connection sink or start connecting
     * (see {@link #newInstance}).
     *
     * @param rSocket handler for requests coming from the remote side, may be {@code null}
     * @param supplier provides the setup payload for each connection
     * @param booleanSupplier stored as the {@code running} flag supplier
     * @param supplier2 provides the transport supplier for each connection attempt
     * @param z whether keep-alive is enabled
     * @param j keep-alive tick period in seconds
     * @param j2 keep-alive ack timeout in seconds
     * @param i number of missed keep-alive acks that is tolerated
     * @param j3 access key
     * @param byteBuf access token
     * @param quantile lower latency quantile estimator
     * @param quantile2 higher latency quantile estimator
     * @param i2 inactivity factor used when predicting latency
     */
    WeightedReconnectingRSocket(RSocket rSocket, Supplier<Payload> supplier, BooleanSupplier booleanSupplier, Supplier<WeightedClientTransportSupplier> supplier2, boolean z, long j, long j2, int i, long j3, ByteBuf byteBuf, Quantile quantile, Quantile quantile2, int i2) {
        // Collaborators and connection configuration.
        this.requestHandlingRSocket = rSocket;
        this.setupPayloadSupplier = supplier;
        this.running = booleanSupplier;
        this.transportSupplier = supplier2;
        this.keepalive = z;
        this.tickPeriodSeconds = j;
        this.ackTimeoutSeconds = j2;
        this.missedAcks = i;
        this.accessKey = j3;
        this.accessToken = byteBuf;
        this.onClose = MonoProcessor.create();

        // Load-balancing statistics.
        this.lowerQuantile = quantile;
        this.higherQuantile = quantile2;
        this.inactivityFactor = i2;
        final long createdAt = Clock.now();
        this.stamp = createdAt;
        this.errorStamp = createdAt;
        this.stamp0 = createdAt;
        this.duration = 0L;
        this.pending = 0;
        this.median = new Median();
        this.interArrivalTime = new Ewma(1L, TimeUnit.MINUTES, DEFAULT_INITIAL_INTER_ARRIVAL_TIME);
        this.pendingStreams = new AtomicLong();
        this.errorPercentage = new Ewma(5L, TimeUnit.SECONDS, 1.0d);
        this.tau = Clock.unit().convert((long) (5.0d / Math.log(2.0d)), TimeUnit.SECONDS);
    }

    /**
     * Builds a socket, prepares its connection sink and kicks off the first connection attempt.
     * The parameters are passed to the constructor unchanged.
     *
     * @return the new socket; the connection itself is established asynchronously
     */
    public static WeightedReconnectingRSocket newInstance(RSocket rSocket, Supplier<Payload> supplier, BooleanSupplier booleanSupplier, Supplier<WeightedClientTransportSupplier> supplier2, boolean z, long j, long j2, int i, long j3, ByteBuf byteBuf, Quantile quantile, Quantile quantile2, int i2) {
        final WeightedReconnectingRSocket socket =
                new WeightedReconnectingRSocket(rSocket, supplier, booleanSupplier, supplier2, z, j, j2, i, j3, byteBuf, quantile, quantile2, i2);
        // The sink must exist before connect() can publish a connection into it.
        socket.resetMono();
        socket.connect();
        return socket;
    }

    /**
     * Computes the delay to wait before the next connection attempt and records the attempt.
     *
     * <p>The delay is one second per attempt made so far, capped at 30 seconds. When the
     * previous attempt is older than {@code CONNECTION_ATTEMPT_RESET_TS} the attempt counter is
     * cleared <em>before</em> the delay is derived, so the reset applies to this attempt rather
     * than only to the one after it.
     *
     * @return the delay before the next connection attempt
     */
    private synchronized Duration calculateRetryDuration() {
        final long now = System.currentTimeMillis();
        if (now - this.lastConnectionAttemptTs > this.CONNECTION_ATTEMPT_RESET_TS) {
            this.attempts = 0L;
        }
        final long delaySeconds = Math.min(this.attempts, 30L);
        this.lastConnectionAttemptTs = now;
        this.attempts++;
        return Duration.ofSeconds(delaySeconds);
    }

    /**
     * Restores all load statistics to their initial state: time stamps are set to the current
     * clock value, counters are zeroed and the estimators are reset.
     */
    synchronized void resetStatistics() {
        final long resetAt = Clock.now();

        // Time stamps.
        this.stamp = resetAt;
        this.stamp0 = resetAt;
        this.errorStamp = resetAt;

        // Counters.
        this.duration = 0L;
        this.pending = 0;

        // Estimators.
        this.median.reset();
        this.interArrivalTime.reset(DEFAULT_INITIAL_INTER_ARRIVAL_TIME);
        this.pendingStreams.set(0L);
        this.errorPercentage.reset(1.0d);
    }

    /**
     * Creates a client factory configured with zero-copy frame decoding, the keep-alive settings
     * and a fresh setup payload.
     *
     * <p>When keep-alive is disabled the tick period and ack timeout are set to zero; the
     * missed-acks value is applied in both cases. The configuration is applied as one fluent
     * chain so that no builder return value is discarded.
     *
     * @return the configured client factory
     */
    private RSocketFactory.ClientRSocketFactory getClientFactory() {
        final Duration tickPeriod = Duration.ofSeconds(this.keepalive ? this.tickPeriodSeconds : 0L);
        final Duration ackTimeout = Duration.ofSeconds(this.keepalive ? this.ackTimeoutSeconds : 0L);
        return RSocketFactory.connect()
                .frameDecoder(PayloadDecoder.ZERO_COPY)
                .keepAlive()
                .keepAliveTickPeriod(tickPeriod)
                .keepAliveAckTimeout(ackTimeout)
                .keepAliveMissedAcks(this.missedAcks)
                .setupPayload(this.setupPayloadSupplier.get());
    }

    /**
     * Starts an asynchronous connection attempt unless one is already in progress.
     *
     * <p>The attempt waits for the delay from {@link #calculateRetryDuration()}, opens a
     * connection over a transport obtained from {@code transportSupplier} and, on success,
     * publishes the connected socket through {@link #setRSocket(RSocket)}. When that connection
     * later closes, this method is invoked again to reconnect.
     */
    void connect() {
        synchronized (this) {
            // Only one attempt at a time; the flag is cleared again in the doFinally hooks below.
            if (this.connecting) {
                return;
            }
            this.connecting = true;
            // Deferred so that the delay is recomputed on every (re)subscription.
            Mono.defer(() -> {
                return Mono.delay(calculateRetryDuration());
            }).then(Mono.defer(() -> {
                // NOTE(review): this error is followed by the unconditional retry() below, so a
                // disposed socket appears to keep retrying with back-off - confirm this is intended.
                if (this.onClose.isDisposed()) {
                    return Mono.error(new ClosedChannelException());
                }
                WeightedClientTransportSupplier weightedClientTransportSupplier = this.transportSupplier.get();
                // Start time used to detect connections that are closed almost immediately.
                long nanoTime = System.nanoTime();
                return getClientFactory().errorConsumer(th -> {
                    logger.error("netifi client received unhandled exception for connection with address " + weightedClientTransportSupplier.getSocketAddress().toString(), th);
                }).acceptor(rSocket -> {
                    return this.requestHandlingRSocket == null ? EMPTY_SOCKET : this.requestHandlingRSocket;
                }).transport(weightedClientTransportSupplier.get()).start().doOnNext(rSocket2 -> {
                    this.availability = 1.0d;
                    ErrorOnDisconnectRSocket errorOnDisconnectRSocket = new ErrorOnDisconnectRSocket(rSocket2);
                    // When the connection closes: mark unavailable, release the wrapper and reconnect.
                    rSocket2.onClose().doFinally(signalType -> {
                        if (Duration.ofNanos(System.nanoTime() - nanoTime).getSeconds() < 2) {
                            logger.warn("connection closed in less than 2 seconds - make sure access key {} has a valid access token", Long.valueOf(this.accessKey));
                        }
                        this.availability = 0.0d;
                        synchronized (this) {
                            this.connecting = false;
                        }
                        errorOnDisconnectRSocket.dispose();
                        connect();
                    }).subscribe();
                    // Hand the connected socket to callers waiting on getRSocket().
                    setRSocket(errorOnDisconnectRSocket);
                });
            })).doOnError(th -> {
                logger.error("error trying to broker", th);
            }).retry().doFinally(signalType -> {
                // Clear the in-progress flag for every terminal signal except an error.
                if (SignalType.ON_ERROR != signalType) {
                    synchronized (this) {
                        this.connecting = false;
                    }
                }
            }).subscribe();
        }
    }

    /**
     * Sends a fire-and-forget request over the current connection and records latency and
     * success/failure statistics for it.
     *
     * @param payload the request payload
     * @return a mono that terminates when the delegate call terminates
     */
    public Mono<Void> fireAndForget(Payload payload) {
        return getRSocket().flatMap(socket -> {
            final long startedAt = start();
            try {
                return socket.fireAndForget(payload)
                        .doOnCancel(() -> stop(startedAt))
                        .doOnSuccessOrError((ignored, error) -> {
                            final long stoppedAt = stop(startedAt);
                            final boolean failed = error != null;
                            // Latency is sampled for successes and for timeouts only.
                            if (!failed || error instanceof TimeoutException) {
                                record(stoppedAt - startedAt);
                            }
                            recordError(failed ? 0.0d : 1.0d);
                        });
            } catch (Throwable thrown) {
                // The delegate failed synchronously: undo the pending count and mark a failure.
                stop(startedAt);
                recordError(0.0d);
                return Mono.error(thrown);
            }
        });
    }

    /**
     * Sends a request/response call over the current connection and records latency and
     * success/failure statistics for it.
     *
     * @param payload the request payload
     * @return a mono emitting the response of the delegate call
     */
    public Mono<Payload> requestResponse(Payload payload) {
        return getRSocket().flatMap(socket -> {
            final long startedAt = start();
            try {
                return socket.requestResponse(payload)
                        .doOnCancel(() -> stop(startedAt))
                        .doOnSuccessOrError((response, error) -> {
                            final long stoppedAt = stop(startedAt);
                            final boolean failed = error != null;
                            // Latency is sampled for successes and for timeouts only.
                            if (!failed || error instanceof TimeoutException) {
                                record(stoppedAt - startedAt);
                            }
                            recordError(failed ? 0.0d : 1.0d);
                        });
            } catch (Throwable thrown) {
                // The delegate failed synchronously: undo the pending count and mark a failure.
                stop(startedAt);
                recordError(0.0d);
                return Mono.error(thrown);
            }
        });
    }

    /**
     * Opens a request stream over the current connection, tracking it in the pending-stream
     * counter and recording a success for every element and a failure for an error.
     *
     * <p>If the delegate throws synchronously the pending-stream counter is decremented again,
     * because in that case no {@code doFinally} hook is subscribed that could do it.
     *
     * @param payload the request payload
     * @return the stream of responses from the delegate call
     */
    public Flux<Payload> requestStream(Payload payload) {
        return getRSocket().flatMapMany(rSocket -> {
            this.pendingStreams.incrementAndGet();
            try {
                return rSocket.requestStream(payload)
                        .doFinally(signalType -> this.pendingStreams.decrementAndGet())
                        .doOnNext(next -> recordError(1.0d))
                        .doOnError(error -> recordError(0.0d));
            } catch (Throwable thrown) {
                // Balance the increment above; the stream was never handed to a subscriber.
                this.pendingStreams.decrementAndGet();
                recordError(0.0d);
                return Flux.error(thrown);
            }
        });
    }

    /**
     * Opens a request channel over the current connection, tracking it in the pending-stream
     * counter and recording a success for every element and a failure for an error.
     *
     * <p>If the delegate throws synchronously the pending-stream counter is decremented again,
     * because in that case no {@code doFinally} hook is subscribed that could do it.
     *
     * @param publisher the outbound payloads
     * @return the stream of responses from the delegate call
     */
    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        return getRSocket().flatMapMany(rSocket -> {
            this.pendingStreams.incrementAndGet();
            try {
                return rSocket.requestChannel(publisher)
                        .doFinally(signalType -> this.pendingStreams.decrementAndGet())
                        .doOnNext(next -> recordError(1.0d))
                        .doOnError(error -> recordError(0.0d));
            } catch (Throwable thrown) {
                // Balance the increment above; the channel was never handed to a subscriber.
                this.pendingStreams.decrementAndGet();
                recordError(0.0d);
                return Flux.error(thrown);
            }
        });
    }

    /**
     * Sends a metadata push over the current connection and records latency and
     * success/failure statistics for it.
     *
     * @param payload the metadata payload
     * @return a mono that terminates when the delegate call terminates
     */
    public Mono<Void> metadataPush(Payload payload) {
        return getRSocket().flatMap(socket -> {
            final long startedAt = start();
            try {
                return socket.metadataPush(payload)
                        .doOnCancel(() -> stop(startedAt))
                        .doOnSuccessOrError((ignored, error) -> {
                            final long stoppedAt = stop(startedAt);
                            final boolean failed = error != null;
                            // Latency is sampled for successes and for timeouts only.
                            if (!failed || error instanceof TimeoutException) {
                                record(stoppedAt - startedAt);
                            }
                            recordError(failed ? 0.0d : 1.0d);
                        });
            } catch (Throwable thrown) {
                // The delegate failed synchronously: undo the pending count and mark a failure.
                stop(startedAt);
                recordError(0.0d);
                return Mono.error(thrown);
            }
        });
    }

    /**
     * Estimates the latency weight of this socket from the median latency, the number of
     * pending requests and how long the socket has been idle.
     *
     * @return 0 when nothing is known and nothing is pending, a large start-up penalty when
     *     requests are pending without any latency history, otherwise a value derived from the
     *     median estimate
     */
    @Override // com.netifi.broker.rsocket.WeightedRSocket
    public synchronized double predictedLatency() {
        final long now = Clock.now();
        final long elapsed = Math.max(now - this.stamp, 1L);
        final double prediction = this.median.estimation();

        // No latency history yet.
        if (prediction == 0.0d) {
            return this.pending == 0 ? 0.0d : STARTUP_PENALTY + this.pending;
        }

        final boolean busyOrRecentlyUsed =
                this.pending != 0 || elapsed <= this.inactivityFactor * this.interArrivalTime.value();
        if (!busyOrRecentlyUsed) {
            // Idle for a while: decay the estimate by feeding an artificial zero sample.
            this.median.insert(0.0d);
            return this.median.estimation();
        }

        // Use whichever is larger: the prediction or the latency observed for in-flight requests.
        final double predicted = prediction * this.pending;
        final double instant = instantaneous(now);
        return predicted < instant ? instant / this.pending : prediction;
    }

    /**
     * Returns the accumulated duration plus the time elapsed since the last start/stop,
     * multiplied by the number of pending requests.
     *
     * @param j the current clock value
     */
    private synchronized long instantaneous(long j) {
        final long sinceLastChange = j - this.stamp0;
        return this.duration + (sinceLastChange * this.pending);
    }

    /**
     * Registers the beginning of a request: samples the inter-arrival time, updates the
     * accumulated duration, increments the pending count and refreshes both time stamps.
     *
     * @return the clock value at which the request started
     */
    private synchronized long start() {
        final long now = Clock.now();
        this.interArrivalTime.insert(now - this.stamp);

        final long sinceLastChange = Math.max(0L, now - this.stamp0);
        this.duration += sinceLastChange * this.pending;
        this.pending++;

        this.stamp = now;
        this.stamp0 = now;
        return now;
    }

    /**
     * Registers the end of a request: updates the accumulated duration, removes the finished
     * request's own elapsed time from it and decrements the pending count.
     *
     * @param j the clock value returned by {@code start()} for this request
     * @return the clock value at which the request stopped
     */
    private synchronized long stop(long j) {
        final long now = Clock.now();
        final long sinceLastChange = Math.max(0L, now - this.stamp0);
        final long ownElapsed = now - j;

        this.duration += (sinceLastChange * this.pending) - ownElapsed;
        this.pending--;
        this.stamp0 = now;
        return now;
    }

    /**
     * Feeds one latency sample into the median and both quantile estimators.
     *
     * @param d the observed latency
     */
    private synchronized void record(double d) {
        median.insert(d);
        lowerQuantile.insert(d);
        higherQuantile.insert(d);
    }

    /**
     * Feeds one outcome sample into the error average and stamps the time of the update.
     *
     * @param d the sample; callers in this class pass 1.0 for success and 0.0 for failure
     */
    private synchronized void recordError(double d) {
        errorPercentage.insert(d);
        errorStamp = Clock.now();
    }

    /** Returns the current value of the success/failure moving average. */
    @Override // com.netifi.broker.rsocket.WeightedRSocket
    public double errorPercentage() {
        return errorPercentage.value();
    }

    /** Returns the current median latency estimate. */
    @Override // com.netifi.broker.rsocket.WeightedRSocket
    public double medianLatency() {
        return median.estimation();
    }

    /** Returns the current estimate of the lower latency quantile. */
    @Override // com.netifi.broker.rsocket.WeightedRSocket
    public double lowerQuantileLatency() {
        return lowerQuantile.estimation();
    }

    /** Returns the current estimate of the higher latency quantile. */
    @Override // com.netifi.broker.rsocket.WeightedRSocket
    public double higherQuantileLatency() {
        return higherQuantile.estimation();
    }

    /** Returns the moving average of the time between consecutive request starts. */
    @Override // com.netifi.broker.rsocket.WeightedRSocket
    public double interArrivalTime() {
        return interArrivalTime.value();
    }

    /** Returns the number of requests that have been started but not yet stopped. */
    @Override // com.netifi.broker.rsocket.WeightedRSocket
    public int pending() {
        return pending;
    }

    /**
     * Returns the time stamp of the most recent request start or stop.
     *
     * <p>NOTE(review): the value is taken from {@code Clock.now()} without conversion, so it is
     * in whatever unit {@code Clock} uses - confirm that this really is milliseconds.
     */
    @Override // com.netifi.broker.rsocket.WeightedRSocket
    public long lastTimeUsedMillis() {
        return stamp0;
    }

    /**
     * Returns the connection availability (1.0 while connected, 0.0 otherwise) scaled by the
     * success/failure average.
     *
     * <p>If no request has been started for longer than {@code tau}, a success sample is
     * recorded first so that the average recovers while the socket is idle.
     */
    public double availability() {
        final long idle = Clock.now() - this.stamp;
        if (idle > this.tau) {
            recordError(1.0d);
        }
        final double successRate = this.errorPercentage.value();
        return this.availability * successRate;
    }

    /** Completes the {@code onClose} signal, marking this socket as closed. */
    public void dispose() {
        onClose.onComplete();
    }

    /** Reports whether the {@code onClose} signal has been disposed. */
    public boolean isDisposed() {
        return onClose.isDisposed();
    }

    /** Returns the signal that completes when {@link #dispose()} is called. */
    public Mono<Void> onClose() {
        return onClose;
    }

    /**
     * Replaces the connection sink with a fresh one so that callers of {@link #getRSocket()}
     * wait for the next connection. Does nothing once this socket has been disposed.
     *
     * <p>If the previous sink had not terminated yet, its subscribers are failed with an
     * {@link InterruptedException}. The signal is sent outside the lock.
     */
    void resetMono() {
        if (this.onClose.isDisposed()) {
            return;
        }
        final MonoProcessor<RSocket> previous;
        synchronized (this) {
            previous = this.currentSink;
            this.currentSink = MonoProcessor.create();
        }
        if (previous != null && !previous.isTerminated()) {
            previous.onError(new InterruptedException("reset while waiting for new connection"));
        }
    }

    /**
     * Returns the current connection sink; it emits the connected socket once a connection has
     * been published through {@link #setRSocket(RSocket)}.
     */
    synchronized Mono<RSocket> getRSocket() {
        return currentSink;
    }

    /**
     * Publishes a newly connected socket to the current sink and wires up its clean-up.
     *
     * <p>Statistics are reset first. Afterwards, closing this object disposes the connection,
     * and the connection closing installs a fresh sink via {@link #resetMono()}.
     *
     * @param rSocket the connected socket to hand to waiting callers
     */
    void setRSocket(RSocket rSocket) {
        final MonoProcessor<RSocket> sink;
        synchronized (this) {
            sink = this.currentSink;
            resetStatistics();
        }

        // Signal outside the lock.
        sink.onNext(rSocket);
        sink.onComplete();

        // When this object is closed, tear down the connection and the sink.
        final Disposable closeHook = this.onClose
                .doFinally(signal -> {
                    rSocket.dispose();
                    sink.cancel();
                })
                .subscribe();

        // When the connection closes, drop the hook above and prepare a new sink.
        rSocket.onClose()
                .doFinally(signal -> {
                    closeHook.dispose();
                    resetMono();
                })
                .subscribe();
    }

    /**
     * Returns a diagnostic description of the configuration and current statistics.
     *
     * <p>The access token is a credential and is therefore printed as {@code <redacted>}
     * instead of being hex-dumped, so that it cannot leak through logs.
     */
    public String toString() {
        return "WeightedReconnectingRSocket{lowerQuantile=" + this.lowerQuantile.estimation()
                + ", higherQuantile=" + this.higherQuantile.estimation()
                + ", inactivityFactor=" + this.inactivityFactor
                + ", tau=" + this.tau
                + ", errorPercentage=" + this.errorPercentage.value()
                + ", running=" + this.running
                + ", keepalive=" + this.keepalive
                + ", tickPeriodSeconds=" + this.tickPeriodSeconds
                + ", ackTimeoutSeconds=" + this.ackTimeoutSeconds
                + ", missedAcks=" + this.missedAcks
                + ", accessKey=" + this.accessKey
                + ", accessToken=<redacted>"
                + ", pending=" + this.pending
                + ", errorStamp=" + this.errorStamp
                + ", stamp=" + this.stamp
                + ", stamp0=" + this.stamp0
                + ", duration=" + this.duration
                + ", median=" + this.median.estimation()
                + ", interArrivalTime=" + this.interArrivalTime.value()
                + ", pendingStreams=" + this.pendingStreams.get()
                + ", availability=" + this.availability
                + '}';
    }
}
