/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.messaging;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoop;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.net.SocketAddress;
import java.time.Clock;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy;
import org.neo4j.causalclustering.helper.TimeoutStrategy;
import org.neo4j.causalclustering.messaging.Channel;
import org.neo4j.causalclustering.protocol.handshake.ProtocolStack;
import org.neo4j.logging.Log;
import org.neo4j.logging.internal.CappedLogger;

/**
 * A {@link Channel} that wraps a Netty channel to a fixed destination and re-establishes
 * the connection whenever a connect attempt fails or an established connection is closed.
 *
 * <p>Failed connect attempts are retried after a delay taken from the configured
 * {@link TimeoutStrategy}; the delay sequence is restarted each time an established
 * connection is lost. Writes issued while no active connection exists are deferred until
 * the current connect attempt completes.
 *
 * <p>{@link #start()} must be called before messages are written. Once {@link #dispose()}
 * has been called no further connection attempts are made and writes are rejected.
 */
public class ReconnectingChannel
implements Channel {
    public static final AttributeKey<ProtocolStack> PROTOCOL_STACK_KEY = AttributeKey.valueOf((String)"PROTOCOL_STACK");
    private final Log log;
    private final Bootstrap bootstrap;
    private final EventLoop eventLoop;
    private final org.neo4j.helpers.SocketAddress destination;
    private final TimeoutStrategy connectionBackoffStrategy;
    // Channel belonging to the most recent connect attempt; null until start() has run.
    private volatile io.netty.channel.Channel channel;
    // Future of the most recent connect attempt; null until start() has run.
    private volatile ChannelFuture fChannel;
    private volatile boolean disposed;
    private TimeoutStrategy.Timeout connectionBackoff;
    private final CappedLogger cappedLogger;

    /**
     * Creates a channel using an exponential backoff strategy constructed with
     * (100, 1600, milliseconds) for reconnection delays.
     *
     * @param bootstrap   the Netty bootstrap used to open connections
     * @param eventLoop   the event loop used to create promises for deferred writes
     * @param destination the address to connect to
     * @param log         the log that connection events are reported to
     */
    ReconnectingChannel(Bootstrap bootstrap, EventLoop eventLoop, org.neo4j.helpers.SocketAddress destination, Log log) {
        this(bootstrap, eventLoop, destination, log, new ExponentialBackoffStrategy(100L, 1600L, TimeUnit.MILLISECONDS));
    }

    private ReconnectingChannel(Bootstrap bootstrap, EventLoop eventLoop, org.neo4j.helpers.SocketAddress destination, Log log, TimeoutStrategy connectionBackoffStrategy) {
        this.bootstrap = bootstrap;
        this.eventLoop = eventLoop;
        this.destination = destination;
        this.log = log;
        // Connection-failure warnings are rate limited so a long outage does not flood the log.
        this.cappedLogger = new CappedLogger(log).setTimeLimit(20L, TimeUnit.SECONDS, Clock.systemUTC());
        this.connectionBackoffStrategy = connectionBackoffStrategy;
        this.connectionBackoff = connectionBackoffStrategy.newTimeout();
    }

    /**
     * Initiates the first connection attempt.
     */
    void start() {
        this.tryConnect();
    }

    /**
     * Starts a new connection attempt unless this channel is disposed, an attempt is
     * already in flight, or the latest attempt produced a connection that is still active.
     */
    private synchronized void tryConnect() {
        if (this.disposed) {
            return;
        }
        ChannelFuture latestAttempt = this.fChannel;
        if (latestAttempt != null) {
            if (!latestAttempt.isDone()) {
                // An attempt is already in flight; its listener will handle the outcome.
                return;
            }
            if (latestAttempt.isSuccess() && latestAttempt.channel().isActive()) {
                // Several paths can request a reconnect for the same failure (the scheduled
                // backoff retry and the immediate retry of a deferred write). Connecting again
                // here would replace, and thereby orphan, a connection that is still live.
                return;
            }
        }
        this.fChannel = this.bootstrap.connect((SocketAddress)this.destination.socketAddress());
        this.channel = this.fChannel.channel();
        this.fChannel.addListener((ChannelFutureListener)f -> {
            if (f.isSuccess()) {
                this.log.info("Connected: " + f.channel());
                f.channel().closeFuture().addListener((ChannelFutureListener)closed -> {
                    // Report the channel that actually closed; the channel field may
                    // already refer to a newer connection.
                    this.log.warn(String.format("Lost connection to: %s (%s)", this.destination, f.channel().remoteAddress()));
                    // A connection had been established, so the backoff sequence starts over.
                    this.connectionBackoff = this.connectionBackoffStrategy.newTimeout();
                    f.channel().eventLoop().schedule(this::tryConnect, 0L, TimeUnit.MILLISECONDS);
                });
                return;
            }
            long millis = this.connectionBackoff.getMillis();
            this.cappedLogger.warn("Failed to connect to: " + this.destination.socketAddress() + ". Retrying in " + millis + " ms");
            f.channel().eventLoop().schedule(this::tryConnect, millis, TimeUnit.MILLISECONDS);
            this.connectionBackoff.increment();
        });
    }

    /**
     * Marks this channel as disposed and closes the underlying channel, if there is one.
     * After this call no reconnection attempts are made.
     */
    @Override
    public synchronized void dispose() {
        this.disposed = true;
        io.netty.channel.Channel current = this.channel;
        // The channel is null when dispose() is called before start().
        if (current != null) {
            current.close();
        }
    }

    @Override
    public boolean isDisposed() {
        return this.disposed;
    }

    /**
     * @return {@code true} if a connection attempt has been made and the channel of the
     * most recent attempt is open; {@code false} otherwise, including before {@link #start()}
     */
    @Override
    public boolean isOpen() {
        io.netty.channel.Channel current = this.channel;
        return current != null && current.isOpen();
    }

    /**
     * Writes a message without flushing.
     *
     * @param msg the message to write
     * @return a future that completes when the write completes
     * @throws IllegalStateException if this channel is disposed or has not been started
     */
    @Override
    public Future<Void> write(Object msg) {
        return this.write(msg, false);
    }

    /**
     * Writes a message and flushes.
     *
     * @param msg the message to write
     * @return a future that completes when the write completes
     * @throws IllegalStateException if this channel is disposed or has not been started
     */
    @Override
    public Future<Void> writeAndFlush(Object msg) {
        return this.write(msg, true);
    }

    /**
     * Writes directly when the current channel is active; otherwise defers the write
     * until the current connection attempt has completed.
     */
    private Future<Void> write(Object msg, boolean flush) {
        if (this.disposed) {
            throw new IllegalStateException("sending on disposed channel");
        }
        // Read the volatile field once so the active check and the write use the same channel.
        io.netty.channel.Channel current = this.channel;
        if (current == null) {
            throw new IllegalStateException("sending on channel that has not been started");
        }
        if (current.isActive()) {
            return flush ? current.writeAndFlush(msg) : current.write(msg);
        }
        Promise<Void> promise = this.eventLoop.newPromise();
        BiConsumer<io.netty.channel.Channel, Object> writer;
        if (flush) {
            writer = (target, message) -> ReconnectingChannel.chain(target.writeAndFlush(message), promise);
        } else {
            writer = (target, message) -> ReconnectingChannel.chain(target.write(message), promise);
        }
        this.deferredWrite(msg, this.fChannel, promise, true, writer);
        return promise;
    }

    /**
     * Completes {@code then} with the outcome of {@code when}.
     */
    private static void chain(ChannelFuture when, Promise<Void> then) {
        when.addListener(f -> {
            if (f.isSuccess()) {
                then.setSuccess(null);
            } else {
                then.setFailure(when.cause());
            }
        });
    }

    /**
     * Performs the write once {@code channelFuture} completes successfully. If the first
     * attempt fails, a reconnect is requested and the write is retried once against the
     * then-current connection attempt; if that fails as well the promise is failed.
     */
    private void deferredWrite(Object msg, ChannelFuture channelFuture, Promise<Void> promise, boolean firstAttempt, BiConsumer<io.netty.channel.Channel, Object> writer) {
        channelFuture.addListener((ChannelFutureListener)f -> {
            if (f.isSuccess()) {
                writer.accept(f.channel(), msg);
            } else if (firstAttempt) {
                this.tryConnect();
                this.deferredWrite(msg, this.fChannel, promise, false, writer);
            } else {
                promise.setFailure(f.cause());
            }
        });
    }

    /**
     * @return the protocol stack stored under {@link #PROTOCOL_STACK_KEY} on the current
     * channel, or empty if none is stored or no connection attempt has been made yet
     */
    public Optional<ProtocolStack> installedProtocolStack() {
        io.netty.channel.Channel current = this.channel;
        if (current == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(current.attr(PROTOCOL_STACK_KEY).get());
    }

    @Override
    public String toString() {
        return "ReconnectingChannel{channel=" + this.channel + ", disposed=" + this.disposed + '}';
    }
}

