/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.ip.tcp;

import java.io.IOException;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpConnection;
import org.springframework.integration.ip.tcp.connection.TcpConnectionFailedCorrelationEvent;
import org.springframework.integration.ip.tcp.connection.TcpConnectionSupport;
import org.springframework.integration.ip.tcp.connection.TcpListener;
import org.springframework.integration.ip.tcp.connection.TcpNioConnectionSupport;
import org.springframework.integration.ip.tcp.connection.TcpSender;
import org.springframework.integration.support.management.ManageableLifecycle;
import org.springframework.integration.support.utils.IntegrationUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.SettableListenableFuture;

public class TcpOutboundGateway
extends AbstractReplyProducingMessageHandler
implements TcpSender,
TcpListener,
ManageableLifecycle {
    private static final long DEFAULT_REMOTE_TIMEOUT = 10000L;
    private static final int DEFAULT_SECOND_CHANCE_DELAY = 2;
    private final Map<String, AsyncReply> pendingReplies = new ConcurrentHashMap<String, AsyncReply>();
    private final Semaphore semaphore = new Semaphore(1, true);
    private AbstractClientConnectionFactory connectionFactory;
    private boolean isSingleUse;
    private Expression remoteTimeoutExpression = new ValueExpression((Object)10000L);
    private long requestTimeout = 10000L;
    private EvaluationContext evaluationContext = new StandardEvaluationContext();
    private boolean evaluationContextSet;
    private int secondChanceDelay = 2;
    private boolean closeStreamAfterSend;
    private String unsolicitedMessageChannelName;
    private MessageChannel unsolicitedMessageChannel;

    public void setConnectionFactory(AbstractClientConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
        connectionFactory.registerListener(this);
        connectionFactory.registerSender(this);
        this.isSingleUse = connectionFactory.isSingleUse();
    }

    public void setRequestTimeout(long requestTimeout) {
        this.requestTimeout = requestTimeout;
    }

    public void setRemoteTimeout(long remoteTimeout) {
        this.remoteTimeoutExpression = new ValueExpression((Object)remoteTimeout);
    }

    public void setRemoteTimeoutExpression(Expression remoteTimeoutExpression) {
        this.remoteTimeoutExpression = remoteTimeoutExpression;
    }

    public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) {
        Assert.notNull((Object)evaluationContext, (String)"'evaluationContext' cannot be null");
        this.evaluationContext = evaluationContext;
        this.evaluationContextSet = true;
    }

    public void setReplyChannel(MessageChannel replyChannel) {
        this.setOutputChannel(replyChannel);
    }

    public void setReplyChannelName(String replyChannel) {
        this.setOutputChannelName(replyChannel);
    }

    public void setUnsolicitedMessageChannelName(String unsolicitedMessageChannelName) {
        this.unsolicitedMessageChannelName = unsolicitedMessageChannelName;
    }

    public void setUnsolicitedMessageChannel(MessageChannel unsolicitedMessageChannel) {
        this.unsolicitedMessageChannel = unsolicitedMessageChannel;
    }

    public void setCloseStreamAfterSend(boolean closeStreamAfterSend) {
        this.closeStreamAfterSend = closeStreamAfterSend;
    }

    public void setSecondChanceDelay(int secondChanceDelay) {
        this.secondChanceDelay = secondChanceDelay;
    }

    public String getComponentType() {
        return "ip:tcp-outbound-gateway";
    }

    protected void doInit() {
        super.doInit();
        if (!this.evaluationContextSet) {
            this.evaluationContext = ExpressionUtils.createStandardEvaluationContext((BeanFactory)this.getBeanFactory());
        }
        Assert.state((!this.closeStreamAfterSend || this.isSingleUse ? 1 : 0) != 0, (String)"Single use connection needed with closeStreamAfterSend");
    }

    /*
     * Loose catch block
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    protected Object handleRequestMessage(Message<?> requestMessage) {
        Message<?> message;
        block10: {
            AsyncReply reply;
            boolean async;
            String connectionId;
            TcpConnectionSupport connection;
            boolean haveSemaphore;
            block8: {
                SettableListenableFuture<Message<?>> settableListenableFuture;
                block9: {
                    Assert.notNull((Object)this.connectionFactory, () -> this.getClass().getName() + " requires a client connection factory");
                    haveSemaphore = false;
                    connection = null;
                    connectionId = null;
                    async = this.isAsync();
                    haveSemaphore = this.acquireSemaphoreIfNeeded(requestMessage);
                    connection = this.connectionFactory.getConnection();
                    this.checkAsync(connection, async);
                    Long remoteTimeout = this.getRemoteTimeout(requestMessage);
                    reply = new AsyncReply(remoteTimeout, connection, haveSemaphore, requestMessage, async);
                    connectionId = connection.getConnectionId();
                    this.pendingReplies.put(connectionId, reply);
                    String connectionIdToLog = connectionId;
                    this.logger.debug(() -> "Added pending reply " + connectionIdToLog);
                    connection.send(requestMessage);
                    if (this.closeStreamAfterSend) {
                        connection.shutdownOutput();
                    }
                    if (!async) break block8;
                    settableListenableFuture = reply.getFuture();
                    if (async) break block9;
                    this.cleanUp(haveSemaphore, connection, connectionId);
                }
                return settableListenableFuture;
            }
            try {
                message = this.getReply(requestMessage, connection, connectionId, reply);
                if (async) break block10;
            }
            catch (IOException | RuntimeException ex) {
                try {
                    this.logger.error((Throwable)ex, (CharSequence)"Tcp Gateway exception");
                    throw IntegrationUtils.wrapInHandlingExceptionIfNecessary(requestMessage, () -> "Failed to send or receive", (Throwable)ex);
                    catch (InterruptedException ex2) {
                        Thread.currentThread().interrupt();
                        throw new MessageHandlingException(requestMessage, "Interrupted in the [" + this + ']', (Throwable)ex2);
                    }
                }
                catch (Throwable throwable) {
                    if (!async) {
                        this.cleanUp(haveSemaphore, connection, connectionId);
                    }
                    throw throwable;
                }
            }
            this.cleanUp(haveSemaphore, connection, connectionId);
        }
        return message;
    }

    private void checkAsync(TcpConnection connection, boolean async) {
        if (async && connection instanceof TcpNioConnectionSupport) {
            this.setAsync(false);
            this.logger.warn((CharSequence)"Async replies are not supported with NIO; see the reference manual");
        }
    }

    private boolean acquireSemaphoreIfNeeded(Message<?> requestMessage) throws InterruptedException {
        if (!this.isSingleUse) {
            this.logger.debug((CharSequence)"trying semaphore");
            if (!this.semaphore.tryAcquire(this.requestTimeout, TimeUnit.MILLISECONDS)) {
                throw new MessageTimeoutException(requestMessage, "Timed out waiting for connection");
            }
            this.logger.debug((CharSequence)"got semaphore");
            return true;
        }
        return false;
    }

    private Long getRemoteTimeout(Message<?> requestMessage) {
        Long remoteTimeout = (Long)this.remoteTimeoutExpression.getValue(this.evaluationContext, requestMessage, Long.class);
        if (remoteTimeout == null) {
            remoteTimeout = 10000L;
            this.logger.warn(() -> "remoteTimeoutExpression evaluated to null; falling back to default for message " + requestMessage);
        }
        return remoteTimeout;
    }

    private Message<?> getReply(Message<?> requestMessage, TcpConnection connection, String connectionId, AsyncReply reply) {
        Message<?> replyMessage = reply.getReply();
        if (replyMessage == null) {
            this.logger.debug(() -> "Remote Timeout on " + connectionId);
            this.connectionFactory.forceClose(connection);
            throw new MessageTimeoutException(requestMessage, "Timed out waiting for response");
        }
        this.logger.debug(() -> "Response " + replyMessage);
        return replyMessage;
    }

    private void cleanUp(boolean haveSemaphore, TcpConnection connection, String connectionId) {
        if (connectionId != null) {
            this.pendingReplies.remove(connectionId);
            this.logger.debug(() -> "Removed pending reply " + connectionId);
            if (this.isSingleUse) {
                connection.close();
            }
        }
        if (haveSemaphore) {
            this.semaphore.release();
            this.logger.debug((CharSequence)"released semaphore");
        }
    }

    @Override
    public boolean onMessage(Message<?> message) {
        String connectionId = (String)message.getHeaders().get((Object)"ip_connectionId", String.class);
        if (connectionId == null) {
            if (this.unsolicitedSupported(message)) {
                return false;
            }
            this.logger.error((CharSequence)"Cannot correlate response - no connection id");
            this.publishNoConnectionEvent(message, null, "Cannot correlate response - no connection id");
            return false;
        }
        this.logger.trace(() -> "onMessage: " + connectionId + "(" + message + ")");
        AsyncReply reply = this.pendingReplies.get(connectionId);
        if (reply == null) {
            if (message instanceof ErrorMessage) {
                return false;
            }
            if (this.unsolicitedSupported(message)) {
                return false;
            }
            String errorMessage = "Cannot correlate response - no pending reply for " + connectionId;
            this.logger.error((CharSequence)errorMessage);
            this.publishNoConnectionEvent(message, connectionId, errorMessage);
            return false;
        }
        if (this.isAsync()) {
            reply.getFuture().set(message);
            this.cleanUp(reply.isHaveSemaphore(), reply.getConnection(), connectionId);
        } else {
            reply.setReply(message);
        }
        return false;
    }

    private boolean unsolicitedSupported(Message<?> message) {
        String channelName = this.unsolicitedMessageChannelName;
        if (channelName != null) {
            this.unsolicitedMessageChannel = (MessageChannel)this.getChannelResolver().resolveDestination(channelName);
            this.unsolicitedMessageChannelName = null;
        }
        if (this.unsolicitedMessageChannel != null) {
            try {
                this.messagingTemplate.send((Object)this.unsolicitedMessageChannel, message);
            }
            catch (Exception ex) {
                this.logger.error((Throwable)ex, (CharSequence)("Failed to send unsolicited message " + message));
            }
            return true;
        }
        return false;
    }

    private void publishNoConnectionEvent(Message<?> message, String connectionId, String errorMessage) {
        ApplicationEventPublisher applicationEventPublisher = this.connectionFactory.getApplicationEventPublisher();
        if (applicationEventPublisher != null) {
            applicationEventPublisher.publishEvent((ApplicationEvent)new TcpConnectionFailedCorrelationEvent(this, connectionId, new MessagingException(message, errorMessage)));
        }
    }

    @Override
    public void addNewConnection(TcpConnection connection) {
    }

    @Override
    public void removeDeadConnection(TcpConnection connection) {
    }

    public void start() {
        this.connectionFactory.start();
    }

    public void stop() {
        this.connectionFactory.stop();
    }

    public boolean isRunning() {
        return this.connectionFactory.isRunning();
    }

    protected AbstractConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    private final class AsyncReply {
        private final CountDownLatch latch;
        private final CountDownLatch secondChanceLatch;
        private final long remoteTimeout;
        private final TcpConnection connection;
        private final boolean haveSemaphore;
        private final SettableListenableFuture<Message<?>> future = new SettableListenableFuture();
        private volatile Message<?> reply;

        AsyncReply(long remoteTimeout, TcpConnection connection, boolean haveSemaphore, Message<?> requestMessage, boolean async) {
            this.latch = new CountDownLatch(1);
            this.secondChanceLatch = new CountDownLatch(1);
            this.remoteTimeout = remoteTimeout;
            this.connection = connection;
            this.haveSemaphore = haveSemaphore;
            if (async && remoteTimeout > 0L) {
                TcpOutboundGateway.this.getTaskScheduler().schedule(() -> {
                    TcpOutboundGateway.this.pendingReplies.remove(connection.getConnectionId());
                    this.future.setException((Throwable)new MessageTimeoutException(requestMessage, "Timed out waiting for response"));
                }, new Date(System.currentTimeMillis() + remoteTimeout));
            }
        }

        TcpConnection getConnection() {
            return this.connection;
        }

        boolean isHaveSemaphore() {
            return this.haveSemaphore;
        }

        Message<?> getReply() {
            try {
                if (!this.latch.await(this.remoteTimeout, TimeUnit.MILLISECONDS)) {
                    return null;
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            boolean waitForMessageAfterError = true;
            while (this.reply instanceof ErrorMessage) {
                if (waitForMessageAfterError) {
                    TcpOutboundGateway.this.logger.debug((CharSequence)"second chance");
                    try {
                        this.secondChanceLatch.await(TcpOutboundGateway.this.secondChanceDelay, TimeUnit.SECONDS);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        this.doThrowErrorMessagePayload();
                    }
                    waitForMessageAfterError = false;
                    continue;
                }
                this.doThrowErrorMessagePayload();
            }
            return this.reply;
        }

        SettableListenableFuture<Message<?>> getFuture() {
            return this.future;
        }

        private void doThrowErrorMessagePayload() {
            if (this.reply.getPayload() instanceof MessagingException) {
                throw (MessagingException)this.reply.getPayload();
            }
            throw new MessagingException("Exception while awaiting reply", (Throwable)this.reply.getPayload());
        }

        public void setReply(Message<?> reply) {
            if (this.reply == null) {
                this.reply = reply;
                this.latch.countDown();
            } else if (this.reply instanceof ErrorMessage) {
                this.reply = reply;
                this.secondChanceLatch.countDown();
            }
        }
    }
}

