package org.springframework.messaging.tcp.reactor;

import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.ReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import reactor.core.Environment;
import reactor.core.composable.Composable;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.Stream;
import reactor.core.composable.spec.Promises;
import reactor.function.Consumer;
import reactor.function.support.SingleUseConsumer;
import reactor.io.Buffer;
import reactor.tcp.Reconnect;
import reactor.tcp.TcpClient;
import reactor.tcp.TcpConnection;
import reactor.tcp.encoding.Codec;
import reactor.tcp.netty.NettyTcpClient;
import reactor.tcp.spec.TcpClientSpec;
import reactor.tuple.Tuple;
import reactor.tuple.Tuple2;

/* loaded from: input_file:org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.class */
/**
 * A {@link TcpOperations} implementation that delegates to a Reactor {@link TcpClient}
 * built for the {@link NettyTcpClient} type.
 *
 * <p>Each instance creates its own Reactor {@link Environment}; that environment is shut
 * down when {@link #shutdown()} is called.
 *
 * @param <P> the payload type of the {@link Message}s read from and written to the connection
 */
public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
    private static final Log logger = LogFactory.getLog(ReactorNettyTcpClient.class);

    /** Reactor environment owned by this client; released in {@link #shutdown()}. */
    private final Environment environment = new Environment();

    /** The underlying Reactor client all connect/close requests are delegated to. */
    private final TcpClient<Message<P>, Message<P>> tcpClient;

    /**
     * Builds the underlying Reactor TCP client for the given remote address.
     *
     * @param str the host to connect to
     * @param i the port to connect to
     * @param codec the codec handed to the Reactor client spec for converting between
     *        {@link Buffer}s and {@link Message}s
     */
    public ReactorNettyTcpClient(String str, int i, Codec<Buffer, Message<P>, Message<P>> codec) {
        this.tcpClient = new TcpClientSpec<Message<P>, Message<P>>(NettyTcpClient.class)
                .env(this.environment)
                .codec(codec)
                .connect(str, i)
                .get();
    }

    /**
     * Opens a single connection and wires the given handler to its lifecycle callbacks.
     *
     * @param tcpConnectionHandler receives connect success/failure, inbound messages and
     *        the close notification; must not be {@code null}
     * @return a future backed by the Reactor open promise; its value is always {@code null}
     * @throws IllegalArgumentException if {@code tcpConnectionHandler} is {@code null}
     */
    @Override
    public ListenableFuture<Void> connect(TcpConnectionHandler<P> tcpConnectionHandler) {
        // Fail fast: a null handler would otherwise only surface as an NPE inside a
        // Reactor callback after the connection has already been opened.
        Assert.notNull(tcpConnectionHandler, "TcpConnectionHandler must not be null");

        Promise<TcpConnection<Message<P>, Message<P>>> openPromise = this.tcpClient.open();
        composeConnectionHandling(openPromise, tcpConnectionHandler);

        return new AbstractPromiseToListenableFutureAdapter<TcpConnection<Message<P>, Message<P>>, Void>(openPromise) {
            @Override
            public Void adapt(TcpConnection<Message<P>, Message<P>> connection) {
                // Callers only need completion; the connection itself is delivered
                // to the handler via afterConnected.
                return null;
            }
        };
    }

    /**
     * Opens a connection using the given strategy to decide the delay before each
     * reconnect attempt, and wires the given handler to every connection produced.
     *
     * @param tcpConnectionHandler receives connect success/failure, inbound messages and
     *        the close notification; must not be {@code null}
     * @param reconnectStrategy supplies the time to the next attempt for a given attempt
     *        count; must not be {@code null}
     * @return a future settled by the first connection or the first error observed on the
     *         connection stream (see {@link #toPromise(Stream)})
     * @throws IllegalArgumentException if either argument is {@code null}
     */
    @Override
    public ListenableFuture<Void> connect(TcpConnectionHandler<P> tcpConnectionHandler, final ReconnectStrategy reconnectStrategy) {
        Assert.notNull(tcpConnectionHandler, "TcpConnectionHandler must not be null");
        Assert.notNull(reconnectStrategy, "ReconnectStrategy must not be null");

        Reconnect reconnect = new Reconnect() {
            @Override
            public Tuple2<InetSocketAddress, Long> reconnect(InetSocketAddress address, int attempt) {
                // Always retry the same address; only the delay comes from the strategy.
                return Tuple.of(address, reconnectStrategy.getTimeToNextAttempt(attempt));
            }
        };

        Stream<TcpConnection<Message<P>, Message<P>>> connections = this.tcpClient.open(reconnect);
        composeConnectionHandling(connections, tcpConnectionHandler);
        return new PassThroughPromiseToListenableFutureAdapter<Void>(toPromise(connections));
    }

    /**
     * Registers the handler's callbacks on the given composable: connect failures are
     * forwarded to {@code afterConnectFailure}, and every connection delivered gets its
     * close, inbound-message and error callbacks attached before {@code afterConnected}
     * is invoked.
     */
    private void composeConnectionHandling(Composable<TcpConnection<Message<P>, Message<P>>> composable, final TcpConnectionHandler<P> tcpConnectionHandler) {
        composable.when(Throwable.class, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable failure) {
                tcpConnectionHandler.afterConnectFailure(failure);
            }
        });

        composable.consume(new Consumer<TcpConnection<Message<P>, Message<P>>>() {
            @Override
            public void accept(TcpConnection<Message<P>, Message<P>> connection) {
                connection.on().close(new Runnable() {
                    @Override
                    public void run() {
                        tcpConnectionHandler.afterConnectionClosed();
                    }
                });
                connection.in().consume(new Consumer<Message<P>>() {
                    @Override
                    public void accept(Message<P> message) {
                        tcpConnectionHandler.handleMessage(message);
                    }
                });
                // Errors on an established connection are only logged, not forwarded.
                connection.when(Throwable.class, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable failure) {
                        logger.error("Exception on connection " + tcpConnectionHandler, failure);
                    }
                });
                // Callbacks are attached first so the handler sees a fully wired connection.
                tcpConnectionHandler.afterConnected(new ReactorTcpConnection<P>(connection));
            }
        });
    }

    /**
     * Adapts a stream of connections to a {@code Promise<Void>}: the promise is accepted
     * with {@code null} on a delivered connection and with the error on a failure. Both
     * callbacks are wrapped with {@link SingleUseConsumer#once}.
     */
    private Promise<Void> toPromise(Stream<TcpConnection<Message<P>, Message<P>>> stream) {
        final Deferred<Void, Promise<Void>> deferred = Promises.<Void>defer().get();

        stream.consume(SingleUseConsumer.once(new Consumer<TcpConnection<Message<P>, Message<P>>>() {
            @Override
            public void accept(TcpConnection<Message<P>, Message<P>> connection) {
                // The cast selects the value overload rather than accept(Throwable).
                deferred.accept((Void) null);
            }
        }));
        stream.when(Throwable.class, SingleUseConsumer.once(new Consumer<Throwable>() {
            @Override
            public void accept(Throwable failure) {
                deferred.accept(failure);
            }
        }));

        return deferred.compose();
    }

    /**
     * Requests the underlying client to close and shuts down the Reactor environment.
     *
     * <p>The environment is shut down in a {@code finally} block, so it is released even
     * if requesting the close throws.
     *
     * @return a future backed by the Reactor close promise
     */
    @Override
    public ListenableFuture<Void> shutdown() {
        try {
            Promise<Void> closePromise = this.tcpClient.close();
            return new AbstractPromiseToListenableFutureAdapter<Void, Void>(closePromise) {
                @Override
                public Void adapt(Void result) {
                    return result;
                }
            };
        } finally {
            // NOTE(review): the environment is shut down without waiting for the close
            // promise to complete - confirm the promise can still be fulfilled afterwards.
            this.environment.shutdown();
        }
    }
}
