/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.client.thin;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.client.ClientAuthenticationException;
import org.apache.ignite.client.ClientConnectionException;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.ClientOperationType;
import org.apache.ignite.client.ClientRetryPolicy;
import org.apache.ignite.client.IgniteClientFuture;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.internal.client.thin.ClientCacheAffinityContext;
import org.apache.ignite.internal.client.thin.ClientChannel;
import org.apache.ignite.internal.client.thin.ClientChannelConfiguration;
import org.apache.ignite.internal.client.thin.ClientError;
import org.apache.ignite.internal.client.thin.ClientOperation;
import org.apache.ignite.internal.client.thin.ClientProtocolError;
import org.apache.ignite.internal.client.thin.ClientRetryPolicyContextImpl;
import org.apache.ignite.internal.client.thin.IgniteClientFutureImpl;
import org.apache.ignite.internal.client.thin.PayloadInputChannel;
import org.apache.ignite.internal.client.thin.PayloadOutputChannel;
import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
import org.apache.ignite.internal.client.thin.io.gridnioserver.GridNioClientConnectionMultiplexer;
import org.apache.ignite.internal.util.HostAndPortRange;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;

final class ReliableChannel
implements AutoCloseable {
    /** No-op attempts callback, used when the caller does not need to know how many attempts were made. */
    private static final Consumer<Integer> DO_NOTHING = v -> {};
    /** Factory that channel holders call with a channel configuration and the multiplexer to open a channel. */
    private final BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory;
    /** Current channel holders; stays {@code null} until {@code initChannelHolders()} publishes a list. */
    private volatile List<ClientChannelHolder> channels;
    /** Index of the default holder inside {@code channels}; {@code -1} until holders are initialized. */
    private volatile int curChIdx = -1;
    /** Snapshot of {@code clientCfg.isAffinityAwarenessEnabled()} taken in the constructor. */
    private final boolean affinityAwarenessEnabled;
    /** Affinity/topology bookkeeping used to pick a node for a cache key. */
    private final ClientCacheAffinityContext affinityCtx;
    /** Client configuration this channel was created with. */
    private final ClientConfiguration clientCfg;
    /** Server node id to holder mapping; holders register and unregister themselves here. */
    private final Map<UUID, ClientChannelHolder> nodeChannels = new ConcurrentHashMap<UUID, ClientChannelHolder>();
    /** Set when a topology change requests a holders re-init; cleared at the start of {@code initChannelHolders()}. */
    private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
    /** {@code System.currentTimeMillis()} taken when the latest {@code initChannelHolders()} call started. */
    private volatile long startChannelsReInit;
    /** {@code System.currentTimeMillis()} taken when {@code initChannelHolders()} last returned {@code true}. */
    private volatile long finishChannelsReInit;
    /** Lets a single thread at a time run the affinity update in {@code affinityInfoIsUpToDate(int)}. */
    private final AtomicBoolean affinityUpdateInProgress = new AtomicBoolean();
    /** Set to {@code true} by {@code close()}. */
    private volatile boolean closed;
    /** Listeners that are run on every channel failure. NOTE(review): plain ArrayList, no synchronization visible here. */
    private final ArrayList<Runnable> chFailLsnrs = new ArrayList();
    /** Write lock is held while {@code channels}/{@code curChIdx} are changed; read lock while the default holder is picked. */
    private final ReadWriteLock curChannelsGuard = new ReentrantReadWriteLock();
    /** Connection multiplexer: started in the constructor, stopped in {@code close()}. */
    private final ClientConnectionMultiplexer connMgr;
    /** Addresses returned by the addresses finder on the last re-init that parsed them; used to detect changes. */
    private volatile String[] prevHostAddrs;

    /**
     * Creates the reliable channel and starts its connection multiplexer.
     *
     * @param chFactory Factory used later by the holders to open channels; must not be {@code null}.
     * @param clientCfg Client configuration; must not be {@code null}.
     * @param binary Binary facade handed to the affinity context.
     * @throws NullPointerException If {@code chFactory} or {@code clientCfg} is {@code null}.
     */
    ReliableChannel(BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory, ClientConfiguration clientCfg, IgniteBinary binary) {
        if (chFactory == null) {
            throw new NullPointerException("chFactory");
        }

        if (clientCfg == null) {
            throw new NullPointerException("clientCfg");
        }

        this.chFactory = chFactory;
        this.clientCfg = clientCfg;

        affinityAwarenessEnabled = clientCfg.isAffinityAwarenessEnabled();
        affinityCtx = new ClientCacheAffinityContext(binary);

        // The multiplexer is started eagerly; channels themselves are opened lazily by the holders.
        connMgr = new GridNioClientConnectionMultiplexer(clientCfg);
        connMgr.start();
    }

    /**
     * Marks this channel as closed, stops the connection multiplexer and closes every known holder.
     */
    @Override
    public synchronized void close() {
        closed = true;

        connMgr.stop();

        List<ClientChannelHolder> snapshot = channels;

        // Holders were never initialized: nothing else to release.
        if (snapshot == null) {
            return;
        }

        for (ClientChannelHolder holder : snapshot) {
            holder.close();
        }
    }

    /**
     * Sends a request on the default channel and reads the response, retrying on other channels
     * as decided by {@code applyOnDefaultChannel}.
     *
     * @param op Operation to execute.
     * @param payloadWriter Request payload writer.
     * @param payloadReader Response payload reader.
     * @return Value produced by the channel for this operation.
     */
    public <T> T service(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter, Function<PayloadInputChannel, T> payloadReader) throws ClientException, ClientError {
        Function<ClientChannel, T> call = ch -> ch.service(op, payloadWriter, payloadReader);

        return applyOnDefaultChannel(call, op);
    }

    /**
     * Sends a request asynchronously on the default channel.
     *
     * <p>The initial channel acquisition is limited to a single attempt; further retries, if any,
     * are driven from the completion callback in {@code handleServiceAsync}.
     *
     * @param op Operation to execute.
     * @param payloadWriter Request payload writer.
     * @param payloadReader Response payload reader.
     * @return Future completed with the operation result or with the failure.
     */
    public <T> IgniteClientFuture<T> serviceAsync(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter, Function<PayloadInputChannel, T> payloadReader) throws ClientException, ClientError {
        // Parameterized instead of raw types so the compiler checks the result type end to end.
        CompletableFuture<T> fut = new CompletableFuture<>();

        handleServiceAsync(fut, op, payloadWriter, payloadReader, 1, null);

        return new IgniteClientFutureImpl<>(fut);
    }

    /**
     * Acquires the default channel, issues the asynchronous request on it and wires the outcome
     * into {@code fut}, re-invoking itself when a connection failure may be retried.
     *
     * @param fut Future to complete with the result or the failure.
     * @param op Operation to execute.
     * @param payloadWriter Request payload writer.
     * @param payloadReader Response payload reader.
     * @param attemptsLimit Maximum number of attempts to obtain a channel in this round.
     * @param failure Connection failure accumulated by previous rounds, or {@code null} on the first round.
     */
    private <T> void handleServiceAsync(CompletableFuture<T> fut, ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter, Function<PayloadInputChannel, T> payloadReader, int attemptsLimit, ClientConnectionException failure) {
        // Single-element array so the attempts callback can report back into this frame.
        int[] attemptsCnt = new int[1];

        ClientChannel ch;

        try {
            ch = applyOnDefaultChannel(channel -> channel, null, attemptsLimit, v -> attemptsCnt[0] = v);
        }
        catch (Throwable ex) {
            if (failure == null) {
                fut.completeExceptionally(ex);
            }
            else {
                // Report the original connection failure and keep the latest error as suppressed.
                failure.addSuppressed(ex);
                fut.completeExceptionally(failure);
            }

            return;
        }

        ch.serviceAsync(op, payloadWriter, payloadReader).handle((res, err) -> {
            if (err == null) {
                fut.complete(res);

                return null;
            }

            // Anything that is not a connection problem is final: no channel roll, no retry.
            if (!(err instanceof ClientConnectionException)) {
                fut.completeExceptionally(err instanceof ClientException ? err : new ClientException(err));

                return null;
            }

            try {
                onChannelFailure(ch);
            }
            catch (Throwable ex) {
                fut.completeExceptionally(ex);

                return null;
            }

            ClientConnectionException accumulated;

            if (failure == null) {
                accumulated = (ClientConnectionException)err;
            }
            else {
                failure.addSuppressed(err);
                accumulated = failure;
            }

            int attempt = attemptsCnt[0];

            // On the first round the budget is derived from the retry limit; afterwards it shrinks
            // by the number of attempts the channel acquisition consumed.
            int leftAttempts = failure == null ? getRetryLimit() - 1 : attemptsLimit - attempt;

            if (leftAttempts > 0 && shouldRetry(op, attempt, accumulated)) {
                handleServiceAsync(fut, op, payloadWriter, payloadReader, leftAttempts, accumulated);

                return null;
            }

            fut.completeExceptionally(accumulated);

            return null;
        });
    }

    /**
     * Sends a request without payload and reads the response.
     *
     * @param op Operation to execute.
     * @param payloadReader Response payload reader.
     * @return Value produced by the channel for this operation.
     */
    public <T> T service(ClientOperation op, Function<PayloadInputChannel, T> payloadReader) throws ClientException, ClientError {
        // A null writer means "no request payload".
        return service(op, null, payloadReader);
    }

    /**
     * Asynchronously sends a request without payload and reads the response.
     *
     * @param op Operation to execute.
     * @param payloadReader Response payload reader.
     * @return Future for the operation result.
     */
    public <T> IgniteClientFuture<T> serviceAsync(ClientOperation op, Function<PayloadInputChannel, T> payloadReader) throws ClientException, ClientError {
        // A null writer means "no request payload".
        return serviceAsync(op, null, payloadReader);
    }

    /**
     * Sends a request and discards the response payload.
     *
     * @param op Operation to execute.
     * @param payloadWriter Request payload writer.
     */
    public void request(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter) throws ClientException, ClientError {
        // A null reader means the response payload is not read.
        service(op, payloadWriter, null);
    }

    /**
     * Asynchronously sends a request and discards the response payload.
     *
     * @param op Operation to execute.
     * @param payloadWriter Request payload writer.
     * @return Future completed when the operation finishes.
     */
    public IgniteClientFuture<Void> requestAsync(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter) throws ClientException, ClientError {
        // A null reader means the response payload is not read.
        return serviceAsync(op, payloadWriter, null);
    }

    /**
     * Sends a request to the node that owns {@code key} when affinity awareness is enabled and the
     * affinity information is current; otherwise uses the default channel.
     *
     * @param cacheId Cache id.
     * @param key Key used to look up the affinity node.
     * @param op Operation to execute.
     * @param payloadWriter Request payload writer.
     * @param payloadReader Response payload reader.
     * @return Value produced by the channel for this operation.
     */
    public <T> T affinityService(int cacheId, Object key, ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter, Function<PayloadInputChannel, T> payloadReader) throws ClientException, ClientError {
        if (affinityAwarenessEnabled && affinityInfoIsUpToDate(cacheId)) {
            UUID affNodeId = affinityCtx.affinityNode(cacheId, key);

            if (affNodeId != null) {
                return applyOnNodeChannelWithFallback(affNodeId, ch -> ch.service(op, payloadWriter, payloadReader), op);
            }
        }

        // No usable affinity node: fall back to the default channel.
        return service(op, payloadWriter, payloadReader);
    }

    /**
     * Asynchronously sends a request to the node that owns {@code key} when affinity awareness is
     * enabled and the affinity information is current; otherwise uses the default channel.
     *
     * @param cacheId Cache id.
     * @param key Key used to look up the affinity node.
     * @param op Operation to execute.
     * @param payloadWriter Request payload writer.
     * @param payloadReader Response payload reader.
     * @return Future for the operation result.
     */
    public <T> IgniteClientFuture<T> affinityServiceAsync(int cacheId, Object key, ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter, Function<PayloadInputChannel, T> payloadReader) throws ClientException, ClientError {
        if (affinityAwarenessEnabled && affinityInfoIsUpToDate(cacheId)) {
            UUID affNodeId = affinityCtx.affinityNode(cacheId, key);

            if (affNodeId != null) {
                // The future must exist before the lambda captures it: a captured local has to be
                // definitely assigned and effectively final.
                CompletableFuture<T> fut = new CompletableFuture<>();

                Object result = applyOnNodeChannel(affNodeId, channel -> lambda$affinityServiceAsync$7(op, payloadWriter, payloadReader, fut, channel));

                // A null result means no channel to the affinity node could be used; in that case
                // the request was not sent and the default channel is tried below.
                if (result != null) {
                    return new IgniteClientFutureImpl<>(fut);
                }
            }
        }

        return serviceAsync(op, payloadWriter, payloadReader);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks whether affinity information for the cache can be used right now, trying to refresh
     * it from the nodes of the last known topology when an update is required.
     *
     * @param cacheId Cache id.
     * @return {@code true} when no update is required, or the value returned by the partitions
     *      request when a node answered; {@code false} when another thread is already updating,
     *      the topology is unknown or changed meanwhile, or no node could be reached.
     */
    private boolean affinityInfoIsUpToDate(int cacheId) {
        if (!affinityCtx.affinityUpdateRequired(cacheId)) {
            return true;
        }

        // Only one thread performs the update; everybody else reports "not up to date".
        if (!affinityUpdateInProgress.compareAndSet(false, true)) {
            return false;
        }

        try {
            ClientCacheAffinityContext.TopologyNodes top = affinityCtx.lastTopology();

            if (top == null) {
                return false;
            }

            for (UUID nodeId : top.nodes()) {
                // Stop as soon as a different topology object has been published.
                if (top != affinityCtx.lastTopology()) {
                    return false;
                }

                Boolean updated = applyOnNodeChannel(nodeId, ch -> ch.service(ClientOperation.CACHE_PARTITIONS, affinityCtx::writePartitionsUpdateRequest, affinityCtx::readPartitionsUpdateResponse));

                if (updated != null) {
                    return updated;
                }
            }

            // None of the nodes produced an answer: reset the context for this topology.
            affinityCtx.reset(top);
        }
        finally {
            affinityUpdateInProgress.set(false);
        }

        return false;
    }

    /**
     * Expands address strings into unresolved socket addresses, one per port of every parsed range.
     *
     * @param addrs Address strings in the format accepted by {@code HostAndPortRange.parse}.
     * @return Unresolved socket addresses in the order of {@code addrs}, ports ascending within a range.
     * @throws ClientException If {@code addrs} is empty or an address cannot be parsed.
     */
    private static List<InetSocketAddress> parsedAddresses(String[] addrs) throws ClientException {
        if (F.isEmpty(addrs)) {
            throw new ClientException("Empty addresses");
        }

        // Phase 1: parse everything first, so a malformed entry fails before any address is built.
        List<HostAndPortRange> parsed = new ArrayList<>(addrs.length);

        for (String addr : addrs) {
            HostAndPortRange range;

            try {
                range = HostAndPortRange.parse(addr, 10800, 10900, "Failed to parse Ignite server address");
            }
            catch (IgniteCheckedException e) {
                throw new ClientException(e);
            }

            parsed.add(range);
        }

        // Phase 2: one unresolved address per port of each range.
        List<InetSocketAddress> res = new ArrayList<>();

        for (HostAndPortRange range : parsed) {
            int first = range.portFrom();
            int last = range.portTo();

            for (int port = first; port <= last; port++) {
                res.add(InetSocketAddress.createUnresolved(range.host(), port));
            }
        }

        return res;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Moves the default-channel index to the next holder (wrapping to the first one), but only if
     * {@code hld} is still the default holder.
     *
     * @param hld Holder whose channel failed.
     */
    private void rollCurrentChannel(ClientChannelHolder hld) {
        curChannelsGuard.writeLock().lock();

        try {
            int cur = curChIdx;
            List<ClientChannelHolder> holders = channels;

            // The default holder is already a different one: nothing to roll.
            if (holders.get(cur) != hld) {
                return;
            }

            int next = cur + 1;

            curChIdx = next < holders.size() ? next : 0;
        }
        finally {
            curChannelsGuard.writeLock().unlock();
        }
    }

    /**
     * Handles a failure of {@code ch}, treating the current default holder as its owner.
     *
     * <p>The default holder is read without taking {@code curChannelsGuard}; the two-argument
     * overload only closes the holder's channel when it is the very same instance as {@code ch}.
     *
     * @param ch Failed channel.
     */
    private void onChannelFailure(ClientChannel ch) {
        ClientChannelHolder dfltHld = channels.get(curChIdx);

        onChannelFailure(dfltHld, ch);
    }

    /**
     * Handles a channel failure: closes the holder's channel if it is {@code ch}, notifies the
     * failure listeners, rolls the default channel and, when a re-init was scheduled and affinity
     * awareness is off, re-initializes the channels.
     *
     * @param hld Holder the failed channel belongs to.
     * @param ch Failed channel, or {@code null} if no channel had been obtained.
     */
    private void onChannelFailure(ClientChannelHolder hld, ClientChannel ch) {
        // Identity check: a channel the holder has already replaced must not be closed here.
        boolean ownedByHolder = ch != null && ch == hld.ch;

        if (ownedByHolder) {
            hld.closeChannel();
        }

        chFailLsnrs.forEach(Runnable::run);

        rollCurrentChannel(hld);

        // With affinity awareness enabled the re-init is submitted from onTopologyChanged instead.
        if (!affinityAwarenessEnabled && scheduledChannelsReinit.get()) {
            channelsInit();
        }
    }

    /**
     * Submits a task to the common pool that opens a channel for every holder, ignoring reconnect
     * throttling. The task stops early once this channel is closed or a newer holders re-init has
     * started and not finished yet.
     */
    private void initAllChannelsAsync() {
        ForkJoinPool.commonPool().submit(() -> {
            for (ClientChannelHolder hld : channels) {
                if (closed || startChannelsReInit > finishChannelsReInit) {
                    return;
                }

                try {
                    hld.getOrCreateChannel(true);
                }
                catch (Exception ignored) {
                    // Best effort: a node that cannot be reached now is simply skipped.
                }
            }
        });
    }

    /**
     * Topology change callback: records the new topology version and schedules a holders re-init.
     * The re-init is submitted to the common pool only when affinity awareness is enabled.
     *
     * @param ch Channel that reported the topology change.
     */
    private void onTopologyChanged(ClientChannel ch) {
        if (!affinityCtx.updateLastTopologyVersion(ch.serverTopologyVersion(), ch.serverNodeId())) {
            return;
        }

        // Only the caller that flips the flag submits the task; the flag stays set either way.
        if (scheduledChannelsReinit.compareAndSet(false, true)) {
            if (affinityAwarenessEnabled) {
                ForkJoinPool.commonPool().submit(this::channelsInit);
            }
        }
    }

    /**
     * Registers a listener that is run on every channel failure.
     *
     * @param chFailLsnr Listener to add.
     */
    public void addChannelFailListener(Runnable chFailLsnr) {
        chFailLsnrs.add(chFailLsnr);
    }

    /**
     * @return {@code true} if the running holders re-init should be abandoned: either this channel
     *      is closed or a new re-init has been scheduled.
     */
    private boolean shouldStopChannelsReinit() {
        if (closed) {
            return true;
        }

        return scheduledChannelsReinit.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * (Re)builds the list of channel holders from the configured addresses.
     *
     * <p>With an addresses finder configured, holders are rebuilt whenever the finder returns a
     * different address array than last time; without one, they are built only once from the
     * static addresses. Holders whose address is still present are reused, the others are closed.
     *
     * @return {@code true} if the holders are ready (rebuilt or left unchanged); {@code false} if
     *      the rebuild was abandoned because this channel was closed or a newer re-init was scheduled.
     * @throws ClientException If the addresses finder returns no addresses or an address cannot be parsed.
     */
    synchronized boolean initChannelHolders() {
        List<ClientChannelHolder> holders = channels;

        startChannelsReInit = System.currentTimeMillis();

        // Clear the flag up front, so a topology change arriving during this pass can schedule a
        // new one (which in turn aborts this pass through shouldStopChannelsReinit()).
        scheduledChannelsReinit.set(false);

        List<InetSocketAddress> newAddrs = null;

        if (clientCfg.getAddressesFinder() != null) {
            // Typed as String[]: the value is stored into the String[] field prevHostAddrs below.
            String[] hostAddrs = clientCfg.getAddressesFinder().getAddresses();

            if (hostAddrs.length == 0) {
                throw new ClientException("Empty addresses");
            }

            if (!Arrays.equals(hostAddrs, prevHostAddrs)) {
                newAddrs = parsedAddresses(hostAddrs);
                prevHostAddrs = hostAddrs;
            }
        }
        else if (holders == null) {
            newAddrs = parsedAddresses(clientCfg.getAddresses());
        }

        // Nothing changed since the previous pass.
        if (newAddrs == null) {
            finishChannelsReInit = System.currentTimeMillis();

            return true;
        }

        Map<InetSocketAddress, ClientChannelHolder> curAddrs = new HashMap<>();
        Map<InetSocketAddress, ClientChannelHolder> newHoldersMap = new HashMap<>();
        HashSet<InetSocketAddress> newAddrsSet = new HashSet<>(newAddrs);

        // Keep holders whose address survived, close the rest.
        if (holders != null) {
            for (ClientChannelHolder h : holders) {
                if (newAddrsSet.contains(h.getAddress())) {
                    curAddrs.put(h.getAddress(), h);
                }
                else {
                    h.close();
                }
            }
        }

        List<ClientChannelHolder> reinitHolders = new ArrayList<>();

        // The current default holder, if any, should remain the default after the rebuild.
        int dfltChannelIdx = -1;
        ClientChannelHolder currDfltHolder = null;

        int idx = curChIdx;

        if (idx != -1) {
            currDfltHolder = holders.get(idx);
        }

        for (InetSocketAddress addr : newAddrs) {
            if (shouldStopChannelsReinit()) {
                return false;
            }

            ClientChannelHolder hld;

            if (!curAddrs.containsKey(addr)) {
                // A duplicated new address reuses the holder created for its first occurrence.
                hld = newHoldersMap.computeIfAbsent(addr, a -> new ClientChannelHolder(new ClientChannelConfiguration(clientCfg, a)));

                reinitHolders.add(hld);

                continue;
            }

            hld = curAddrs.get(addr);

            reinitHolders.add(hld);

            if (hld == currDfltHolder) {
                dfltChannelIdx = reinitHolders.size() - 1;
            }
        }

        // Previous default holder is gone (or there was none): start from the first holder.
        if (dfltChannelIdx == -1) {
            dfltChannelIdx = 0;
        }

        // Publish the list and the index together under the write lock.
        curChannelsGuard.writeLock().lock();

        try {
            channels = reinitHolders;
            curChIdx = dfltChannelIdx;
        }
        finally {
            curChannelsGuard.writeLock().unlock();
        }

        finishChannelsReInit = System.currentTimeMillis();

        return true;
    }

    /**
     * Initializes the channel holders and establishes the default channel; with affinity awareness
     * enabled, additionally starts opening channels to all holders in the background.
     */
    void channelsInit() {
        if (initChannelHolders()) {
            // The function does nothing: the call is made only to get a working default channel.
            applyOnDefaultChannel(channel -> null, null);

            if (affinityAwarenessEnabled) {
                initAllChannelsAsync();
            }
        }
    }

    /**
     * Applies {@code function} on the channel of the given node, if such a channel is available.
     *
     * @param nodeId Server node id.
     * @param function Function to apply.
     * @return The function result, or {@code null} when the node has no holder, the holder yields
     *      no channel, or a connection failure occurred (the failure is handled, not rethrown).
     */
    private <T> T applyOnNodeChannel(UUID nodeId, Function<ClientChannel, T> function) {
        ClientChannelHolder hld = null;
        ClientChannel channel = null;

        try {
            hld = nodeChannels.get(nodeId);

            if (hld != null) {
                channel = hld.getOrCreateChannel();
            }

            if (channel != null) {
                return function.apply(channel);
            }
        }
        catch (ClientConnectionException e) {
            onChannelFailure(hld, channel);
        }

        return null;
    }

    /**
     * Applies {@code function} on the default channel using the configured retry limit and no
     * attempts callback.
     *
     * @param function Function to apply.
     * @param op Operation being executed, or {@code null}.
     * @return The function result.
     */
    private <T> T applyOnDefaultChannel(Function<ClientChannel, T> function, ClientOperation op) {
        int attemptsLimit = getRetryLimit();

        return applyOnDefaultChannel(function, op, attemptsLimit, DO_NOTHING);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Applies {@code function} on the default channel, moving on to the next channel after each
     * connection failure until the attempts are exhausted or the retry policy says stop.
     *
     * @param function Function to apply.
     * @param op Operation being executed; when {@code null} the retry policy is not consulted.
     * @param attemptsLimit Maximum number of attempts.
     * @param attemptsCallback Receives the 1-based number of the attempt that obtained a channel.
     * @return The function result.
     * @throws ClientException If this channel is closed.
     * @throws ClientConnectionException The first connection failure (later ones are attached as
     *      suppressed), or a generic one if no channel could be obtained without any failure.
     */
    private <T> T applyOnDefaultChannel(Function<ClientChannel, T> function, ClientOperation op, int attemptsLimit, Consumer<Integer> attemptsCallback) {
        ClientConnectionException failure = null;

        for (int attempt = 0; attempt < attemptsLimit; ++attempt) {
            ClientChannelHolder hld = null;
            ClientChannel c = null;

            try {
                if (closed) {
                    throw new ClientException("Channel is closed");
                }

                curChannelsGuard.readLock().lock();

                try {
                    hld = channels.get(curChIdx);
                }
                finally {
                    curChannelsGuard.readLock().unlock();
                }

                c = hld.getOrCreateChannel();

                // The holder yields null once it has been closed; spend the attempt and try again.
                if (c == null) {
                    continue;
                }

                attemptsCallback.accept(attempt + 1);

                return function.apply(c);
            }
            catch (ClientConnectionException e) {
                if (failure == null) {
                    failure = e;
                }
                else {
                    failure.addSuppressed(e);
                }

                onChannelFailure(hld, c);

                if (op != null && !shouldRetry(op, attempt, e)) {
                    break;
                }
            }
        }

        // No attempt ever failed with a connection error (closed holder or non-positive limit):
        // "throw failure" on a null reference would surface as a bare NullPointerException.
        if (failure == null) {
            throw new ClientConnectionException("Failed to obtain a channel [attemptsLimit=" + attemptsLimit + ']');
        }

        throw failure;
    }

    /**
     * Applies {@code function} on the channel of the given node and falls back to the default
     * channel when that node has no usable channel or fails with a retryable connection error.
     *
     * @param tryNodeId Preferred server node id.
     * @param function Function to apply.
     * @param op Operation being executed.
     * @return The function result.
     * @throws ClientConnectionException If the node channel failed and no retry is allowed or left.
     */
    private <T> T applyOnNodeChannelWithFallback(UUID tryNodeId, Function<ClientChannel, T> function, ClientOperation op) {
        ClientChannelHolder hld = nodeChannels.get(tryNodeId);

        int retryLimit = getRetryLimit();

        if (hld != null) {
            ClientChannel channel = null;

            try {
                channel = hld.getOrCreateChannel();

                if (channel != null) {
                    return function.apply(channel);
                }
            }
            catch (ClientConnectionException e) {
                onChannelFailure(hld, channel);

                // The failed attempt on the node channel counts against the retry budget.
                retryLimit--;

                if (retryLimit == 0 || !shouldRetry(op, 0, e)) {
                    throw e;
                }
            }
        }

        return applyOnDefaultChannel(function, op, retryLimit, DO_NOTHING);
    }

    /**
     * @return Number of attempts allowed for one operation: the configured retry limit capped by
     *      the number of holders, or the number of holders when the configured limit is not positive.
     * @throws ClientException If the holders have not been initialized yet.
     */
    private int getRetryLimit() {
        List<ClientChannelHolder> holders = channels;

        if (holders == null) {
            throw new ClientException("Connections to nodes aren't initialized.");
        }

        int holdersCnt = holders.size();
        int cfgLimit = clientCfg.getRetryLimit();

        if (cfgLimit > 0) {
            return Math.min(cfgLimit, holdersCnt);
        }

        return holdersCnt;
    }

    /**
     * Decides whether a failed operation may be retried.
     *
     * @param op Failed operation.
     * @param iteration Iteration number passed to the retry policy context.
     * @param exception Connection failure that triggered the decision.
     * @return {@code true} for operations without a public operation type; otherwise the verdict
     *      of the configured retry policy, or {@code false} when no policy is configured.
     */
    private boolean shouldRetry(ClientOperation op, int iteration, ClientConnectionException exception) {
        ClientOperationType opType = op.toPublicOperationType();

        // Operations that have no public counterpart are always retried.
        if (opType == null) {
            return true;
        }

        ClientRetryPolicy plc = clientCfg.getRetryPolicy();

        return plc != null && plc.shouldRetry(new ClientRetryPolicyContextImpl(clientCfg, opType, iteration, exception));
    }

    /**
     * @return Current list of channel holders, or {@code null} if holders have not been initialized.
     */
    List<ClientChannelHolder> getChannelHolders() {
        return channels;
    }

    /**
     * @return The live (not copied) mapping from server node id to channel holder.
     */
    Map<UUID, ClientChannelHolder> getNodeChannels() {
        return nodeChannels;
    }

    /**
     * @return The flag object that tells whether a holders re-init has been scheduled.
     */
    AtomicBoolean getScheduledChannelsReinit() {
        return scheduledChannelsReinit;
    }

    /**
     * Body of the lambda used by {@code affinityServiceAsync} (kept as a named method by the
     * decompiler): sends the request on {@code channel} and wires the outcome into {@code fut},
     * handing over to {@code handleServiceAsync} when a connection failure may be retried.
     *
     * @param op Operation to execute.
     * @param payloadWriter Request payload writer.
     * @param payloadReader Response payload reader.
     * @param fut Future to complete with the result or the failure.
     * @param channel Channel of the affinity node.
     * @return The future returned by {@code handle}; callers only use it as a non-null marker.
     */
    private /* synthetic */ CompletableFuture lambda$affinityServiceAsync$7(ClientOperation op, Consumer payloadWriter, Function payloadReader, CompletableFuture fut, ClientChannel channel) {
        return channel.serviceAsync(op, payloadWriter, payloadReader).handle((res, err) -> {
            if (err == null) {
                fut.complete(res);

                return null;
            }

            // Channel failure handling runs for every error type here, not only connection errors.
            try {
                onChannelFailure(channel);
            }
            catch (Throwable ex) {
                fut.completeExceptionally(ex);

                return null;
            }

            Throwable cause = (Throwable)err;

            if (!(cause instanceof ClientConnectionException)) {
                fut.completeExceptionally(cause);

                return null;
            }

            ClientConnectionException failure = (ClientConnectionException)cause;

            // One attempt has already been spent on the affinity node.
            int attemptsLimit = getRetryLimit() - 1;

            if (attemptsLimit != 0 && shouldRetry(op, 0, failure)) {
                handleServiceAsync(fut, op, payloadWriter, payloadReader, attemptsLimit, failure);
            }
            else {
                fut.completeExceptionally(cause);
            }

            return null;
        });
    }

    /**
     * Holds the configuration of one server address together with the lazily created channel to it.
     */
    class ClientChannelHolder {
        /** Channel configuration (address, throttling settings). */
        private final ClientChannelConfiguration chCfg;

        /** Current channel, or {@code null} when none is open. */
        private volatile ClientChannel ch;

        /** Id of the server node the last created channel reported, if it reported one. */
        private volatile UUID serverNodeId;

        /** Set once the holder is closed; no channel is created afterwards. */
        private volatile boolean close;

        /** Timestamps of recent reconnects, or {@code null} when throttling is disabled. */
        private final long[] reconnectRetries;

        /**
         * @param chCfg Channel configuration.
         */
        private ClientChannelHolder(ClientChannelConfiguration chCfg) {
            this.chCfg = chCfg;

            boolean throttlingEnabled = chCfg.getReconnectThrottlingRetries() > 0 && chCfg.getReconnectThrottlingPeriod() > 0L;

            reconnectRetries = throttlingEnabled ? new long[chCfg.getReconnectThrottlingRetries()] : null;
        }

        /**
         * Records a reconnect in the first slot whose timestamp is at least one throttling period old.
         *
         * @return {@code true} if every slot is still within the throttling period, i.e. the
         *      reconnect must be rejected; {@code false} if it is allowed or throttling is disabled.
         */
        private boolean applyReconnectionThrottling() {
            if (reconnectRetries == null) {
                return false;
            }

            long now = System.currentTimeMillis();
            long period = chCfg.getReconnectThrottlingPeriod();

            for (int slot = 0; slot < reconnectRetries.length; slot++) {
                if (now - reconnectRetries[slot] >= period) {
                    reconnectRetries[slot] = now;

                    return false;
                }
            }

            return true;
        }

        /**
         * Returns the channel, creating it if needed and subject to reconnect throttling.
         *
         * @return The channel, or {@code null} if the holder is closed.
         */
        private ClientChannel getOrCreateChannel() throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
            return getOrCreateChannel(false);
        }

        /**
         * Returns the channel, creating it under the holder's monitor if none is open.
         *
         * @param ignoreThrottling When {@code true} the reconnect throttling check is skipped.
         * @return The channel, or {@code null} if the holder is closed.
         * @throws ClientConnectionException If throttling rejects the reconnect.
         */
        private ClientChannel getOrCreateChannel(boolean ignoreThrottling) throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
            // Fast path without locking: a channel already exists, or the holder is closed.
            if (ch != null || close) {
                return ch;
            }

            synchronized (this) {
                // Re-check under the monitor: state may have changed while waiting for it.
                if (close) {
                    return null;
                }

                if (ch != null) {
                    return ch;
                }

                if (!ignoreThrottling && applyReconnectionThrottling()) {
                    throw new ClientConnectionException("Reconnect is not allowed due to applied throttling");
                }

                ClientChannel channel = chFactory.apply(chCfg, connMgr);

                UUID newId = channel.serverNodeId();

                if (newId != null) {
                    channel.addTopologyChangeListener(ReliableChannel.this::onTopologyChanged);

                    UUID prevId = serverNodeId;

                    // The address now answers as a different node: drop the stale mapping.
                    if (prevId != null && !prevId.equals(newId)) {
                        nodeChannels.remove(prevId, this);
                    }

                    if (!newId.equals(prevId)) {
                        serverNodeId = newId;

                        // putIfAbsent: an existing mapping for this node id is left untouched.
                        nodeChannels.putIfAbsent(newId, this);
                    }
                }

                ch = channel;
            }

            return ch;
        }

        /**
         * Quietly closes the current channel, if any, and forgets it.
         */
        private synchronized void closeChannel() {
            if (ch == null) {
                return;
            }

            U.closeQuiet(ch);

            ch = null;
        }

        /**
         * Closes the holder: marks it closed, removes its node mapping and closes the channel.
         */
        void close() {
            close = true;

            if (serverNodeId != null) {
                nodeChannels.remove(serverNodeId, this);
            }

            closeChannel();
        }

        /**
         * @return {@code true} if {@link #close()} has been called.
         */
        boolean isClosed() {
            return close;
        }

        /**
         * @return Address taken from the channel configuration.
         */
        InetSocketAddress getAddress() {
            return chCfg.getAddress();
        }
    }
}

