/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients;

import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kafka.clients.MetadataUpdater;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.ReverseConnectionManager;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ReverseConnectionRequestData;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.ReverseConnectionRequest;
import org.apache.kafka.common.requests.ReverseConnectionResponse;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

/**
 * {@link ReverseConnectionManager} used on the side that initiates a connection which is
 * subsequently handed over ("reversed") to the remote end.
 *
 * <p>For every reversible connection a {@link ReversalState} is kept in
 * {@link #reverseConnectionStates}, keyed by the reverse node's id string. The state is driven
 * through these steps by the methods of this class:
 * <ol>
 *   <li>{@link #createReversibleConnection} registers the state with a pending connection;</li>
 *   <li>{@link #handleReverseConnectionsRequests} asks the network client to connect and later
 *       sends the reverse connection request once a builder is available;</li>
 *   <li>{@link #handleApiVersionsResponse} prepares the request builder after a successful
 *       ApiVersions exchange;</li>
 *   <li>{@link #handleReverseConnectionResponse} detaches the channel from the selector and
 *       passes it to the {@link ReverseNode.ReverseCallback}, or fails the node's future.</li>
 * </ol>
 */
public class SourceReverseConnectionManager implements ReverseConnectionManager {
    /** Lowest id that is assigned to a reverse node. */
    static final int MIN_REVERSE_NODE_ID = -1073741824;
    /** Size of the id range that {@link #nextReverseNodeId()} cycles through. */
    private static final int NUM_REVERSE_IDS = 0x1FFFFFFF;

    private final Logger log;
    private final NetworkClient networkClient;
    private final UUID linkId;
    private final Selector selector;
    private final MetadataUpdater metadataUpdater;
    private final ReverseNode.ReverseCallback reverseCallback;
    /** Template for reverse connection requests; duplicated per request, never mutated. */
    private final ReverseConnectionRequestData reversalData;
    /** In-flight reversals keyed by {@code ReverseNode#idString()}. */
    private final Map<String, ReversalState> reverseConnectionStates;
    /** Offset (relative to {@link #MIN_REVERSE_NODE_ID}) of the most recently issued node id. */
    private final AtomicInteger lastReverseIndex;

    /**
     * Creates a manager with no in-flight reversals.
     *
     * @param networkClient   client used to connect, send requests and close connections
     * @param selector        selector from which established channels are detached
     * @param metadataUpdater source of the nodes that can be connected to
     * @param linkId          link id passed to every created {@link ReverseNode}
     * @param reversalData    template data for reverse connection requests
     * @param reverseCallback callback that receives a channel once its reversal succeeded
     * @param logContext      context used to create this instance's logger
     */
    public SourceReverseConnectionManager(NetworkClient networkClient, Selector selector, MetadataUpdater metadataUpdater, UUID linkId, ReverseConnectionRequestData reversalData, ReverseNode.ReverseCallback reverseCallback, LogContext logContext) {
        this.log = logContext.logger(SourceReverseConnectionManager.class);
        this.linkId = linkId;
        this.networkClient = networkClient;
        this.selector = selector;
        this.metadataUpdater = metadataUpdater;
        this.reverseCallback = reverseCallback;
        this.reversalData = reversalData;
        this.reverseConnectionStates = new ConcurrentHashMap<>();
        this.lastReverseIndex = new AtomicInteger(0);
    }

    /**
     * Prepares the reverse connection request for {@code nodeId} after its ApiVersions exchange.
     *
     * <p>Nothing happens when the response carries an error or when no reversal is registered
     * for the node.
     *
     * @param nodeId              id string of the node the response was received from
     * @param apiVersionsResponse the ApiVersions response of that node
     * @throws UnsupportedVersionException if the response does not advertise
     *                                     {@link ApiKeys#REVERSE_CONNECTION}
     */
    @Override
    public void handleApiVersionsResponse(String nodeId, ApiVersionsResponse apiVersionsResponse) {
        if (apiVersionsResponse.data().errorCode() != Errors.NONE.code()) {
            return;
        }
        ReversalState reversalState = this.reverseConnectionStates.get(nodeId);
        if (reversalState == null) {
            return;
        }
        if (apiVersionsResponse.apiVersion(ApiKeys.REVERSE_CONNECTION.id) == null) {
            this.log.error("Remote server does not support connection reversal, closing connection");
            throw new UnsupportedVersionException("Remote server does not support connection reversal");
        }
        // Work on a copy so the shared template is not modified by the per-request id.
        ReverseConnectionRequestData requestData = this.reversalData.duplicate()
                .setInitiateRequestId(reversalState.reverseNode.requestId().orElse(-1));
        reversalState.reverseRequestBuilder = new ReverseConnectionRequest.Builder(requestData);
    }

    /**
     * Completes the reversal registered for {@code nodeId}; its state is removed in every case.
     *
     * <p>On a successful response the channel is detached from the selector and handed to the
     * reverse callback. If the channel is already gone, the callback throws, or the response
     * carries an error, the reverse node's future is completed exceptionally; in the latter two
     * cases the connection is also closed through the network client.
     *
     * @param nodeId                    id string of the node the response was received from
     * @param reverseConnectionResponse the response to the reverse connection request
     * @throws IllegalStateException if no reversal is registered for {@code nodeId}
     */
    @Override
    public void handleReverseConnectionResponse(String nodeId, ReverseConnectionResponse reverseConnectionResponse) {
        ReversalState reversalState = this.reverseConnectionStates.remove(nodeId);
        if (reversalState == null) {
            throw new IllegalStateException("Unexpected reverse connection response for " + nodeId);
        }
        ReverseNode reverseNode = reversalState.reverseNode;
        short errorCode = reverseConnectionResponse.data.errorCode();
        if (errorCode != Errors.NONE.code()) {
            String errorMessage = reverseConnectionResponse.data.errorMessage();
            this.log.warn("Connection reversal for node {} failed with error {} : {}", nodeId, errorCode, errorMessage);
            this.networkClient.close(nodeId);
            reverseNode.future().completeExceptionally(Errors.forCode(errorCode).exception(errorMessage));
            return;
        }

        KafkaChannel channel = null;
        try {
            channel = this.removeReverseChannel(nodeId, reverseNode);
            if (channel != null) {
                this.reverseCallback.onReverseConnection(channel, reverseNode);
            } else if (reverseNode != null) {
                this.log.debug("Channel was disconnected before reversal {}", reverseNode);
                reverseNode.future().completeExceptionally(new NetworkException("Channel was disconnected"));
            }
            this.log.debug("Processed reverse channel response {}", reverseNode);
        } catch (Exception e) {
            this.log.error("Connection reversal failed for {}", nodeId, e);
            if (channel != null) {
                // The channel may already be detached from the selector, so it has to be closed
                // here explicitly. Name it by node id so a close failure can be attributed.
                Utils.closeQuietly(channel, "reverse channel " + nodeId);
            }
            this.networkClient.close(nodeId);
            if (reverseNode != null) {
                reverseNode.future().completeExceptionally(e);
            }
        }
    }

    /**
     * Advances every registered reversal: starts pending connections and sends reverse
     * connection requests whose builder has been prepared.
     *
     * <p>A builder is cleared only when {@code NetworkClient#maybeSend} returns {@code true};
     * otherwise it is kept so that sending is attempted again on the next invocation.
     *
     * @param now timestamp forwarded to the network client calls
     */
    @Override
    public void handleReverseConnectionsRequests(long now) {
        this.reverseConnectionStates.forEach((nodeId, reversal) -> {
            if (reversal.connectionPending) {
                this.log.debug("Initiate reverse connection for node {}", nodeId);
                this.networkClient.ready(reversal.reverseNode, now);
                reversal.connectionPending = false;
            }
            ReverseConnectionRequest.Builder requestBuilder = reversal.reverseRequestBuilder;
            if (requestBuilder != null) {
                this.log.debug("Attempt to send reverse connection request for node {}", nodeId);
                if (this.networkClient.maybeSend(nodeId, requestBuilder, true, now)) {
                    reversal.reverseRequestBuilder = null;
                }
            }
        });
    }

    /**
     * Registers a new reversible connection and wakes up the network client.
     *
     * <p>The target is the first node returned by the metadata updater whose id equals
     * {@code remoteNodeId}; if {@code remoteNodeId} is negative, any node with a non-negative
     * id matches.
     *
     * @param requestId         request id stored in the reverse node
     * @param remoteNodeId      id of the remote node, or a negative value to accept any node
     *                          with a non-negative id
     * @param localListenerName listener name stored in the reverse node
     * @param localPrincipal    principal stored in the reverse node
     * @param now               current timestamp (not used by this implementation)
     * @return the reverse node representing the new connection
     * @throws NetworkException if no matching node is present in the metadata; a metadata
     *                          update is requested before the exception is thrown
     */
    @Override
    public ReverseNode createReversibleConnection(int requestId, int remoteNodeId, ListenerName localListenerName, KafkaPrincipal localPrincipal, long now) {
        this.log.debug("Create reversible connection from source for requestId={} remoteId={} localListener={} localPrincipal={}", requestId, remoteNodeId, localListenerName, localPrincipal);
        Optional<Node> remoteNode = this.metadataUpdater.fetchNodes().stream()
                .filter(node -> remoteNodeId < 0 && node.id() >= 0 || node.id() == remoteNodeId)
                .findFirst();
        if (!remoteNode.isPresent()) {
            this.log.warn("Reverse connection to node {} could not be created since broker is not available in the metadata.", remoteNodeId);
            this.networkClient.requestClusterLinkMetadataUpdate();
            throw new NetworkException("Remote broker with id " + remoteNodeId + " not available in metadata.");
        }
        Node target = remoteNode.get();
        ReverseNode reverseNode = new ReverseNode(this.nextReverseNodeId(), remoteNodeId, target.host(), target.port(), this.linkId, requestId, localListenerName, localPrincipal);
        this.reverseConnectionStates.put(reverseNode.idString(), new ReversalState(reverseNode));
        this.networkClient.wakeup();
        return reverseNode;
    }

    /**
     * Aborts the reversal registered for {@code nodeId}, if any, by completing the reverse
     * node's future with a {@link NetworkException}.
     *
     * @param nodeId id string of the node whose connection was disconnected
     */
    @Override
    public void processDisconnection(String nodeId) {
        ReversalState reversal = this.reverseConnectionStates.remove(nodeId);
        if (reversal != null) {
            reversal.reverseNode.future().completeExceptionally(new NetworkException("Connection reversal aborted because connection to " + reversal.reverseNode + " was disconnected"));
        }
    }

    /**
     * Detaches the channel of {@code reverseNodeId} from the selector without closing it.
     *
     * @param reverseNodeId id string under which the channel is registered in the selector
     * @param reverseNode   reverse node the channel belongs to
     * @return the detached channel, or {@code null} if the selector has no such channel or
     *         {@code reverseNode} is {@code null} (in which case the connection is closed)
     */
    private KafkaChannel removeReverseChannel(String reverseNodeId, ReverseNode reverseNode) {
        KafkaChannel channel = this.selector.channel(reverseNodeId);
        if (channel == null) {
            return null;
        }
        if (reverseNode == null) {
            this.log.warn("Reverse node not found for node id {}", reverseNodeId);
            this.networkClient.close(reverseNodeId);
            return null;
        }
        this.selector.removeChannelWithoutClosing(channel);
        return channel;
    }

    /**
     * Returns the next reverse node id: {@link #MIN_REVERSE_NODE_ID} plus a counter that is
     * incremented atomically and wraps around at {@link #NUM_REVERSE_IDS}.
     */
    private int nextReverseNodeId() {
        return MIN_REVERSE_NODE_ID + this.lastReverseIndex.updateAndGet(i -> (i + 1) % NUM_REVERSE_IDS);
    }

    /** Returns a snapshot list of the reverse nodes of all currently registered reversals. */
    Collection<ReverseNode> reverseNodes() {
        return this.reverseConnectionStates.values().stream().map(state -> state.reverseNode).collect(Collectors.toList());
    }

    /** Mutable per-connection bookkeeping for one reversal. */
    private static class ReversalState {
        final ReverseNode reverseNode;
        /** {@code true} until the connection attempt has been handed to the network client. */
        volatile boolean connectionPending;
        /** Request waiting to be sent; {@code null} when nothing is waiting. */
        // NOTE(review): not volatile, unlike connectionPending — presumably only accessed from
        // a single thread; confirm against the callers.
        ReverseConnectionRequest.Builder reverseRequestBuilder;

        ReversalState(ReverseNode reverseNode) {
            this.reverseNode = reverseNode;
            this.connectionPending = true;
        }
    }
}

