package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.exceptions.Exceptions;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.frame.FrameType;
import io.rsocket.frame.RequestChannelFrameCodec;
import io.rsocket.frame.RequestFireAndForgetFrameCodec;
import io.rsocket.frame.RequestNFrameCodec;
import io.rsocket.frame.RequestResponseFrameCodec;
import io.rsocket.frame.RequestStreamFrameCodec;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.plugins.RequestInterceptor;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/rsocket/core/RSocketResponder.class */
public class RSocketResponder extends RequesterResponderSupport implements RSocket {
    /** Handler that every request method of this class delegates to; disposed in {@code doOnDispose()}. */
    private final RSocket requestHandler;
    /** Sink that is completed via {@code tryEmitEmpty()} near the end of {@code doOnDispose()}. */
    private final Sinks.Empty<Void> onThisSideClosedSink;

    /**
     * Optional lease tracker. When non-null, {@code use()} is called for each incoming request and a
     * non-null result causes the request to be rejected with that error.
     */
    @Nullable
    private final ResponderLeaseTracker leaseHandler;
    /**
     * Termination cause; {@code null} until {@code tryTerminate} succeeds. Only written through
     * {@link #TERMINATION_ERROR} with a compare-and-set from {@code null}.
     */
    private volatile Throwable terminationError;
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketResponder.class);
    /** Shared instance recorded as the termination cause when the connection's onClose completes. */
    private static final Exception CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
    /** Atomic updater for the {@code terminationError} field. */
    private static final AtomicReferenceFieldUpdater<RSocketResponder, Throwable> TERMINATION_ERROR = AtomicReferenceFieldUpdater.newUpdater(RSocketResponder.class, Throwable.class, "terminationError");

    /* renamed from: io.rsocket.core.RSocketResponder$1, reason: invalid class name */
    /* loaded from: input_file:io/rsocket/core/RSocketResponder$1.class */
    /**
     * Synthetic lookup table that maps {@code FrameType.ordinal()} to a dense case index (1..14).
     *
     * <p>The array is sized by {@code FrameType.values().length}; entries that are never assigned
     * stay {@code 0}, so a switch indexing this table sends those frame types to its
     * {@code default} branch. Each assignment is guarded separately so that a
     * {@link NoSuchFieldError} for one constant does not prevent the remaining entries from being
     * populated.
     */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$rsocket$frame$FrameType = new int[FrameType.values().length];

        static {
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.REQUEST_FNF.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.REQUEST_RESPONSE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.REQUEST_STREAM.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.REQUEST_CHANNEL.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.METADATA_PUSH.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.CANCEL.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.REQUEST_N.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.PAYLOAD.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.NEXT.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.COMPLETE.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.ERROR.ordinal()] = 11;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.NEXT_COMPLETE.ordinal()] = 12;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.SETUP.ordinal()] = 13;
            } catch (NoSuchFieldError e13) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.LEASE.ordinal()] = 14;
            } catch (NoSuchFieldError e14) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a responder bound to the given connection and immediately subscribes to it.
     *
     * <p>Two subscriptions are made before the constructor returns: one to
     * {@code duplexConnection.onClose()} that triggers termination on error or completion, and one
     * to {@code duplexConnection.receive()} that routes every inbound frame to
     * {@link #handleFrame(ByteBuf)}.
     *
     * @param duplexConnection connection frames are received from and sent to
     * @param rSocket handler that incoming requests are delegated to
     * @param payloadDecoder decoder passed to the superclass and used to turn frames into payloads
     * @param responderLeaseTracker optional lease tracker; {@code null} disables lease checks
     * @param i first numeric setting forwarded to the superclass (presumably the MTU; confirm
     *     against {@code RequesterResponderSupport})
     * @param i2 second numeric setting forwarded to the superclass (presumably the max frame
     *     length; confirm)
     * @param i3 third numeric setting forwarded to the superclass (presumably the max inbound
     *     payload size; confirm)
     * @param function factory for the request interceptor, forwarded to the superclass
     * @param empty sink completed once this responder has finished disposing
     */
    public RSocketResponder(DuplexConnection duplexConnection, RSocket rSocket, PayloadDecoder payloadDecoder, @Nullable ResponderLeaseTracker responderLeaseTracker, int i, int i2, int i3, Function<RSocket, ? extends RequestInterceptor> function, Sinks.Empty<Void> empty) {
        super(i, i2, i3, payloadDecoder, duplexConnection, null, function);
        this.requestHandler = rSocket;
        this.leaseHandler = responderLeaseTracker;
        this.onThisSideClosedSink = empty;
        // No value consumer is needed for onClose(); a plain null avoids the raw-type cast and its
        // unchecked warning.
        duplexConnection.onClose().subscribe(null, this::tryTerminateOnConnectionError, this::tryTerminateOnConnectionClose);
        duplexConnection.receive().subscribe(this::handleFrame, th -> {
            // NOTE(review): receive errors are dropped here; presumably the onClose() subscription
            // above is what drives termination for a failed connection -- confirm.
        });
    }

    /**
     * Attempts to terminate this responder, recording the given error as the termination cause.
     *
     * @param th error to record if no termination cause has been set yet
     */
    private void tryTerminateOnConnectionError(Throwable th) {
        final boolean debug = LOGGER.isDebugEnabled();
        if (debug) {
            LOGGER.debug("Try terminate connection on responder side");
        }
        tryTerminate(() -> th);
    }

    /**
     * Attempts to terminate this responder after the connection's {@code onClose()} completed,
     * recording the shared {@link #CLOSED_CHANNEL_EXCEPTION} as the termination cause.
     */
    private void tryTerminateOnConnectionClose() {
        if (LOGGER.isDebugEnabled()) {
            // Logged at debug level to match the guard above and tryTerminateOnConnectionError;
            // previously this was emitted at info level behind a debug check.
            LOGGER.debug("Try terminate connection on responder side");
        }
        tryTerminate(() -> CLOSED_CHANNEL_EXCEPTION);
    }

    /**
     * Starts termination unless a termination cause has already been recorded.
     *
     * <p>The supplier is invoked only when no cause is visible yet, and {@link #doOnDispose()} runs
     * only for the caller whose compare-and-set from {@code null} succeeds.
     *
     * @param supplier provides the termination cause
     */
    private void tryTerminate(Supplier<Throwable> supplier) {
        if (this.terminationError != null) {
            return;
        }
        final Throwable cause = supplier.get();
        final boolean won = TERMINATION_ERROR.compareAndSet(this, null, cause);
        if (won) {
            doOnDispose();
        }
    }

    /**
     * Delegates a fire-and-forget request to the request handler.
     *
     * @param payload request payload
     * @return the handler's result, or an error {@code Mono} carrying whatever the handler threw
     *     synchronously
     */
    @Override // io.rsocket.RSocket
    public Mono<Void> fireAndForget(Payload payload) {
        Mono<Void> result;
        try {
            result = this.requestHandler.fireAndForget(payload);
        } catch (Throwable failure) {
            result = Mono.error(failure);
        }
        return result;
    }

    /**
     * Delegates a request-response interaction to the request handler.
     *
     * @param payload request payload
     * @return the handler's result, or an error {@code Mono} carrying whatever the handler threw
     *     synchronously
     */
    @Override // io.rsocket.RSocket
    public Mono<Payload> requestResponse(Payload payload) {
        Mono<Payload> result;
        try {
            result = this.requestHandler.requestResponse(payload);
        } catch (Throwable failure) {
            result = Mono.error(failure);
        }
        return result;
    }

    /**
     * Delegates a request-stream interaction to the request handler.
     *
     * @param payload request payload
     * @return the handler's result, or an error {@code Flux} carrying whatever the handler threw
     *     synchronously
     */
    @Override // io.rsocket.RSocket
    public Flux<Payload> requestStream(Payload payload) {
        Flux<Payload> result;
        try {
            result = this.requestHandler.requestStream(payload);
        } catch (Throwable failure) {
            result = Flux.error(failure);
        }
        return result;
    }

    /**
     * Delegates a request-channel interaction to the request handler.
     *
     * @param publisher inbound payloads for the channel
     * @return the handler's result, or an error {@code Flux} carrying whatever the handler threw
     *     synchronously
     */
    @Override // io.rsocket.RSocket
    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        Flux<Payload> result;
        try {
            result = this.requestHandler.requestChannel(publisher);
        } catch (Throwable failure) {
            result = Flux.error(failure);
        }
        return result;
    }

    /**
     * Delegates a metadata-push to the request handler.
     *
     * @param payload pushed payload
     * @return the handler's result, or an error {@code Mono} carrying whatever the handler threw
     *     synchronously
     */
    @Override // io.rsocket.RSocket
    public Mono<Void> metadataPush(Payload payload) {
        Mono<Void> result;
        try {
            result = this.requestHandler.metadataPush(payload);
        } catch (Throwable failure) {
            result = Mono.error(failure);
        }
        return result;
    }

    /**
     * Attempts to terminate this responder with a {@link CancellationException} as the cause. Has
     * no effect when a termination cause is already recorded.
     */
    @Override // io.rsocket.RSocket
    public void dispose() {
        final Supplier<Throwable> disposedCause = () -> new CancellationException("Disposed");
        tryTerminate(disposedCause);
    }

    /**
     * Reports whether the underlying duplex connection is disposed. The locally recorded
     * termination cause is not consulted.
     */
    @Override // io.rsocket.RSocket
    public boolean isDisposed() {
        final boolean connectionDisposed = getDuplexConnection().isDisposed();
        return connectionDisposed;
    }

    /** Returns the close signal of the underlying duplex connection. */
    @Override // io.rsocket.RSocket, io.rsocket.Closeable
    public Mono<Void> onClose() {
        final Mono<Void> connectionClosed = getDuplexConnection().onClose();
        return connectionClosed;
    }

    /**
     * Releases everything owned by this responder, in a fixed order: cancels a snapshot of the
     * active stream handlers, disposes the duplex connection, disposes the request interceptor and
     * lease tracker when present, disposes the request handler, and finally completes
     * {@code onThisSideClosedSink}.
     *
     * <p>Called from {@code tryTerminate} only by the caller that wins the termination
     * compare-and-set.
     */
    final void doOnDispose() {
        // Parameterized logging: the connection is rendered only when debug is enabled.
        LOGGER.debug("closing responder {}", getDuplexConnection());

        cleanUpSendingSubscriptions();
        getDuplexConnection().dispose();

        final RequestInterceptor interceptor = getRequestInterceptor();
        if (interceptor != null) {
            interceptor.dispose();
        }

        final ResponderLeaseTracker tracker = this.leaseHandler;
        if (tracker != null) {
            tracker.dispose();
        }

        this.requestHandler.dispose();
        this.onThisSideClosedSink.tryEmitEmpty();

        LOGGER.debug("responder closed {}", getDuplexConnection());
    }

    /**
     * Cancels every currently registered stream handler.
     *
     * <p>The handlers are copied while holding this object's monitor, and
     * {@code handleCancel()} is then invoked on the copy outside the synchronized block.
     */
    private void cleanUpSendingSubscriptions() {
        final ArrayList<FrameHandler> snapshot;
        synchronized (this) {
            // Diamond instead of a raw ArrayList: no unchecked conversion on assignment.
            snapshot = new ArrayList<>(this.activeStreams.values());
        }
        for (FrameHandler handler : snapshot) {
            if (handler != null) {
                handler.handleCancel();
            }
        }
    }

    /**
     * Routes one inbound frame according to its frame type.
     *
     * <ul>
     *   <li>Request frames (FNF, response, stream, channel) go to the matching {@code handle*}
     *       method; METADATA_PUSH is decoded and delegated to {@link #metadataPush(Payload)}.
     *   <li>CANCEL, REQUEST_N, NEXT, COMPLETE, ERROR and NEXT_COMPLETE are forwarded to the handler
     *       registered for the stream id, and ignored when none is registered.
     *   <li>PAYLOAD is ignored.
     *   <li>SETUP, LEASE and every other type are answered with an error frame on the frame's
     *       stream id.
     * </ul>
     *
     * <p>Any {@link Throwable} raised while handling is logged, reported to the peer as a
     * {@link ConnectionErrorException} on stream 0, and then used as the termination cause.
     *
     * @param byteBuf the raw inbound frame
     */
    final void handleFrame(ByteBuf byteBuf) {
        try {
            final int streamId = FrameHeaderCodec.streamId(byteBuf);
            final FrameType frameType = FrameHeaderCodec.frameType(byteBuf);
            // Switch on the enum itself rather than on a synthetic ordinal table with unrelated
            // constant names as case labels.
            switch (frameType) {
                case REQUEST_FNF:
                    handleFireAndForget(streamId, byteBuf);
                    break;
                case REQUEST_RESPONSE:
                    handleRequestResponse(streamId, byteBuf);
                    break;
                case REQUEST_STREAM:
                    handleStream(streamId, byteBuf, RequestStreamFrameCodec.initialRequestN(byteBuf));
                    break;
                case REQUEST_CHANNEL:
                    handleChannel(streamId, byteBuf, RequestChannelFrameCodec.initialRequestN(byteBuf), FrameHeaderCodec.hasComplete(byteBuf));
                    break;
                case METADATA_PUSH:
                    handleMetadataPush(metadataPush(super.getPayloadDecoder().apply(byteBuf)));
                    break;
                case CANCEL: {
                    final FrameHandler handler = super.get(streamId);
                    if (handler != null) {
                        handler.handleCancel();
                    }
                    break;
                }
                case REQUEST_N: {
                    final FrameHandler handler = super.get(streamId);
                    if (handler != null) {
                        handler.handleRequestN(RequestNFrameCodec.requestN(byteBuf));
                    }
                    break;
                }
                case PAYLOAD:
                    // Intentionally ignored by the responder.
                    break;
                case NEXT: {
                    final FrameHandler handler = super.get(streamId);
                    if (handler != null) {
                        handler.handleNext(byteBuf, FrameHeaderCodec.hasFollows(byteBuf), false);
                    }
                    break;
                }
                case COMPLETE: {
                    final FrameHandler handler = super.get(streamId);
                    if (handler != null) {
                        handler.handleComplete();
                    }
                    break;
                }
                case ERROR: {
                    final FrameHandler handler = super.get(streamId);
                    if (handler != null) {
                        handler.handleError(Exceptions.from(streamId, byteBuf));
                    }
                    break;
                }
                case NEXT_COMPLETE: {
                    final FrameHandler handler = super.get(streamId);
                    if (handler != null) {
                        handler.handleNext(byteBuf, false, true);
                    }
                    break;
                }
                case SETUP:
                    getDuplexConnection().sendFrame(streamId, ErrorFrameCodec.encode(super.getAllocator(), streamId, new IllegalStateException("Setup frame received post setup.")));
                    break;
                case LEASE:
                default:
                    getDuplexConnection().sendFrame(streamId, ErrorFrameCodec.encode(super.getAllocator(), streamId, new IllegalStateException("ServerRSocket: Unexpected frame type: " + frameType)));
                    break;
            }
        } catch (Throwable th) {
            LOGGER.error("Unexpected error during frame handling", th);
            getDuplexConnection().sendFrame(0, ErrorFrameCodec.encode(super.getAllocator(), 0, new ConnectionErrorException("Unexpected error during frame handling", th)));
            tryTerminateOnConnectionError(th);
        }
    }

    /**
     * Handles an inbound REQUEST_FNF frame.
     *
     * <p>If a lease tracker is present and {@code use()} returns an error, the request is dropped;
     * the interceptor (if any) is told via {@code onReject} and nothing is sent back. Otherwise the
     * interceptor's {@code onStart} is called and then either a subscriber holding the first frame
     * is registered (follows flag set) or the frame is decoded and passed to
     * {@link #fireAndForget(Payload)} right away.
     *
     * @param i stream id of the request
     * @param byteBuf the REQUEST_FNF frame
     */
    final void handleFireAndForget(int i, ByteBuf byteBuf) {
        final ResponderLeaseTracker tracker = this.leaseHandler;
        final Throwable leaseError = tracker == null ? null : tracker.use();
        if (leaseError != null) {
            final RequestInterceptor rejectInterceptor = getRequestInterceptor();
            if (rejectInterceptor != null) {
                rejectInterceptor.onReject(leaseError, FrameType.REQUEST_FNF, RequestFireAndForgetFrameCodec.metadata(byteBuf));
            }
            return;
        }

        if (!FrameHeaderCodec.hasFollows(byteBuf)) {
            final RequestInterceptor interceptor = getRequestInterceptor();
            if (interceptor != null) {
                interceptor.onStart(i, FrameType.REQUEST_FNF, RequestFireAndForgetFrameCodec.metadata(byteBuf));
                fireAndForget(super.getPayloadDecoder().apply(byteBuf)).subscribe(new FireAndForgetResponderSubscriber(i, this));
            } else {
                // Without an interceptor the shared subscriber instance is used.
                fireAndForget(super.getPayloadDecoder().apply(byteBuf)).subscribe(FireAndForgetResponderSubscriber.INSTANCE);
            }
            return;
        }

        final RequestInterceptor interceptor = getRequestInterceptor();
        if (interceptor != null) {
            interceptor.onStart(i, FrameType.REQUEST_FNF, RequestFireAndForgetFrameCodec.metadata(byteBuf));
        }
        add(i, new FireAndForgetResponderSubscriber(i, byteBuf, this, this));
    }

    /**
     * Handles an inbound REQUEST_RESPONSE frame.
     *
     * <p>If a lease tracker is present and {@code use()} returns an error, the interceptor (if any)
     * is told via {@code onReject} and an error frame is sent on the stream. Otherwise
     * {@code onStart} is reported and a subscriber is registered: with the follows flag set it is
     * created from the first frame and only registered; without it the frame is decoded and
     * {@link #requestResponse(Payload)} is subscribed, provided registration succeeded.
     *
     * @param i stream id of the request
     * @param byteBuf the REQUEST_RESPONSE frame
     */
    final void handleRequestResponse(int i, ByteBuf byteBuf) {
        final ResponderLeaseTracker tracker = this.leaseHandler;
        final Throwable leaseError = tracker == null ? null : tracker.use();
        if (leaseError != null) {
            final RequestInterceptor rejectInterceptor = getRequestInterceptor();
            if (rejectInterceptor != null) {
                rejectInterceptor.onReject(leaseError, FrameType.REQUEST_RESPONSE, RequestResponseFrameCodec.metadata(byteBuf));
            }
            sendLeaseRejection(i, leaseError);
            return;
        }

        final RequestInterceptor interceptor = getRequestInterceptor();
        if (interceptor != null) {
            interceptor.onStart(i, FrameType.REQUEST_RESPONSE, RequestResponseFrameCodec.metadata(byteBuf));
        }

        if (!FrameHeaderCodec.hasFollows(byteBuf)) {
            final RequestResponseResponderSubscriber subscriber = new RequestResponseResponderSubscriber(i, this);
            if (add(i, subscriber)) {
                requestResponse(super.getPayloadDecoder().apply(byteBuf)).subscribe(subscriber);
            }
            return;
        }
        add(i, new RequestResponseResponderSubscriber(i, byteBuf, this, this));
    }

    /**
     * Handles an inbound REQUEST_STREAM frame.
     *
     * <p>If a lease tracker is present and {@code use()} returns an error, the interceptor (if any)
     * is told via {@code onReject} and an error frame is sent on the stream. Otherwise
     * {@code onStart} is reported and a subscriber is registered: with the follows flag set it is
     * created from the first frame and only registered; without it the frame is decoded and
     * {@link #requestStream(Payload)} is subscribed, provided registration succeeded.
     *
     * @param i stream id of the request
     * @param byteBuf the REQUEST_STREAM frame
     * @param j initial request-n value handed to the subscriber
     */
    final void handleStream(int i, ByteBuf byteBuf, long j) {
        final ResponderLeaseTracker tracker = this.leaseHandler;
        final Throwable leaseError = tracker == null ? null : tracker.use();
        if (leaseError != null) {
            final RequestInterceptor rejectInterceptor = getRequestInterceptor();
            if (rejectInterceptor != null) {
                rejectInterceptor.onReject(leaseError, FrameType.REQUEST_STREAM, RequestStreamFrameCodec.metadata(byteBuf));
            }
            sendLeaseRejection(i, leaseError);
            return;
        }

        final RequestInterceptor interceptor = getRequestInterceptor();
        if (interceptor != null) {
            interceptor.onStart(i, FrameType.REQUEST_STREAM, RequestStreamFrameCodec.metadata(byteBuf));
        }

        if (!FrameHeaderCodec.hasFollows(byteBuf)) {
            final RequestStreamResponderSubscriber subscriber = new RequestStreamResponderSubscriber(i, j, this);
            if (add(i, subscriber)) {
                requestStream(super.getPayloadDecoder().apply(byteBuf)).subscribe(subscriber);
            }
            return;
        }
        add(i, new RequestStreamResponderSubscriber(i, j, byteBuf, this, this));
    }

    /**
     * Handles an inbound REQUEST_CHANNEL frame.
     *
     * <p>If a lease tracker is present and {@code use()} returns an error, the interceptor (if any)
     * is told via {@code onReject} and an error frame is sent on the stream. Otherwise
     * {@code onStart} is reported and a subscriber is registered: with the follows flag set it is
     * created from the first frame and only registered; without it the subscriber is created from
     * the decoded payload, passed to {@link #requestChannel(Publisher)} as the inbound publisher,
     * and subscribed to the result, provided registration succeeded.
     *
     * @param i stream id of the request
     * @param byteBuf the REQUEST_CHANNEL frame
     * @param j initial request-n value handed to the subscriber
     * @param z when {@code true}, {@code handleComplete()} is invoked on the subscriber right after
     *     subscribing (non-fragmented path only)
     */
    final void handleChannel(int i, ByteBuf byteBuf, long j, boolean z) {
        final ResponderLeaseTracker tracker = this.leaseHandler;
        final Throwable leaseError = tracker == null ? null : tracker.use();
        if (leaseError != null) {
            final RequestInterceptor rejectInterceptor = getRequestInterceptor();
            if (rejectInterceptor != null) {
                rejectInterceptor.onReject(leaseError, FrameType.REQUEST_CHANNEL, RequestChannelFrameCodec.metadata(byteBuf));
            }
            sendLeaseRejection(i, leaseError);
            return;
        }

        final RequestInterceptor interceptor = getRequestInterceptor();
        if (interceptor != null) {
            interceptor.onStart(i, FrameType.REQUEST_CHANNEL, RequestChannelFrameCodec.metadata(byteBuf));
        }

        if (FrameHeaderCodec.hasFollows(byteBuf)) {
            add(i, new RequestChannelResponderSubscriber(i, j, byteBuf, this, this));
            return;
        }

        final RequestChannelResponderSubscriber subscriber = new RequestChannelResponderSubscriber(i, j, super.getPayloadDecoder().apply(byteBuf), this);
        if (!add(i, subscriber)) {
            return;
        }
        requestChannel(subscriber).subscribe(subscriber);
        if (z) {
            subscriber.handleComplete();
        }
    }

    /**
     * Sends an error frame for a request that was rejected by the lease tracker.
     *
     * @param i stream id the error frame is addressed to
     * @param th the lease error to encode into the frame
     */
    private void sendLeaseRejection(int i, Throwable th) {
        final DuplexConnection connection = getDuplexConnection();
        final ByteBuf errorFrame = ErrorFrameCodec.encode(getAllocator(), i, th);
        connection.sendFrame(i, errorFrame);
    }

    /**
     * Subscribes the shared {@code MetadataPushResponderSubscriber} instance to the result of a
     * metadata-push call.
     *
     * @param mono result of {@link #metadataPush(Payload)}
     */
    private void handleMetadataPush(Mono<Void> mono) {
        final MetadataPushResponderSubscriber sharedSubscriber = MetadataPushResponderSubscriber.INSTANCE;
        mono.subscribe(sharedSubscriber);
    }

    /**
     * Registers a handler for the given stream id through the superclass and cancels the handler
     * when registration is refused.
     *
     * @param i stream id
     * @param frameHandler handler to register
     * @return {@code true} if the superclass accepted the handler, {@code false} if it was refused
     *     and therefore cancelled
     */
    @Override // io.rsocket.core.RequesterResponderSupport
    public boolean add(int i, FrameHandler frameHandler) {
        final boolean registered = super.add(i, frameHandler);
        if (!registered) {
            frameHandler.handleCancel();
        }
        return registered;
    }
}
