/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.link;

import io.confluent.kafka.link.ClusterLinkConfig;
import io.confluent.kafka.link.ClusterLinkInterceptor;
import io.confluent.kafka.multitenant.metrics.TenantMetrics;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Map;
import kafka.server.link.ClusterLinkManager;
import org.apache.kafka.clients.ClientInterceptor;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class ClusterLinkInterceptorTest {
    private final String localTenantPrefix = "dest_tenant_";
    private final String linkName = "dest_tenant_link";

    private Map<String, Object> buildConfigs(boolean isMultiTenant, boolean metricReductionEnabled) {
        return Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)ClusterLinkManager.IsMultiTenantProp(), (Object)isMultiTenant), Utils.mkEntry((Object)ClusterLinkManager.LocalTenantPrefixProp(), (Object)"dest_tenant_"), Utils.mkEntry((Object)ClusterLinkManager.LinkNameProp(), (Object)"dest_tenant_link"), Utils.mkEntry((Object)"confluent.cluster.link.enable.metrics.reduction.advanced", (Object)metricReductionEnabled)});
    }

    @Test
    public void testConfigure() {
        ClusterLinkInterceptor interceptor = new ClusterLinkInterceptor();
        Assertions.assertThrows(ConfigException.class, () -> interceptor.configure(Collections.emptyMap()));
        interceptor.configure(this.buildConfigs(false, false));
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testConfigure(boolean metricReductionEnabled) {
        ClusterLinkInterceptor interceptor = new ClusterLinkInterceptor();
        interceptor.configure(this.buildConfigs(true, metricReductionEnabled));
        ApiKeys apiKey = ApiKeys.METADATA;
        RequestHeaderData headerData = new RequestHeaderData().setClientId("clientId").setCorrelationId(1).setRequestApiKey(apiKey.id).setRequestApiVersion(apiKey.latestVersion());
        TenantMetrics.MetricsRequestContext metricsRequestContext = interceptor.metricsRequestContext(new RequestHeader(headerData, 2));
        Assertions.assertEquals((Object)(metricReductionEnabled ? "_confluent" : "dest_tenant_link"), metricsRequestContext.metricTags().get("link-name"));
    }

    @Test
    public void testServiceLoader() {
        ClientInterceptor interceptor = ClusterLinkManager.prefixInterceptor((String)"dest_tenant_", (String)"", (String)"dest_tenant_link", (ClusterLinkConfig.LinkMode)ClusterLinkConfig.LinkMode.DESTINATION, null, (boolean)true, (boolean)true, (boolean)false, null);
        Assertions.assertNotNull((Object)interceptor);
        Assertions.assertTrue((boolean)(interceptor instanceof ClusterLinkInterceptor));
    }

    @Test
    public void testNonServiceLoader() {
        ClientInterceptor interceptor = ClusterLinkManager.prefixInterceptor((String)"dest_tenant_", (String)"", (String)"dest_tenant_link", (ClusterLinkConfig.LinkMode)ClusterLinkConfig.LinkMode.DESTINATION, null, (boolean)false, (boolean)true, (boolean)false, null);
        Assertions.assertNotNull((Object)interceptor);
        Assertions.assertTrue((boolean)(interceptor instanceof ClusterLinkInterceptor));
    }

    @Test
    public void testEnsureConnectionAllowedInternalIp() throws IOException {
        ClientInterceptor interceptor = ClusterLinkManager.prefixInterceptor((String)"dest_tenant_", (String)"", (String)"dest_tenant_link", (ClusterLinkConfig.LinkMode)ClusterLinkConfig.LinkMode.DESTINATION, null, (boolean)true, (boolean)true, (boolean)false, null);
        Assertions.assertNotNull((Object)interceptor);
        interceptor.ensureConnectionAllowed(new InetSocketAddress(InetAddress.getByAddress(new byte[]{11, 0, 0, 1}), 9071));
        interceptor.ensureConnectionAllowed(new InetSocketAddress(InetAddress.getByAddress(new byte[]{127, 0, 0, 1}), 9092));
        Assertions.assertThrows(IOException.class, () -> interceptor.ensureConnectionAllowed(new InetSocketAddress(InetAddress.getByAddress(new byte[]{127, 0, 0, 1}), 9071)));
        Assertions.assertThrows(IOException.class, () -> interceptor.ensureConnectionAllowed(new InetSocketAddress(InetAddress.getByAddress("localhost", new byte[]{127, 0, 0, 1}), 9071)));
        Assertions.assertThrows(IOException.class, () -> interceptor.ensureConnectionAllowed(new InetSocketAddress(InetAddress.getByAddress(new byte[]{10, 0, 0, 1}), 9071)));
    }

    @Test
    void testTrackApiKeysMaxVersionNotMultiTenant() {
        Metrics metrics = new Metrics();
        ClusterLinkInterceptor interceptor = new ClusterLinkInterceptor();
        interceptor.configure(this.buildConfigs(false, false));
        interceptor.configureMetrics(metrics);
        ApiVersionsResponse response = TestUtils.confluentCloudApiVersionsResponse((ApiMessageType.ListenerType)ApiMessageType.ListenerType.BROKER);
        interceptor.trackApiKeysMaxVersion(ApiKeys.API_VERSIONS, (AbstractResponse)response);
        KafkaMetric metric = ClusterLinkInterceptorTest.getFetchMaxVersionMetric(metrics);
        Assertions.assertNull((Object)metric, (String)"Expected Fetch max version metric to be null");
    }

    private static KafkaMetric getFetchMaxVersionMetric(Metrics metrics) {
        TenantMetrics.ApiKeysVersionContext context = new TenantMetrics.ApiKeysVersionContext(ApiKeys.FETCH);
        MetricName fetchApiKeyMaxVersionName = metrics.metricName("api-keys-max-version-min", context.metricsGroup(), "", context.metricTags());
        return (KafkaMetric)metrics.metrics().get(fetchApiKeyMaxVersionName);
    }

    @Test
    void testTrackApiKeysMaxVersion() {
        Metrics metrics = new Metrics();
        ClusterLinkInterceptor interceptor = new ClusterLinkInterceptor();
        interceptor.configure(this.buildConfigs(true, false));
        interceptor.configureMetrics(metrics);
        ApiVersionsResponse response = TestUtils.confluentCloudApiVersionsResponse((ApiMessageType.ListenerType)ApiMessageType.ListenerType.BROKER);
        interceptor.trackApiKeysMaxVersion(ApiKeys.API_VERSIONS, (AbstractResponse)response);
        KafkaMetric metric = ClusterLinkInterceptorTest.getFetchMaxVersionMetric(metrics);
        Assertions.assertNotNull((Object)metric, (String)"Expected Fetch max version metric to be registered");
        ApiVersionsResponseData.ApiVersion fetchApiVersion = response.apiVersion(ApiKeys.FETCH.id);
        short maxVersion = fetchApiVersion.maxVersion();
        Assertions.assertEquals((int)maxVersion, (int)((Double)metric.metricValue()).intValue());
        int newMaxVersion = maxVersion + 1;
        fetchApiVersion.setMaxVersion((short)newMaxVersion);
        interceptor.trackApiKeysMaxVersion(ApiKeys.API_VERSIONS, (AbstractResponse)response);
        Assertions.assertEquals((int)maxVersion, (int)((Double)metric.metricValue()).intValue());
        newMaxVersion = maxVersion - 1;
        fetchApiVersion.setMaxVersion((short)newMaxVersion);
        interceptor.trackApiKeysMaxVersion(ApiKeys.API_VERSIONS, (AbstractResponse)response);
        Assertions.assertEquals((int)newMaxVersion, (int)((Double)metric.metricValue()).intValue());
        metrics.close();
    }
}

