/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.link;

import io.confluent.kafka.link.ClusterLinkApis;
import io.confluent.kafka.link.ClusterLinkConfig;
import io.confluent.kafka.link.LinkContext;
import io.confluent.kafka.multitenant.MultiTenantPrincipal;
import io.confluent.kafka.multitenant.metrics.TenantMetrics;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConfigDefaults;
import kafka.server.link.ClusterLinkManager;
import kafka.server.link.ClusterLinkPrincipal;
import kafka.server.link.ClusterLinkUtils;
import org.apache.kafka.clients.ClientInterceptor;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.MessageContext;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.CorrelationIdMismatchException;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseHeader;

public class ClusterLinkInterceptor
implements ClientInterceptor,
Configurable {
    public static final String DEST_METRICS_GROUP = "cluster-link-dest-tenant-metrics";
    public static final String SOURCE_METRICS_GROUP = "cluster-link-source-tenant-metrics";
    private final TenantMetrics tenantMetrics = new TenantMetrics();
    private String metricsGroup;
    private Metrics metrics;
    LinkContext linkContext;
    private MultiTenantPrincipal principal;
    private boolean isMultiTenant;
    private boolean clusterLinkMetricReductionAdvancedEnabled;
    private Map<String, ?> configs;

    public void configure(Map<String, ?> configs) {
        this.configs = configs;
        String localTenantPrefix = (String)configs.get(ClusterLinkManager.LocalTenantPrefixProp());
        Boolean isMultiTenantFromConfigs = (Boolean)configs.get(ClusterLinkManager.IsMultiTenantProp());
        this.isMultiTenant = isMultiTenantFromConfigs == null ? false : isMultiTenantFromConfigs;
        this.clusterLinkMetricReductionAdvancedEnabled = ConfluentConfigs.clusterLinkMetricReductionAdvancedEnabled(configs);
        if (this.isMultiTenant) {
            if (localTenantPrefix == null || localTenantPrefix.isEmpty()) {
                throw new ConfigException("ClusterLinkInterceptor is multi-tenant and should be configured with a valid tenant prefix");
            }
            if (!localTenantPrefix.endsWith("_")) {
                throw new ConfigException("Tenant prefix does not contain delimiter");
            }
            String logicalCluster = localTenantPrefix.substring(0, localTenantPrefix.length() - 1);
            this.principal = new ClusterLinkPrincipal(logicalCluster);
        } else {
            localTenantPrefix = localTenantPrefix == null ? "" : localTenantPrefix;
        }
        String linkName = (String)configs.get(ClusterLinkManager.LinkNameProp());
        if (linkName == null || linkName.isEmpty()) {
            throw new ConfigException("ClusterLinkInterceptor is not configured with a valid link name");
        }
        ClusterLinkConfig.LinkMode linkMetricsMode = (ClusterLinkConfig.LinkMode)configs.get(ClusterLinkManager.LinkMetricsModeProp());
        this.metricsGroup = linkMetricsMode == ClusterLinkConfig.LinkMode.SOURCE ? SOURCE_METRICS_GROUP : DEST_METRICS_GROUP;
        String clusterLinkPrefix = (String)configs.get(ClusterLinkConfig.ClusterLinkPrefixProp());
        clusterLinkPrefix = clusterLinkPrefix == null ? "" : clusterLinkPrefix;
        Boolean prefixConsumerGroupWithClusterLinkPrefix = this.prefixConsumerGroupWithClusterLinkPrefix(configs);
        this.linkContext = new LinkContext(localTenantPrefix, clusterLinkPrefix, linkName, prefixConsumerGroupWithClusterLinkPrefix);
    }

    private Boolean prefixConsumerGroupWithClusterLinkPrefix(Map<String, ?> configs) {
        Boolean prefixConsumerGroupWithClusterLinkPrefix = (Boolean)configs.get(ClusterLinkConfig.ConsumerGroupPrefixEnableProp());
        return prefixConsumerGroupWithClusterLinkPrefix == null ? ClusterLinkConfigDefaults.ConsumerGroupPrefixEnableDefault() : prefixConsumerGroupWithClusterLinkPrefix;
    }

    public void configureMetrics(Metrics metrics) {
        this.metrics = metrics;
    }

    public void ensureConnectionAllowed(InetSocketAddress socketAddress) throws IOException {
        if (this.isMultiTenant && ClusterLinkUtils.isInternalNetworkOrPort((InetSocketAddress)socketAddress)) {
            throw new IOException(String.format("Connection to %s:%d not allowed from Confluent Cloud brokers", socketAddress.getHostString(), socketAddress.getPort()));
        }
    }

    public Send toSend(RequestHeader requestHeader, AbstractRequest requestBody, long requestSendTimeMs) {
        if (!ClusterLinkApis.isApiAllowed(requestHeader.apiKey())) {
            throw new IllegalStateException("Request " + requestHeader.apiKey() + " is not allowed on cluster links");
        }
        Send send = requestBody.toSend(requestHeader, (MessageContext)this.linkContext);
        if (this.isMultiTenant) {
            long requestSize = send.size() + 4L;
            this.tenantMetrics.recordRequest(this.metrics, this.metricsRequestContext(requestHeader), requestSize, requestSendTimeMs);
        }
        return send;
    }

    public AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader, long requestSendTimeMs, long responseReceiveTimeMs) {
        ApiKeys apiKey = requestHeader.apiKey();
        if (!ClusterLinkApis.isApiAllowed(apiKey)) {
            throw new IllegalStateException("Request " + apiKey + " is not allowed on cluster links");
        }
        if (this.isMultiTenant) {
            long latencyMs = Math.max(0L, responseReceiveTimeMs - requestSendTimeMs);
            int responseSize = 4 + responseBuffer.remaining();
            this.tenantMetrics.recordResponse(this.metrics, this.metricsRequestContext(requestHeader), responseSize, TimeUnit.MILLISECONDS.toNanos(latencyMs), Collections.emptyMap(), responseReceiveTimeMs);
        }
        short apiVersion = requestHeader.apiVersion();
        ResponseHeader responseHeader = ResponseHeader.parse((ByteBuffer)responseBuffer, (short)apiKey.responseHeaderVersion(apiVersion));
        if (requestHeader.correlationId() != responseHeader.correlationId()) {
            throw new CorrelationIdMismatchException("Correlation id for response (" + responseHeader.correlationId() + ") does not match request (" + requestHeader.correlationId() + "), request requestHeader: " + requestHeader, requestHeader.correlationId(), responseHeader.correlationId());
        }
        return AbstractResponse.parseResponse((ApiKeys)apiKey, (ByteBuffer)responseBuffer, (short)requestHeader.apiVersion(), (MessageContext)this.linkContext);
    }

    TenantMetrics.MetricsRequestContext metricsRequestContext(RequestHeader requestHeader) {
        return new ClusterLinkClientMetricsContext(this.linkContext.linkName(), this.metricsGroup, this.principal, requestHeader.clientId(), requestHeader.apiKey(), this.clusterLinkMetricReductionAdvancedEnabled);
    }

    private static class ClusterLinkClientMetricsContext
    extends TenantMetrics.MetricsRequestContext {
        private final String linkName;
        private final String metricsGroup;

        public ClusterLinkClientMetricsContext(String linkName, String metricsGroup, MultiTenantPrincipal principal, String clientId, ApiKeys apiKey, boolean clusterLinkMetricReductionAdvancedEnabled) {
            super(principal, clientId, apiKey);
            this.linkName = clusterLinkMetricReductionAdvancedEnabled ? "_confluent" : linkName;
            this.metricsGroup = metricsGroup;
        }

        @Override
        public String metricsGroup() {
            return this.metricsGroup;
        }

        @Override
        public Map<String, String> metricTags() {
            HashMap<String, String> tags = new HashMap<String, String>();
            tags.put("tenant", this.principal().tenantMetadata().tenantName);
            tags.put("link-name", this.linkName);
            return tags;
        }

        @Override
        public String sensorSuffix() {
            return String.format(":%s-%s:%s-%s", "tenant", this.principal().tenantMetadata().tenantName, "link-name", this.linkName);
        }
    }
}

