/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.server.link;

import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.RequestAndSize;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.PathAwareSniHostName;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.link.ClusterLinkSourceMetrics;
import org.apache.kafka.server.metrics.ApiSensors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClusterLinkRequestContext
extends RequestContext {
    public static final Logger log = LoggerFactory.getLogger(ClusterLinkRequestContext.class);
    private final ClusterLinkSourceMetrics linkMetrics;
    private final Time time;
    private final long startNanos;
    private final Boolean allowAllRequestTypes;
    private Boolean requestAllowed;

    public ClusterLinkRequestContext(RequestHeader header, String connectionId, InetAddress clientAddress, KafkaPrincipal principal, ListenerName listenerName, SecurityProtocol securityProtocol, ClientInformation clientInformation, PathAwareSniHostName sniHostName, Time time, ClusterLinkSourceMetrics linkMetrics, boolean fromPrivilegedListener, Optional<KafkaPrincipalSerde> principalSerde, AuthenticationContext authenticationContext, boolean allowAllRequestTypes) {
        super(header, connectionId, clientAddress, principal, listenerName, securityProtocol, clientInformation, sniHostName, fromPrivilegedListener, principalSerde, authenticationContext);
        this.time = time;
        this.startNanos = time.nanoseconds();
        this.linkMetrics = linkMetrics;
        this.allowAllRequestTypes = allowAllRequestTypes;
        this.requestAllowed = true;
    }

    @Override
    public RequestAndSize parseRequest(ByteBuffer buffer) {
        long totalSize = ApiSensors.calculateRequestSize(this.header, buffer);
        this.linkMetrics.recordRequest(this.header.apiKey(), totalSize, this.time.milliseconds());
        RequestAndSize requestAndSize = super.parseRequest(buffer);
        if (this.allowAllRequestTypes.booleanValue()) {
            return requestAndSize;
        }
        log.warn("Cluster linking request {} attempted on listener {} with security protocol {}", new Object[]{this.header.apiKey(), this.listenerName, this.securityProtocol});
        AbstractRequest body = requestAndSize.request;
        if (body instanceof MetadataRequest) {
            MetadataRequest metadataRequest = (MetadataRequest)body;
            List<String> topics = metadataRequest.topics();
            if (topics == null || !topics.isEmpty()) {
                this.requestAllowed = false;
            }
        } else if (!(body instanceof ApiVersionsRequest)) {
            this.requestAllowed = false;
        }
        return requestAndSize;
    }

    @Override
    public Send buildResponseSend(AbstractResponse body) {
        Send send = super.buildResponseSend(body);
        this.linkMetrics.recordResponse(this.header.apiKey(), send.size(), this.time.nanoseconds() - this.startNanos, body.errorCounts(), this.time.milliseconds());
        return send;
    }

    @Override
    public boolean shouldIntercept() {
        return this.requestAllowed == false;
    }

    @Override
    public AbstractResponse intercept(AbstractRequest request, int throttleTimeMs) {
        return request.getErrorResponse(throttleTimeMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception());
    }
}

