/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant;

import io.confluent.cloud.protobuf.traffic.networkcontext.v1.ConnectionContext;
import io.confluent.cloud.protobuf.traffic.networkcontext.v1.NetworkContext;
import io.confluent.cloud.protobuf.traffic.networkcontext.v1.PrivateLink;
import io.confluent.cloud.protobuf.traffic.networkcontext.v1.PrivateNetworkInterface;
import io.confluent.cloud.protobuf.traffic.networkcontext.v1.Public;
import io.confluent.cloud.protobuf.traffic.networkcontext.v1.TransitGateway;
import io.confluent.cloud.protobuf.traffic.networkcontext.v1.VpcPeering;
import io.confluent.kafka.common.network.CloudProxyTlvParser;
import io.confluent.kafka.multitenant.MultiTenantInterceptor;
import io.confluent.kafka.multitenant.MultiTenantPrincipal;
import io.confluent.kafka.multitenant.MultiTenantRequestContext;
import io.confluent.kafka.multitenant.TenantMetadata;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.AsyncAuthExecutor;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.ChannelMetadataRegistry;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.DefaultChannelMetadataRegistry;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.MockAsyncAuthExecutor;
import org.apache.kafka.common.network.ProxyProtocol;
import org.apache.kafka.common.network.ProxyProtocolEngineFactory;
import org.apache.kafka.common.network.ProxyTlv;
import org.apache.kafka.common.network.ProxyTlvProvider;
import org.apache.kafka.common.network.ProxyTlvType;
import org.apache.kafka.common.network.RequestCallback;
import org.apache.kafka.common.network.SslTransportLayer;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.DefaultRequestCallbackManager;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.SaslServerAuthenticator;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.interceptor.BrokerInterceptor;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class MultiTenantInterceptorTest {
    private MultiTenantInterceptor interceptor;
    private Metrics metrics;
    private ChannelMetadataRegistry metadataRegistry;

    private void callOnApiVersionsRequest(MultiTenantInterceptor interceptor, String softwareName, String softwareVersion, String clientId, Metrics metrics) {
        ClientInformation clientInformation = new ClientInformation(softwareName, softwareVersion);
        MultiTenantPrincipal multiTenantPrincipal = new MultiTenantPrincipal("user", "saslAuthenticationId", new TenantMetadata("tenantName", "clusterId", "organizationId", "environmentId", "userResourceID", true, true, true));
        interceptor.onApiVersionsRequest(clientInformation, clientId, (KafkaPrincipal)multiTenantPrincipal, metrics);
    }

    private void callOnAuthenticatedConnection(MultiTenantInterceptor interceptor, Metrics metrics, ChannelMetadataRegistry metadataRegistry) {
        MultiTenantPrincipal multiTenantPrincipal = new MultiTenantPrincipal("user", "saslAuthenticationId", new TenantMetadata("tenantName", "clusterId", "organizationId", "environmentId", "userResourceID", true, true, true));
        interceptor.onAuthenticatedConnection("connectionId", InetAddress.getLoopbackAddress(), (KafkaPrincipal)multiTenantPrincipal, metrics, metadataRegistry);
    }

    private Map<String, Object> defaultConfig() {
        HashMap<String, Object> config = new HashMap<String, Object>();
        config.put("confluent.multitenant.listener.hostname.cluster.prefix.enable", false);
        config.put("confluent.schema.validator.multitenant.enable", false);
        config.put("confluent.multitenant.interceptor.balancer.apis.enabled", false);
        config.put("confluent.multitenant.listener.hostname.subdomain.suffix.enable", ConfluentConfigs.MULTITENANT_LISTENER_HOSTNAME_SUBDOMAIN_SUFFIX_ENABLE_DEFAULT);
        config.put("confluent.valid.broker.rack.set", ConfluentConfigs.VALID_BROKER_RACK_SET_DEFAULT);
        config.put("confluent.subdomain.separator.map", ConfluentConfigs.SUBDOMAIN_SEPARATOR_MAP_DEFAULT);
        config.put("confluent.subdomain.separator.variable", "%sep");
        config.put("confluent.fetch.from.follower.require.leader.epoch.enable", false);
        config.put("confluent.broker.limit.producer.bytes.per.second", ConfluentConfigs.BROKER_LIMIT_PRODUCER_DEFAULT);
        config.put("confluent.broker.limit.consumer.bytes.per.second", ConfluentConfigs.BROKER_LIMIT_CONSUMER_DEFAULT);
        config.put("confluent.hot.partition.ratio", 0.8);
        config.put("confluent.ppv2.endpoint.scheme.enable", false);
        config.put("broker.session.uuid", "broker-session-id");
        return config;
    }

    @BeforeEach
    public void setUp() {
        this.interceptor = new MultiTenantInterceptor();
        this.metrics = new Metrics();
        this.metadataRegistry = new DefaultChannelMetadataRegistry();
    }

    @AfterEach
    public void tearDown() {
        this.metrics.close();
    }

    @Test
    public void testOnAuthenticatedConnection_DisableRecordApiVersions() {
        Map<String, Object> disabledConfig = this.defaultConfig();
        disabledConfig.put("confluent.multitenant.interceptor.collect.client.apiversions.metric", false);
        this.interceptor.configure(disabledConfig);
        this.metadataRegistry.registerClientId("should-not-be-logged");
        this.metadataRegistry.registerClientInformation(new ClientInformation("should-not-be-logged", "should-not-be-logged"));
        this.callOnAuthenticatedConnection(this.interceptor, this.metrics, this.metadataRegistry);
        Assertions.assertTrue((boolean)this.metrics.metrics().keySet().stream().noneMatch(metricName -> metricName.name().equals("connection-info-rate")), (String)"Metric shouldn't be logged when feature is gated");
    }

    @Test
    public void testOnAuthenticatedConnection_DefaultConfiguration() {
        this.interceptor.configure(this.defaultConfig());
        this.metadataRegistry.registerClientId("should-not-be-logged");
        this.metadataRegistry.registerClientInformation(new ClientInformation("should-not-be-logged", "should-not-be-logged"));
        this.callOnAuthenticatedConnection(this.interceptor, this.metrics, this.metadataRegistry);
        Assertions.assertTrue((boolean)this.metrics.metrics().keySet().stream().noneMatch(metricName -> metricName.name().equals("connection-info-rate")), (String)"Metric shouldn't be logged when the config value isn't specified (default is false)");
    }

    @Test
    public void testOnAuthenticatedConnection_EmitWhenAllInformationIsAvailable() {
        Map<String, Object> enabledConfig = this.defaultConfig();
        enabledConfig.put("confluent.multitenant.interceptor.collect.client.apiversions.metric", true);
        this.interceptor.configure(enabledConfig);
        this.metadataRegistry.registerClientInformation(new ClientInformation("some-software-name", "some-software-version"));
        this.metadataRegistry.registerClientId("api-versions-before-auth");
        this.callOnAuthenticatedConnection(this.interceptor, this.metrics, this.metadataRegistry);
        List matches = this.metrics.metrics().entrySet().stream().filter(e -> ((MetricName)e.getKey()).name().equals("connection-info-rate")).map(Map.Entry::getValue).collect(Collectors.toList());
        Assertions.assertFalse((boolean)matches.isEmpty(), (String)"Metric should be logged when config option is true");
        Assertions.assertTrue((boolean)matches.stream().anyMatch(metric -> ((String)metric.metricName().tags().get("client-id")).equals("api-versions-before-auth")), (String)"Can't find the logged client-id");
    }

    @Test
    public void testOnAuthenticatedConnection_NotEmitWhenClientInformationIsEmpty() {
        Map<String, Object> enabledConfig = this.defaultConfig();
        enabledConfig.put("confluent.multitenant.interceptor.collect.client.apiversions.metric", true);
        this.interceptor.configure(enabledConfig);
        this.metadataRegistry.registerClientInformation(ClientInformation.EMPTY);
        this.metadataRegistry.registerClientId("auth-before-api-versions");
        this.callOnAuthenticatedConnection(this.interceptor, this.metrics, this.metadataRegistry);
        List matches = this.metrics.metrics().entrySet().stream().filter(e -> ((MetricName)e.getKey()).name().equals("connection-info-rate")).map(Map.Entry::getValue).collect(Collectors.toList());
        Assertions.assertFalse((boolean)matches.stream().anyMatch(metric -> ((String)metric.metricName().tags().get("client-id")).equals("auth-before-api-versions")), (String)"Expected no connection-info-rate metric is emitted");
    }

    @Test
    public void testOnAuthenticatedConnection_NotEmitWhenClientInformationIsNull() {
        Map<String, Object> enabledConfig = this.defaultConfig();
        enabledConfig.put("confluent.multitenant.interceptor.collect.client.apiversions.metric", true);
        this.interceptor.configure(enabledConfig);
        this.metadataRegistry.registerClientId("auth-before-api-versions");
        this.callOnAuthenticatedConnection(this.interceptor, this.metrics, this.metadataRegistry);
        List matches = this.metrics.metrics().entrySet().stream().filter(e -> ((MetricName)e.getKey()).name().equals("connection-info-rate")).map(Map.Entry::getValue).collect(Collectors.toList());
        Assertions.assertFalse((boolean)matches.stream().anyMatch(metric -> ((String)metric.metricName().tags().get("client-id")).equals("auth-before-api-versions")), (String)"Expected no connection-info-rate metric is emitted");
    }

    @Test
    public void testOnAuthenticatedConnection_NotEmitWhenClientIdIsNull() {
        Map<String, Object> enabledConfig = this.defaultConfig();
        enabledConfig.put("confluent.multitenant.interceptor.collect.client.apiversions.metric", true);
        this.interceptor.configure(enabledConfig);
        this.metadataRegistry.registerClientInformation(new ClientInformation("some-software-name", "some-software-version"));
        this.callOnAuthenticatedConnection(this.interceptor, this.metrics, this.metadataRegistry);
        List matches = this.metrics.metrics().entrySet().stream().filter(e -> ((MetricName)e.getKey()).name().equals("connection-info-rate")).map(Map.Entry::getValue).collect(Collectors.toList());
        Assertions.assertTrue((boolean)matches.isEmpty(), (String)"Expected no connection-info-rate metric is emitted");
    }

    @Test
    public void testOnApiVersionsRequest_DisableRecordApiVersions() {
        Map<String, Object> disabledConfig = this.defaultConfig();
        disabledConfig.put("confluent.multitenant.interceptor.collect.client.apiversions.metric", false);
        this.interceptor.configure(disabledConfig);
        this.callOnApiVersionsRequest(this.interceptor, "should-not-be-logged", "should-not-be-logged", "should-not-be-logged", this.metrics);
        Assertions.assertTrue((boolean)this.metrics.metrics().keySet().stream().noneMatch(metricName -> metricName.name().equals("connection-info-rate")), (String)"Metric shouldn't be logged when feature is gated");
    }

    @Test
    public void testOnApiVersionsRequest_DefaultConfiguration() {
        this.interceptor.configure(this.defaultConfig());
        this.callOnApiVersionsRequest(this.interceptor, "should-not-be-logged", "should-not-be-logged", "should-not-be-logged", this.metrics);
        Assertions.assertTrue((boolean)this.metrics.metrics().keySet().stream().noneMatch(metricName -> metricName.name().equals("connection-info-rate")), (String)"Metric shouldn't be logged when the config value isn't specified (default is false)");
    }

    @Test
    public void testOnApiVersionsRequest_EnableRecordApiVersions() {
        Map<String, Object> enabledConfig = this.defaultConfig();
        enabledConfig.put("confluent.multitenant.interceptor.collect.client.apiversions.metric", true);
        this.interceptor.configure(enabledConfig);
        this.callOnApiVersionsRequest(this.interceptor, "some-software-name", "some-software-version", "foobar", this.metrics);
        List matches = this.metrics.metrics().entrySet().stream().filter(e -> ((MetricName)e.getKey()).name().equals("connection-info-rate")).map(Map.Entry::getValue).collect(Collectors.toList());
        Assertions.assertFalse((boolean)matches.isEmpty(), (String)"Metric should be logged when config option is true");
        Assertions.assertTrue((boolean)matches.stream().anyMatch(metric -> ((String)metric.metricName().tags().get("client-id")).equals("foobar")), (String)"Can't find the logged client-id");
    }

    @Test
    public void testMultipleRecordApiVersionsRecordsAtMostOnce() {
        Map<String, Object> enabledConfig = this.defaultConfig();
        enabledConfig.put("confluent.multitenant.interceptor.collect.client.apiversions.metric", true);
        this.interceptor.configure(enabledConfig);
        this.callOnApiVersionsRequest(this.interceptor, "some-software-name", "some-software-version", "foobar", this.metrics);
        this.callOnApiVersionsRequest(this.interceptor, "alternative-name", "alternative-version", "barfoo", this.metrics);
        List matches = this.metrics.metrics().entrySet().stream().filter(e -> ((MetricName)e.getKey()).name().equals("connection-info-rate")).map(Map.Entry::getValue).collect(Collectors.toList());
        Assertions.assertFalse((boolean)matches.isEmpty(), (String)"Metric should be logged when config option is true");
        Assertions.assertTrue((boolean)matches.stream().anyMatch(metric -> ((String)metric.metricName().tags().get("client-id")).equals("foobar")), (String)"Can't find the logged client-id");
    }

    @Test
    public void testDefaultNetworkType() {
        Map<String, Object> enabledConfig = this.defaultConfig();
        enabledConfig.put("confluent.emit.network.type.default", "east-west99");
        this.interceptor.configure(enabledConfig);
        String networkType = this.interceptor.networkType(new ProxyTlvProvider(){

            public ProxyTlv tlv(ProxyTlvType type) {
                return null;
            }
        });
        Assertions.assertEquals((Object)"east-west99", (Object)networkType);
    }

    @Test
    public void testDefaultNetworkTypePerListener() throws UnknownHostException {
        Map<String, Object> enableConfig = this.defaultConfig();
        enableConfig.put("broker.interceptor.class", MultiTenantInterceptor.class);
        enableConfig.put("listener.name.ssl.confluent.emit.network.type.default", "east-west99");
        enableConfig.put("listener.name.sasl_ssl.confluent.emit.network.type.default", "east-west100");
        enableConfig.put("confluent.emit.network.type.default", "east-west101");
        Assertions.assertEquals((Object)"east-west99", (Object)this.getNetworkType(ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.SSL), new TestConfig(enableConfig)));
        Assertions.assertEquals((Object)"east-west100", (Object)this.getNetworkType(ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.SASL_SSL), new TestConfig(enableConfig)));
        Assertions.assertEquals((Object)"east-west101", (Object)this.getNetworkType(ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT), new TestConfig(enableConfig)));
    }

    private String getNetworkType(ListenerName listenerName, TestConfig config) {
        SecurityProtocol securityProtocol = SecurityProtocol.SSL;
        try (ChannelBuilder serverChannelBuilder = ChannelBuilders.serverChannelBuilder((ListenerName)listenerName, (boolean)false, (SecurityProtocol)securityProtocol, (AbstractConfig)config, null, null, (Time)new MockTime(), (LogContext)new LogContext(), null, (RequestCallback)new DefaultRequestCallbackManager(), (ProxyProtocolEngineFactory)new ProxyProtocolEngineFactory(ProxyProtocol.NONE));){
            SelectionKey key = (SelectionKey)Mockito.mock(SelectionKey.class);
            Mockito.when((Object)key.isReadable()).thenReturn((Object)true);
            SocketChannel channel = (SocketChannel)Mockito.mock(SocketChannel.class);
            Mockito.mock(SelectionKey.class);
            Socket socket = (Socket)Mockito.mock(Socket.class);
            Mockito.when((Object)key.channel()).thenReturn((Object)channel);
            Mockito.when((Object)channel.socket()).thenReturn((Object)socket);
            KafkaChannel kafkaChannel = serverChannelBuilder.buildChannel("0", key, 0, (AsyncAuthExecutor)new MockAsyncAuthExecutor(), null, (ChannelMetadataRegistry)Mockito.mock(ChannelMetadataRegistry.class), null, new LogContext());
            MultiTenantInterceptor interceptor = (MultiTenantInterceptor)kafkaChannel.interceptor();
            String string = interceptor.networkType(new ProxyTlvProvider(){

                public ProxyTlv tlv(ProxyTlvType type) {
                    return null;
                }
            });
            return string;
        }
    }

    public void testNetworkType(ConnectionContext connectionContext, String expectedNetworkType) throws IOException {
        SslTransportLayer sslTransportLayer = (SslTransportLayer)Mockito.mock(SslTransportLayer.class);
        Mockito.when((Object)sslTransportLayer.sniHostName()).thenReturn(Optional.empty());
        ProxyTlvProvider proxyTlvProvider = MultiTenantInterceptorTest.getProxyTlvProvider(connectionContext);
        Mockito.when((Object)sslTransportLayer.proxyTlvProvider()).thenReturn((Object)proxyTlvProvider);
        SaslServerAuthenticator saslServerAuthenticator = (SaslServerAuthenticator)Mockito.mock(SaslServerAuthenticator.class);
        MultiTenantInterceptor brokerInterceptor = new MultiTenantInterceptor();
        brokerInterceptor.configure(this.defaultConfig());
        Metrics metrics = (Metrics)Mockito.mock(Metrics.class);
        ClientInformation clientInformation = new ClientInformation("someSoftwareName", "someSoftwareVersion");
        MultiTenantPrincipal principal = new MultiTenantPrincipal("someUser", new TenantMetadata("someTenantName", "someClusterId"));
        Mockito.when((Object)saslServerAuthenticator.principal()).thenReturn((Object)principal);
        KafkaChannel kafkaChannel = new KafkaChannel("0", (TransportLayer)sslTransportLayer, () -> saslServerAuthenticator, 0, (AsyncAuthExecutor)new MockAsyncAuthExecutor(), (MemoryPool)Mockito.mock(MemoryPool.class), (ChannelMetadataRegistry)Mockito.mock(ChannelMetadataRegistry.class), (BrokerInterceptor)brokerInterceptor, (Time)new MockTime(), true, false, new LogContext(), null, Collections.singletonList("localhost"), "", false);
        kafkaChannel.prepare();
        RequestHeader header = (RequestHeader)Mockito.mock(RequestHeader.class);
        Mockito.when((Object)header.apiKey()).thenReturn((Object)ApiKeys.PRODUCE);
        MultiTenantRequestContext context = (MultiTenantRequestContext)kafkaChannel.newRequestContext(0L, header, Optional.of(54545), new ListenerName("EXTERNAL"), SecurityProtocol.SSL, clientInformation, metrics, false);
        String networkType = context.metricsRequestContext().networkType();
        Assertions.assertEquals((Object)expectedNetworkType, (Object)networkType);
    }

    private static ProxyTlvProvider getProxyTlvProvider(ConnectionContext connectionContext) {
        final NetworkContext network = NetworkContext.newBuilder().setConnectionContext(connectionContext).build();
        return new ProxyTlvProvider(){

            public ProxyTlv tlv(ProxyTlvType type) {
                if (!CloudProxyTlvParser.NETWORK_CONTEXT_TLV_TYPE.equals((Object)type)) {
                    return null;
                }
                return new ProxyTlv(CloudProxyTlvParser.NETWORK_CONTEXT_TLV_TYPE, new byte[0], (Object)network);
            }
        };
    }

    @Test
    public void testNetworkType() throws IOException {
        this.testNetworkType(ConnectionContext.newBuilder().build(), "");
        this.testNetworkType(ConnectionContext.newBuilder().setPrivateLink(PrivateLink.newBuilder()).build(), "private-link");
        this.testNetworkType(ConnectionContext.newBuilder().setPublic(Public.newBuilder()).build(), "public");
        this.testNetworkType(ConnectionContext.newBuilder().setTransitGateway(TransitGateway.newBuilder()).build(), "transit-gateway");
        this.testNetworkType(ConnectionContext.newBuilder().setVpcPeering(VpcPeering.newBuilder()).build(), "vpc-peering");
        this.testNetworkType(ConnectionContext.newBuilder().setPrivateNetworkInterface(PrivateNetworkInterface.newBuilder()).build(), "private-network-interface");
    }

    private static class TestConfig
    extends AbstractConfig {
        private static final ConfigDef CONFIG = new ConfigDef().withClientSslSupport().define("confluent.emit.network.type.default", ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.LOW, "Default networking type to be used when the network type is not present in the request");

        public TestConfig(Map<?, ?> originals) {
            super(CONFIG, originals, false);
        }
    }
}

