package org.elasticsearch.transport;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;

/* loaded from: input_file:WEB-INF/lib/elasticsearch-7.15.2.jar:org/elasticsearch/transport/InboundHandler.class */
/**
 * Dispatches messages received on a {@link TcpChannel}.
 *
 * <p>Pings are forwarded to the {@link TransportKeepAlive}; every other message is treated as either a
 * request (routed to the {@link TransportHandshaker} or to a handler looked up in
 * {@link Transport.RequestHandlers}) or a response (routed to the handler registered for its request id).
 * Handling that takes longer than the configured slow-log threshold is reported at WARN level.
 */
public class InboundHandler {
    private static final Logger logger = LogManager.getLogger(InboundHandler.class);

    /** Stream handed to response handlers when the message carries no content. */
    private static final StreamInput EMPTY_STREAM_INPUT = new ByteBufferStreamInput(ByteBuffer.wrap(BytesRef.EMPTY_BYTES));

    private final ThreadPool threadPool;
    private final OutboundHandler outboundHandler;
    private final NamedWriteableRegistry namedWriteableRegistry;
    private final TransportHandshaker handshaker;
    private final TransportKeepAlive keepAlive;
    private final Transport.ResponseHandlers responseHandlers;
    private final Transport.RequestHandlers requestHandlers;
    private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;
    // Long.MAX_VALUE means no handling time can exceed the threshold until setSlowLogThreshold is called.
    private volatile long slowLogThresholdMs = Long.MAX_VALUE;

    /**
     * Creates a handler wired to the given collaborators.
     *
     * @param threadPool             supplies the thread context, relative clock and executors
     * @param outboundHandler        passed to every transport channel created for a request
     * @param namedWriteableRegistry registry used to wrap inbound streams
     * @param transportHandshaker    handles handshake requests and owns handshake response handlers
     * @param transportKeepAlive     receives inbound pings
     * @param requestHandlers        lookup of request handlers by action name
     * @param responseHandlers       lookup of response handlers by request id
     */
    public InboundHandler(ThreadPool threadPool, OutboundHandler outboundHandler, NamedWriteableRegistry namedWriteableRegistry, TransportHandshaker transportHandshaker, TransportKeepAlive transportKeepAlive, Transport.RequestHandlers requestHandlers, Transport.ResponseHandlers responseHandlers) {
        this.threadPool = threadPool;
        this.outboundHandler = outboundHandler;
        this.namedWriteableRegistry = namedWriteableRegistry;
        this.handshaker = transportHandshaker;
        this.keepAlive = transportKeepAlive;
        this.requestHandlers = requestHandlers;
        this.responseHandlers = responseHandlers;
    }

    /**
     * Installs the message listener.
     *
     * @param transportMessageListener the listener to notify
     * @throws IllegalStateException if the current listener is no longer {@link TransportMessageListener#NOOP_LISTENER}
     */
    public void setMessageListener(TransportMessageListener transportMessageListener) {
        if (this.messageListener != TransportMessageListener.NOOP_LISTENER) {
            throw new IllegalStateException("Cannot set message listener twice");
        }
        this.messageListener = transportMessageListener;
    }

    /**
     * Sets the handling time above which an inbound message is logged as slow.
     *
     * @param timeValue the new threshold; stored in milliseconds
     */
    public void setSlowLogThreshold(TimeValue timeValue) {
        this.slowLogThresholdMs = timeValue.getMillis();
    }

    /**
     * Entry point for a message read from {@code tcpChannel}: marks the channel as accessed, logs the
     * message, then either acknowledges a ping or dispatches the message.
     *
     * @param tcpChannel     the channel the message arrived on
     * @param inboundMessage the received message
     * @throws Exception if dispatching the message fails
     */
    public void inboundMessage(TcpChannel tcpChannel, InboundMessage inboundMessage) throws Exception {
        final long startTimeMillis = this.threadPool.relativeTimeInMillis();
        tcpChannel.getChannelStats().markAccessed(startTimeMillis);
        TransportLogger.logInboundMessage(tcpChannel, inboundMessage);
        if (inboundMessage.isPing()) {
            this.keepAlive.receiveKeepAlive(tcpChannel);
        } else {
            messageReceived(tcpChannel, inboundMessage, startTimeMillis);
        }
    }

    /**
     * Dispatches a non-ping message as a request or a response, with the thread context stashed and
     * replaced by the message headers for the duration of the call.
     *
     * @param channel         the channel the message arrived on
     * @param message         the received message
     * @param startTimeMillis relative time at which handling started, used for the slow log
     * @throws IOException           if reading the message stream fails
     * @throws IllegalStateException if a response stream still has bytes left after it was handled
     */
    private void messageReceived(TcpChannel channel, InboundMessage message, long startTimeMillis) throws IOException {
        final InetSocketAddress remoteAddress = channel.getRemoteAddress();
        final Header header = message.getHeader();
        assert header.needsToReadVariableHeader() == false;

        final ThreadContext threadContext = this.threadPool.getThreadContext();
        // Closing the stored context at the end of the try block undoes the stash.
        try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
            threadContext.setHeaders(header.getHeaders());
            threadContext.putTransient("_remote_address", remoteAddress);
            if (header.isRequest()) {
                handleRequest(channel, header, message);
            } else {
                assert message.isShortCircuit() == false;
                final long requestId = header.getRequestId();
                final TransportResponseHandler<? extends TransportResponse> handler = findResponseHandler(header, requestId);
                // No handler means there is nobody to deliver the response to; it is dropped here.
                if (handler != null) {
                    if (message.getContentLength() > 0 || header.getVersion().equals(Version.CURRENT) == false) {
                        final StreamInput stream = namedWriteableStream(message.openOrGetStreamInput());
                        assertRemoteVersion(stream, header.getVersion());
                        if (header.isError()) {
                            handlerResponseError(stream, handler);
                        } else {
                            handleResponse(remoteAddress, stream, handler);
                        }
                        // One extra read verifies that the handler consumed the whole message.
                        if (stream.read() != -1) {
                            throw new IllegalStateException("Message not fully read (response) for requestId [" + requestId + "], handler [" + handler + "], error [" + header.isError() + "]; resetting");
                        }
                    } else {
                        assert header.isError() == false;
                        handleResponse(remoteAddress, EMPTY_STREAM_INPUT, handler);
                    }
                }
            }
        } finally {
            final long tookMillis = this.threadPool.relativeTimeInMillis() - startTimeMillis;
            // Read the volatile threshold once so the comparison and the log line agree.
            final long thresholdMillis = this.slowLogThresholdMs;
            if (thresholdMillis > 0 && tookMillis > thresholdMillis) {
                logger.warn("handling inbound transport message [{}] took [{}ms] which is above the warn threshold of [{}ms]", message, tookMillis, thresholdMillis);
            }
        }
    }

    /**
     * Looks up the handler for a response: handshake responses come from the handshaker, everything else
     * from the response-handler registry, falling back to the handshaker for error responses that the
     * registry does not know.
     */
    private TransportResponseHandler<? extends TransportResponse> findResponseHandler(Header header, long requestId) {
        if (header.isHandshake()) {
            return this.handshaker.removeHandlerForHandshake(requestId);
        }
        final TransportResponseHandler<? extends TransportResponse> registered = this.responseHandlers.onResponseReceived(requestId, this.messageListener);
        if (registered == null && header.isError()) {
            return this.handshaker.removeHandlerForHandshake(requestId);
        }
        return registered;
    }

    /** Builds the channel on which the reply to the request described by {@code header} is sent. */
    private TransportChannel newTransportChannel(TcpChannel channel, Header header, InboundMessage message) {
        return new TcpTransportChannel(this.outboundHandler, channel, header.getActionName(), header.getRequestId(), header.getVersion(), header.getFeatures(), header.getCompressionScheme(), header.isHandshake(), message.takeBreakerReleaseControl());
    }

    /**
     * Handles an inbound request. Handshakes go to the handshaker; other requests are deserialized and
     * run either inline (executor {@code SAME}) or on the executor named by the registered handler.
     * Failures are reported back on the transport channel where possible.
     *
     * @param channel the channel the request arrived on
     * @param header  the request header
     * @param message the received message
     * @throws IOException if opening the message stream for a handshake fails
     */
    private <T extends TransportRequest> void handleRequest(TcpChannel channel, Header header, InboundMessage message) throws IOException {
        final String action = header.getActionName();
        final long requestId = header.getRequestId();
        if (header.isHandshake()) {
            this.messageListener.onRequestReceived(requestId, action);
            assert message.isShortCircuit() == false;
            final StreamInput stream = namedWriteableStream(message.openOrGetStreamInput());
            assertRemoteVersion(stream, header.getVersion());
            final TransportChannel transportChannel = newTransportChannel(channel, header, message);
            try {
                this.handshaker.handleHandshake(transportChannel, requestId, stream);
            } catch (Exception e) {
                if (Version.CURRENT.isCompatible(header.getVersion())) {
                    sendErrorResponse(action, transportChannel, e);
                } else {
                    // An error response is only attempted for a compatible wire version; otherwise log and close.
                    logger.warn(new ParameterizedMessage("could not send error response to handshake received on [{}] using wire format version [{}], closing channel", channel, header.getVersion()), e);
                    channel.close();
                }
            }
            return;
        }

        final TransportChannel transportChannel = newTransportChannel(channel, header, message);
        try {
            this.messageListener.onRequestReceived(requestId, action);
            if (message.isShortCircuit()) {
                sendErrorResponse(action, transportChannel, message.getException());
                return;
            }
            final StreamInput stream = namedWriteableStream(message.openOrGetStreamInput());
            assertRemoteVersion(stream, header.getVersion());
            final RequestHandlerRegistry<T> registry = this.requestHandlers.getHandler(action);
            assert registry != null;

            final T request = registry.newRequest(stream);
            try {
                request.remoteAddress(new TransportAddress(channel.getRemoteAddress()));
                // One extra read verifies that the request consumed the whole message.
                if (stream.read() != -1) {
                    throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action [" + action + "], available [" + stream.available() + "]; resetting");
                }
                final String executor = registry.getExecutor();
                if (ThreadPool.Names.SAME.equals(executor)) {
                    try {
                        registry.processMessageReceived(request, transportChannel);
                    } catch (Exception e) {
                        sendErrorResponse(registry.getAction(), transportChannel, e);
                    }
                } else {
                    boolean dispatched = false;
                    // Extra reference held on behalf of the runnable; it is released in onAfter().
                    request.incRef();
                    try {
                        this.threadPool.executor(executor).execute(new AbstractRunnable() {
                            @Override
                            protected void doRun() throws Exception {
                                registry.processMessageReceived(request, transportChannel);
                            }

                            @Override
                            public boolean isForceExecution() {
                                return registry.isForceExecution();
                            }

                            @Override
                            public void onFailure(Exception e) {
                                sendErrorResponse(registry.getAction(), transportChannel, e);
                            }

                            @Override
                            public void onAfter() {
                                request.decRef();
                            }
                        });
                        dispatched = true;
                    } finally {
                        if (dispatched == false) {
                            // execute() threw, so the runnable will never release its reference.
                            request.decRef();
                        }
                    }
                }
            } finally {
                // Releases this method's own reference to the request.
                request.decRef();
            }
        } catch (Exception e) {
            sendErrorResponse(action, transportChannel, e);
        }
    }

    /**
     * Sends {@code error} back on {@code transportChannel}. If sending fails, the failure is logged at
     * WARN level with the original error attached as suppressed; nothing is rethrown.
     *
     * @param actionName       action name used in the log message
     * @param transportChannel channel to respond on
     * @param error            the error to send
     */
    public static void sendErrorResponse(String actionName, TransportChannel transportChannel, Exception error) {
        try {
            transportChannel.sendResponse(error);
        } catch (Exception sendFailure) {
            sendFailure.addSuppressed(error);
            logger.warn(() -> new ParameterizedMessage("Failed to send error message back to client for action [{}]", actionName), sendFailure);
        }
    }

    /**
     * Deserializes a response and passes it to {@code handler}, inline for executor {@code SAME} or on
     * the handler's executor otherwise.
     *
     * <p>Only deserialization failures are reported as a {@link TransportSerializationException};
     * failures while dispatching propagate to the caller.
     *
     * @param remoteAddress address the response came from
     * @param stream        stream to read the response from
     * @param handler       handler registered for the response
     */
    private <T extends TransportResponse> void handleResponse(InetSocketAddress remoteAddress, StreamInput stream, TransportResponseHandler<T> handler) {
        final T response;
        try {
            response = handler.read(stream);
            response.remoteAddress(new TransportAddress(remoteAddress));
        } catch (Exception e) {
            final TransportSerializationException serializationException = new TransportSerializationException("Failed to deserialize response from handler [" + handler + "]", e);
            logger.warn(new ParameterizedMessage("Failed to deserialize response from [{}]", remoteAddress), serializationException);
            handleException(handler, serializationException);
            return;
        }

        final String executor = handler.executor();
        if (ThreadPool.Names.SAME.equals(executor)) {
            doHandleResponse(handler, response);
            return;
        }
        boolean dispatched = false;
        try {
            this.threadPool.executor(executor).execute(() -> doHandleResponse(handler, response));
            dispatched = true;
        } finally {
            if (dispatched == false) {
                // execute() threw, so doHandleResponse will never run to release the response.
                response.decRef();
            }
        }
    }

    /**
     * Invokes the handler with the response and releases the response exactly once afterwards. An
     * exception thrown by the handler is forwarded to it as a {@link ResponseHandlerFailureTransportException}.
     */
    private <T extends TransportResponse> void doHandleResponse(TransportResponseHandler<T> handler, T response) {
        try {
            handler.handleResponse(response);
        } catch (Exception e) {
            handleException(handler, new ResponseHandlerFailureTransportException(e));
        } finally {
            response.decRef();
        }
    }

    /**
     * Reads the exception carried by an error response and forwards it to {@code handler}. If the
     * exception cannot be read, a {@link TransportSerializationException} is forwarded instead.
     */
    private void handlerResponseError(StreamInput stream, TransportResponseHandler<?> handler) {
        Exception error;
        try {
            error = stream.readException();
        } catch (Exception e) {
            error = new TransportSerializationException("Failed to deserialize exception response from stream for handler [" + handler + "]", e);
        }
        handleException(handler, error);
    }

    /**
     * Delivers {@code error} to {@code handler} on the handler's executor, wrapping it in a
     * {@link RemoteTransportException} if it is not one already. An exception thrown by the handler is
     * logged at ERROR level.
     */
    private void handleException(TransportResponseHandler<?> handler, Throwable error) {
        final RemoteTransportException remoteException = error instanceof RemoteTransportException
            ? (RemoteTransportException) error
            : new RemoteTransportException(error.getMessage(), error);
        this.threadPool.executor(handler.executor()).execute(() -> {
            try {
                handler.handleException(remoteException);
            } catch (Exception e) {
                logger.error(() -> new ParameterizedMessage("failed to handle exception response [{}]", handler), e);
            }
        });
    }

    /** Wraps {@code streamInput} in a stream that carries this handler's named-writeable registry. */
    private StreamInput namedWriteableStream(StreamInput streamInput) {
        return new NamedWriteableAwareStreamInput(streamInput, this.namedWriteableRegistry);
    }

    /**
     * Asserts that the version set on {@code streamInput} equals {@code version}. Does nothing when
     * assertions are disabled.
     *
     * @param streamInput the stream whose version is checked
     * @param version     the expected version
     */
    public static void assertRemoteVersion(StreamInput streamInput, Version version) {
        assert version.equals(streamInput.getVersion()) : "Stream version [" + streamInput.getVersion() + "] does not match version [" + version + "]";
    }
}
