package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.rsocket.Closeable;
import io.rsocket.DuplexConnection;
import io.rsocket.RSocket;
import io.rsocket.RSocketErrorException;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.ServerSetup;
import io.rsocket.exceptions.InvalidSetupException;
import io.rsocket.exceptions.RejectedSetupException;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.frame.FrameType;
import io.rsocket.frame.SetupFrameCodec;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.lease.TrackingLeaseSender;
import io.rsocket.plugins.DuplexConnectionInterceptor;
import io.rsocket.plugins.InitializingInterceptorRegistry;
import io.rsocket.plugins.InterceptorRegistry;
import io.rsocket.plugins.RequestInterceptor;
import io.rsocket.resume.SessionManager;
import io.rsocket.transport.ServerTransport;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:io/rsocket/core/RSocketServer.class */
public final class RSocketServer {
    /** Tag passed to the lease trackers and to the resume store factory to identify the server side. */
    private static final String SERVER_TAG = "server";
    /** Resume configuration; while {@code null}, {@code serverSetup} builds a {@code DefaultServerSetup}. */
    private Resume resume;
    /** Acceptor applied to every SETUP; the default hands out an {@link RSocket} with no overridden methods. */
    private SocketAcceptor acceptor = SocketAcceptor.with(new RSocket() { // from class: io.rsocket.core.RSocketServer.1
    });
    /** Registry handed to the {@code interceptors(...)} configurer and used to decorate connections, requesters and responders. */
    private InitializingInterceptorRegistry interceptors = new InitializingInterceptorRegistry();
    /** Lease configurer; {@code null} means lease is disabled and SETUP frames that honor lease are rejected. */
    private Consumer<LeaseSpec> leaseConfigurer = null;
    /** Value set through {@code fragment(int)}; defaults to 0 (presumably "fragmentation off" - confirm in FragmentationUtils). */
    private int mtu = 0;
    /** Upper bound for inbound payloads, validated by {@code ReassemblyUtils.assertInboundPayloadSize}. */
    private int maxInboundPayloadSize = Integer.MAX_VALUE;
    /** Decoder passed to the requester and responder created for each accepted connection. */
    private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;
    /** Duration handed to the {@code ServerSetup}; changed through {@code maxTimeToFirstFrame(Duration)}. */
    private Duration timeout = Duration.ofMinutes(1);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.rsocket.core.RSocketServer$4, reason: invalid class name */
    /* loaded from: input_file:io/rsocket/core/RSocketServer$4.class */
    /**
     * Decompiler-materialised lookup table that maps {@link FrameType} ordinals to the
     * dense case numbers used by the switch in {@code accept}: SETUP is 1, RESUME is 2,
     * every other frame type stays 0.
     */
    public static /* synthetic */ class AnonymousClass4 {
        static final /* synthetic */ int[] $SwitchMap$io$rsocket$frame$FrameType = new int[FrameType.values().length];

        static {
            final int[] table = $SwitchMap$io$rsocket$frame$FrameType;
            try {
                table[FrameType.SETUP.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Deliberately ignored: the slot simply keeps its default of 0.
            }
            try {
                table[FrameType.RESUME.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Deliberately ignored: the slot simply keeps its default of 0.
            }
        }
    }

    /** Private: instances are obtained through {@link #create()} or {@link #create(SocketAcceptor)}. */
    private RSocketServer() {
    }

    /**
     * Creates a server builder with all defaults in place.
     *
     * @return a new, independently configurable {@code RSocketServer}
     */
    public static RSocketServer create() {
        final RSocketServer server = new RSocketServer();
        return server;
    }

    /**
     * Creates a server builder that already has the given acceptor installed.
     *
     * @param socketAcceptor acceptor to handle incoming connection setups; must not be {@code null}
     * @return a new {@code RSocketServer} configured with {@code socketAcceptor}
     * @throws NullPointerException if {@code socketAcceptor} is {@code null}
     */
    public static RSocketServer create(SocketAcceptor socketAcceptor) {
        return new RSocketServer().acceptor(socketAcceptor);
    }

    /**
     * Replaces the acceptor that handles each connection setup.
     *
     * @param socketAcceptor the acceptor to use; must not be {@code null}
     * @return this server, for call chaining
     * @throws NullPointerException if {@code socketAcceptor} is {@code null}
     */
    public RSocketServer acceptor(SocketAcceptor socketAcceptor) {
        this.acceptor = Objects.requireNonNull(socketAcceptor);
        return this;
    }

    /**
     * Lets the caller register interceptors on this server's registry.
     *
     * <p>The consumer is invoked immediately with the server's own registry instance.
     *
     * @param consumer callback that configures the registry; must not be {@code null}
     * @return this server, for call chaining
     * @throws NullPointerException if {@code consumer} is {@code null}
     */
    public RSocketServer interceptors(Consumer<InterceptorRegistry> consumer) {
        // Fail fast with the same explicit null check the other setters use.
        Objects.requireNonNull(consumer);
        consumer.accept(this.interceptors);
        return this;
    }

    /**
     * Sets the resume configuration used when the server setup is created.
     *
     * <p>No null check is performed here; while the stored value is {@code null},
     * {@code serverSetup} falls back to the non-resumable default setup.
     *
     * @param resume the resume configuration to store
     * @return this server, for call chaining
     */
    public RSocketServer resume(Resume resume) {
        this.resume = resume;
        return this;
    }

    /**
     * Stores the lease configurer, which enables lease handling for accepted connections.
     *
     * <p>The configurer is not invoked here; {@code acceptSetup} calls it with a fresh
     * {@code LeaseSpec} for every accepted SETUP. A {@code null} value leaves lease disabled.
     *
     * @param consumer callback that customises the per-connection {@code LeaseSpec}
     * @return this server, for call chaining
     */
    public RSocketServer lease(Consumer<LeaseSpec> consumer) {
        this.leaseConfigurer = consumer;
        return this;
    }

    /**
     * Sets the maximum inbound payload size after validating it.
     *
     * @param i the requested limit; checked by {@code ReassemblyUtils.assertInboundPayloadSize}
     * @return this server, for call chaining
     */
    public RSocketServer maxInboundPayloadSize(int i) {
        final int validated = ReassemblyUtils.assertInboundPayloadSize(i);
        this.maxInboundPayloadSize = validated;
        return this;
    }

    /**
     * Sets the duration handed to the server setup as its timeout.
     *
     * @param duration the timeout; must be strictly positive
     * @return this server, for call chaining
     * @throws IllegalArgumentException if {@code duration} is zero or negative
     * @throws NullPointerException if {@code duration} is {@code null}
     */
    public RSocketServer maxTimeToFirstFrame(Duration duration) {
        final boolean positive = duration.compareTo(Duration.ZERO) > 0;
        if (!positive) {
            throw new IllegalArgumentException("Setup Handling Timeout should be greater than zero");
        }
        this.timeout = duration;
        return this;
    }

    /**
     * Sets the MTU value after validating it.
     *
     * @param i the requested MTU; checked by {@code FragmentationUtils.assertMtu}
     * @return this server, for call chaining
     */
    public RSocketServer fragment(int i) {
        final int validatedMtu = FragmentationUtils.assertMtu(i);
        this.mtu = validatedMtu;
        return this;
    }

    /**
     * Replaces the payload decoder used by requesters and responders of accepted connections.
     *
     * @param payloadDecoder the decoder to use; must not be {@code null}
     * @return this server, for call chaining
     * @throws NullPointerException if {@code payloadDecoder} is {@code null}
     */
    public RSocketServer payloadDecoder(PayloadDecoder payloadDecoder) {
        this.payloadDecoder = Objects.requireNonNull(payloadDecoder);
        return this;
    }

    /**
     * Returns a deferred {@link Mono} that starts the given transport with this server's
     * connection handling.
     *
     * <p>The {@code ServerSetup} is created once, when this method is called (using the
     * timeout configured at that moment), and is shared by every subscription to the
     * returned {@code Mono}. Setup validation and {@code serverTransport.start} run on each
     * subscription. Once a started server's {@code onClose()} terminates, the shared
     * {@code ServerSetup} is disposed.
     *
     * @param serverTransport the transport to start
     * @param <T> the closeable handle type produced by the transport
     * @return a {@code Mono} emitting the started server handle
     */
    public <T extends Closeable> Mono<T> bind(final ServerTransport<T> serverTransport) {
        return Mono.defer(new Supplier<Mono<T>>() { // from class: io.rsocket.core.RSocketServer.2
            // Created together with this supplier, i.e. when bind() is invoked.
            final ServerSetup serverSetup = RSocketServer.this.serverSetup(RSocketServer.this.timeout);

            @Override // java.util.function.Supplier
            public Mono<T> get() {
                final int maxFrameLength = serverTransport.maxFrameLength();
                PayloadValidationUtils.assertValidateSetup(
                        maxFrameLength, RSocketServer.this.maxInboundPayloadSize, RSocketServer.this.mtu);
                return serverTransport
                        .start(duplexConnection ->
                                RSocketServer.this.acceptor(serverSetup, duplexConnection, maxFrameLength))
                        .doOnNext(closeable ->
                                // Tie the lifetime of the setup to the started server.
                                closeable.onClose()
                                        .doFinally(signalType -> serverSetup.dispose())
                                        .subscribe());
            }
        });
    }

    /**
     * Blocking variant of {@link #bind(ServerTransport)}: subscribes and waits for the
     * started server handle.
     *
     * @param serverTransport the transport to start
     * @param <T> the closeable handle type produced by the transport
     * @return the value emitted by {@code bind(serverTransport)}
     */
    public <T extends Closeable> T bindNow(ServerTransport<T> serverTransport) {
        final Mono<T> started = bind(serverTransport);
        return started.block();
    }

    /**
     * Exposes this server as a {@code ConnectionAcceptor} using a maximum frame length of
     * {@code 0xFFFFFF} (16777215).
     *
     * @return a connection acceptor backed by this server's configuration
     */
    public ServerTransport.ConnectionAcceptor asConnectionAcceptor() {
        final int defaultMaxFrameLength = 0xFFFFFF;
        return asConnectionAcceptor(defaultMaxFrameLength);
    }

    /**
     * Exposes this server as a {@code ConnectionAcceptor} for transports that are started
     * outside of {@link #bind(ServerTransport)}.
     *
     * <p>The setup is validated immediately against the current payload-size and MTU
     * settings, and a single {@code ServerSetup} is created for the returned acceptor.
     *
     * @param i the maximum frame length to validate against and pass on for each connection
     * @return a connection acceptor backed by this server's configuration
     */
    public ServerTransport.ConnectionAcceptor asConnectionAcceptor(final int i) {
        PayloadValidationUtils.assertValidateSetup(i, this.maxInboundPayloadSize, this.mtu);
        return new ServerTransport.ConnectionAcceptor() { // from class: io.rsocket.core.RSocketServer.3
            // One setup shared by every connection routed through this acceptor.
            private final ServerSetup serverSetup = RSocketServer.this.serverSetup(RSocketServer.this.timeout);

            @Override // io.rsocket.transport.ServerTransport.ConnectionAcceptor, java.util.function.Function
            public Mono<Void> apply(DuplexConnection duplexConnection) {
                return RSocketServer.this.acceptor(serverSetup, duplexConnection, i);
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Handles one incoming connection: decorates it, lets the {@code ServerSetup} initialise
     * it, and dispatches the first frame it yields.
     *
     * @param serverSetup the setup strategy shared by the owning server binding
     * @param duplexConnection the raw connection delivered by the transport
     * @param i the maximum frame length to pass on
     * @return the {@code Mono} produced by {@code accept} for the first frame
     */
    public Mono<Void> acceptor(ServerSetup serverSetup, DuplexConnection duplexConnection, int i) {
        // Connection interceptors of type SOURCE are applied first, then optional logging.
        final DuplexConnection intercepted =
                this.interceptors.initConnection(DuplexConnectionInterceptor.Type.SOURCE, duplexConnection);
        final DuplexConnection source = LoggingDuplexConnection.wrapIfEnabled(intercepted);
        return serverSetup
                .init(source)
                .flatMap(tuple2 ->
                        accept(serverSetup, (ByteBuf) tuple2.getT1(), (DuplexConnection) tuple2.getT2(), i));
    }

    /**
     * Delegates a RESUME frame to the server setup.
     *
     * @param serverSetup the setup strategy that handles the resumption
     * @param byteBuf the RESUME frame
     * @param duplexConnection the connection the frame arrived on
     * @return whatever {@code serverSetup.acceptRSocketResume} returns
     */
    private Mono<Void> acceptResume(ServerSetup serverSetup, ByteBuf byteBuf, DuplexConnection duplexConnection) {
        final Mono<Void> resumed = serverSetup.acceptRSocketResume(byteBuf, duplexConnection);
        return resumed;
    }

    /**
     * Dispatches the first frame of a connection according to its frame type.
     *
     * <p>SETUP frames go to {@code acceptSetup}, RESUME frames go to {@code acceptResume}.
     * For any other frame type an {@link InvalidSetupException} is sent to the peer and the
     * connection's {@code onClose()} is returned.
     *
     * @param serverSetup the setup strategy used to accept or reject the connection
     * @param byteBuf the first frame received on the connection
     * @param duplexConnection the connection the frame arrived on
     * @param i the maximum frame length, forwarded to {@code acceptSetup}
     * @return a {@code Mono} representing the handling of the connection
     */
    private Mono<Void> accept(ServerSetup serverSetup, ByteBuf byteBuf, DuplexConnection duplexConnection, int i) {
        // Switch on the enum itself rather than on an ordinal lookup table with
        // unrelated integer constants as case labels.
        switch (FrameHeaderCodec.frameType(byteBuf)) {
            case SETUP:
                return acceptSetup(serverSetup, byteBuf, duplexConnection, i);
            case RESUME:
                return acceptResume(serverSetup, byteBuf, duplexConnection);
            default:
                serverSetup.sendError(duplexConnection, new InvalidSetupException("SETUP or RESUME frame must be received before any others"));
                return duplexConnection.onClose();
        }
    }

    /**
     * Handles a SETUP frame: validates it, builds the requester for the new connection,
     * invokes the configured {@link SocketAcceptor} and, on success, creates the responder.
     *
     * <p>The setup is rejected with an {@link InvalidSetupException} (and the connection's
     * {@code onClose()} is returned) when the protocol version is unsupported, or when the
     * frame honors lease while no lease configurer has been set on this server.
     *
     * <p>If the acceptor fails, a rejected-setup error built from the failure is sent to
     * the peer, the connection's {@code onClose()} is awaited and the original error is
     * propagated. The setup payload is released when the acceptor pipeline terminates.
     *
     * @param serverSetup the setup strategy used to accept the connection or send errors
     * @param byteBuf the SETUP frame; retained here for the lifetime of the setup payload
     * @param duplexConnection the connection the frame arrived on
     * @param i the maximum frame length passed to the requester and the responder
     * @return a {@code Mono} representing the handling of the setup
     */
    private Mono<Void> acceptSetup(ServerSetup serverSetup, ByteBuf byteBuf, DuplexConnection duplexConnection, int i) {
        if (!SetupFrameCodec.isSupportedVersion(byteBuf)) {
            serverSetup.sendError(duplexConnection, new InvalidSetupException("Unsupported version: " + SetupFrameCodec.humanReadableVersion(byteBuf)));
            return duplexConnection.onClose();
        }

        final boolean leaseEnabled = this.leaseConfigurer != null;
        if (SetupFrameCodec.honorLease(byteBuf) && !leaseEnabled) {
            serverSetup.sendError(duplexConnection, new InvalidSetupException("lease is not supported"));
            return duplexConnection.onClose();
        }

        return serverSetup.acceptRSocketSetup(byteBuf, duplexConnection, (keepAliveHandler, wrappedConnection) -> {
            final DefaultConnectionSetupPayload setupPayload = new DefaultConnectionSetupPayload(byteBuf.retain());
            final InitializingInterceptorRegistry registry = this.interceptors;
            final ClientServerInputMultiplexer multiplexer =
                    new ClientServerInputMultiplexer(wrappedConnection, registry, false);

            // A fresh LeaseSpec is configured per connection when lease is enabled.
            final LeaseSpec leases;
            final RequesterLeaseTracker requesterLeaseTracker;
            if (leaseEnabled) {
                leases = new LeaseSpec();
                this.leaseConfigurer.accept(leases);
                requesterLeaseTracker = new RequesterLeaseTracker(SERVER_TAG, leases.maxPendingRequests);
            } else {
                leases = null;
                requesterLeaseTracker = null;
            }

            // One sink per side; the requester additionally receives the combination of both.
            final Sinks.Empty<Void> requesterClosedSink = Sinks.unsafe().empty();
            final Sinks.Empty<Void> responderClosedSink = Sinks.unsafe().empty();

            final RSocket requester = registry.initRequester(
                    new RSocketRequester(
                            multiplexer.asServerConnection(),
                            this.payloadDecoder,
                            StreamIdSupplier.serverSupplier(),
                            this.mtu,
                            i,
                            this.maxInboundPayloadSize,
                            setupPayload.keepAliveInterval(),
                            setupPayload.keepAliveMaxLifetime(),
                            keepAliveHandler,
                            registry::initRequesterRequestInterceptor,
                            requesterLeaseTracker,
                            requesterClosedSink,
                            Mono.whenDelayError(responderClosedSink.asMono(), requesterClosedSink.asMono())));

            return registry
                    .initSocketAcceptor(this.acceptor)
                    .accept(setupPayload, requester)
                    .onErrorResume(th ->
                            // Tell the peer why it was rejected, wait for the connection to
                            // close, then surface the original failure.
                            Mono.fromRunnable(() -> serverSetup.sendError(wrappedConnection, rejectedSetupError(th)))
                                    .then(wrappedConnection.onClose())
                                    .then(Mono.error(th)))
                    .doOnNext(handler -> {
                        final RSocket wrappedHandler = registry.initResponder(handler);
                        final DuplexConnection clientConnection = multiplexer.asClientConnection();
                        final ResponderLeaseTracker responderLeaseTracker = leaseEnabled
                                ? new ResponderLeaseTracker(SERVER_TAG, clientConnection, leases.sender)
                                : null;

                        // A TrackingLeaseSender is registered as an extra request interceptor.
                        final Function<RSocket, RequestInterceptor> interceptorFactory;
                        if (leaseEnabled && (leases.sender instanceof TrackingLeaseSender)) {
                            interceptorFactory = responder ->
                                    registry.initResponderRequestInterceptor(responder, (RequestInterceptor) leases.sender);
                        } else {
                            interceptorFactory = responder ->
                                    registry.initResponderRequestInterceptor(responder, new RequestInterceptor[0]);
                        }

                        // The instance is intentionally not retained here; presumably the
                        // constructor attaches it to the connection (confirm in RSocketResponder).
                        new RSocketResponder(
                                clientConnection,
                                wrappedHandler,
                                this.payloadDecoder,
                                responderLeaseTracker,
                                this.mtu,
                                i,
                                this.maxInboundPayloadSize,
                                interceptorFactory,
                                responderClosedSink);
                    })
                    .doFinally(signalType -> setupPayload.release())
                    .then();
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Chooses the setup strategy: resumable when a resume configuration is present,
     * otherwise the default one.
     *
     * @param duration the timeout passed to the created setup
     * @return a new {@code ServerSetup}
     */
    public ServerSetup serverSetup(Duration duration) {
        if (this.resume == null) {
            return new ServerSetup.DefaultServerSetup(duration);
        }
        return createSetup(duration);
    }

    /**
     * Builds a resumable setup from the current resume configuration.
     *
     * <p>Requires {@code resume} to be non-null; {@code serverSetup} only calls this
     * method in that case.
     *
     * @param duration the timeout passed to the created setup
     * @return a new {@code ServerSetup.ResumableServerSetup} with its own {@code SessionManager}
     */
    ServerSetup createSetup(Duration duration) {
        final Resume config = this.resume;
        return new ServerSetup.ResumableServerSetup(
                duration,
                new SessionManager(),
                config.getSessionDuration(),
                config.getStreamTimeout(),
                config.getStoreFactory(SERVER_TAG),
                config.isCleanupStoreOnKeepAlive());
    }

    /**
     * Converts an acceptor failure into the error sent back to the peer.
     *
     * @param th the failure raised by the socket acceptor
     * @return a {@link RejectedSetupException} carrying the failure's message, or a generic
     *     message when the failure has none
     */
    private RSocketErrorException rejectedSetupError(Throwable th) {
        final String reason = th.getMessage();
        if (reason != null) {
            return new RejectedSetupException(reason);
        }
        return new RejectedSetupException("rejected by server acceptor");
    }
}
