package org.apache.kafka.clients;

import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ReverseConnectionRequestData;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.ReverseConnectionRequest;
import org.apache.kafka.common.requests.ReverseConnectionResponse;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/clients/SourceReverseConnectionManager.class */
public class SourceReverseConnectionManager implements ReverseConnectionManager {
    static final int MIN_REVERSE_NODE_ID = -1073741824;
    private static final int NUM_REVERSE_IDS = 536870911;
    private final Logger log;
    private final NetworkClient networkClient;
    private final UUID linkId;
    private final Selector selector;
    private final MetadataUpdater metadataUpdater;
    private final ReverseNode.ReverseCallback reverseCallback;
    private final ReverseConnectionRequestData reversalData;
    private final Map<String, ReversalState> reverseConnectionStates = new ConcurrentHashMap();
    private final AtomicInteger lastReverseIndex = new AtomicInteger(0);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/SourceReverseConnectionManager$ReversalState.class */
    public static class ReversalState {
        final ReverseNode reverseNode;
        volatile boolean connectionPending = true;
        ReverseConnectionRequest.Builder reverseRequestBuilder;

        ReversalState(ReverseNode reverseNode) {
            this.reverseNode = reverseNode;
        }
    }

    public SourceReverseConnectionManager(NetworkClient networkClient, Selector selector, MetadataUpdater metadataUpdater, UUID uuid, ReverseConnectionRequestData reverseConnectionRequestData, ReverseNode.ReverseCallback reverseCallback, LogContext logContext) {
        this.log = logContext.logger(SourceReverseConnectionManager.class);
        this.linkId = uuid;
        this.networkClient = networkClient;
        this.selector = selector;
        this.metadataUpdater = metadataUpdater;
        this.reverseCallback = reverseCallback;
        this.reversalData = reverseConnectionRequestData;
    }

    @Override // org.apache.kafka.clients.ReverseConnectionManager
    public void handleApiVersionsResponse(String str, ApiVersionsResponse apiVersionsResponse) {
        ReversalState reversalState;
        if (apiVersionsResponse.data().errorCode() != Errors.NONE.code() || (reversalState = this.reverseConnectionStates.get(str)) == null) {
            return;
        }
        ReverseNode reverseNode = reversalState.reverseNode;
        if (apiVersionsResponse.apiVersion(ApiKeys.REVERSE_CONNECTION.id) == null) {
            this.log.error("Remote server does not support connection reversal, closing connection");
            throw new UnsupportedVersionException("Remote server does not support connection reversal");
        }
        reversalState.reverseRequestBuilder = new ReverseConnectionRequest.Builder(this.reversalData.duplicate().setInitiateRequestId(reverseNode.requestId().orElse(-1).intValue()));
    }

    @Override // org.apache.kafka.clients.ReverseConnectionManager
    public void handleReverseConnectionResponse(String str, ReverseConnectionResponse reverseConnectionResponse) {
        ReversalState remove = this.reverseConnectionStates.remove(str);
        if (remove == null) {
            throw new IllegalStateException("Unexpected reverse connection response for " + str);
        }
        ReverseNode reverseNode = remove.reverseNode;
        KafkaChannel kafkaChannel = null;
        if (reverseConnectionResponse.data.errorCode() != Errors.NONE.code()) {
            this.log.warn("Connection reversal for node {} failed with error {} : {}", str, Short.valueOf(reverseConnectionResponse.data.errorCode()), reverseConnectionResponse.data.errorMessage());
            this.networkClient.close(str);
            reverseNode.future().completeExceptionally(Errors.forCode(reverseConnectionResponse.data.errorCode()).exception(reverseConnectionResponse.data.errorMessage()));
            return;
        }
        try {
            kafkaChannel = removeReverseChannel(str, reverseNode);
            if (kafkaChannel != null) {
                this.reverseCallback.onReverseConnection(kafkaChannel, reverseNode);
            } else if (reverseNode != null) {
                this.log.debug("Channel was disconnected before reversal {}", reverseNode);
                reverseNode.future().completeExceptionally(new NetworkException("Channel was disconnected"));
            }
            this.log.debug("Processed reverse channel response {}", reverseNode);
        } catch (Exception e) {
            this.log.error("Connection reversal failed for " + str, (Throwable) e);
            if (kafkaChannel != null) {
                Utils.closeQuietly(kafkaChannel, "reverse channel $channel");
            }
            this.networkClient.close(str);
            if (reverseNode != null) {
                reverseNode.future().completeExceptionally(e);
            }
        }
    }

    @Override // org.apache.kafka.clients.ReverseConnectionManager
    public void handleReverseConnectionsRequests(long j) {
        this.reverseConnectionStates.forEach((str, reversalState) -> {
            if (reversalState.connectionPending) {
                this.log.debug("Initiate reverse connection for node {}", str);
                this.networkClient.ready(reversalState.reverseNode, j);
                reversalState.connectionPending = false;
            }
            if (reversalState.reverseRequestBuilder != null) {
                this.log.debug("Attempt to send reverse connection request for node {}", str);
                if (this.networkClient.maybeSend(str, reversalState.reverseRequestBuilder, true, j)) {
                    reversalState.reverseRequestBuilder = null;
                }
            }
        });
    }

    @Override // org.apache.kafka.clients.ReverseConnectionManager
    public ReverseNode createReversibleConnection(int i, int i2, ListenerName listenerName, KafkaPrincipal kafkaPrincipal, long j) {
        this.log.debug("Create reversible connection from source for requestId={} remoteId={} localListener={} localPrincipal={}", Integer.valueOf(i), Integer.valueOf(i2), listenerName, kafkaPrincipal);
        Optional<Node> findFirst = this.metadataUpdater.fetchNodes().stream().filter(node -> {
            return (i2 < 0 && node.id() >= 0) || node.id() == i2;
        }).findFirst();
        if (!findFirst.isPresent()) {
            this.log.warn("Reverse connection to node {} could not be created since broker is not available in the metadata.", Integer.valueOf(i2));
            this.networkClient.requestClusterLinkMetadataUpdate();
            throw new NetworkException("Remote broker with id " + i2 + " not available in metadata.");
        }
        ReverseNode reverseNode = new ReverseNode(nextReverseNodeId(), i2, findFirst.get().host(), findFirst.get().port(), this.linkId, i, listenerName, kafkaPrincipal);
        this.reverseConnectionStates.put(reverseNode.idString(), new ReversalState(reverseNode));
        this.networkClient.wakeup();
        return reverseNode;
    }

    @Override // org.apache.kafka.clients.ReverseConnectionManager
    public boolean processDisconnection(String str) {
        ReversalState remove = this.reverseConnectionStates.remove(str);
        if (remove == null) {
            return false;
        }
        remove.reverseNode.future().completeExceptionally(new NetworkException("Connection reversal aborted because connection to " + remove.reverseNode + " was disconnected"));
        return true;
    }

    private KafkaChannel removeReverseChannel(String str, ReverseNode reverseNode) {
        KafkaChannel channel = this.selector.channel(str);
        if (channel != null) {
            if (reverseNode == null) {
                this.log.warn("Reverse node not found for node id {}", str);
                this.networkClient.close(str);
                return null;
            }
            this.selector.removeChannelWithoutClosing(channel);
            this.networkClient.removeNode(str);
        }
        return channel;
    }

    private int nextReverseNodeId() {
        return MIN_REVERSE_NODE_ID + this.lastReverseIndex.updateAndGet(i -> {
            return (i + 1) % 536870911;
        });
    }

    Collection<ReverseNode> reverseNodes() {
        return (Collection) this.reverseConnectionStates.values().stream().map(reversalState -> {
            return reversalState.reverseNode;
        }).collect(Collectors.toList());
    }
}
