/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.MultiTenantInterceptor;
import io.confluent.kafka.multitenant.MultiTenantPrincipal;
import io.confluent.kafka.multitenant.MultiTenantPrincipalBuilder;
import io.confluent.kafka.multitenant.TenantMetadata;
import io.confluent.kafka.multitenant.integration.request.RequestUtils;
import io.confluent.kafka.multitenant.integration.test.AbstractMultiTenantKafkaIntegrationTest;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import kafka.network.SocketServer;
import kafka.server.KafkaBroker;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.metrics.utils.MetricUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Integration test that sends PRODUCE requests at every deprecated protocol
 * version through a dedicated {@code DEPRECATED} listener and then checks the
 * per-tenant deprecated-request metrics registered on each broker.
 *
 * <p>The {@code DEPRECATED} listener is configured with
 * {@link MockPrincipalBuilder}, so every connection on it is attributed to
 * tenant {@value #LKC_TENANT1} and user resource id {@value #SA1}.
 */
@Tag(value="integration")
public class MultiTenantDeprecatedRequestIntegrationTest
extends AbstractMultiTenantKafkaIntegrationTest {
    private static final String DEPRECATED_REQUEST_NAME = "deprecated-request";
    private static final String UNIQUE_DEPRECATED_REQUEST_METRICS_NAME = "unique-deprecated-request-metrics";
    private static final String LKC_TENANT1 = "lkc-tenant1";
    private static final String SA1 = "sa_1";
    /** Metric group under which both asserted metrics are looked up. */
    private static final String TENANT_METRICS_GROUP = "tenant-metrics";
    /** Name of the extra plaintext listener the raw requests are sent to. */
    private static final String DEPRECATED_LISTENER = "DEPRECATED";

    /**
     * Creates the clusters with empty extra properties and no optional
     * arguments, then waits for metadata propagation before returning.
     *
     * @param brokerProperties broker configuration forwarded to the superclass
     */
    @Override
    protected void createPhysicalAndLogicalClusters(Properties brokerProperties) {
        super.createPhysicalAndLogicalClusters(brokerProperties, new Properties(), Optional.empty(), Optional.empty());
        this.awaitMetadataPropagation();
    }

    /**
     * Extends the inherited node properties with a third listener named
     * {@code DEPRECATED} that uses {@link MockPrincipalBuilder} and
     * {@link MultiTenantInterceptor}, and sets the per-tenant limit for
     * unique deprecated request metrics to 100.
     *
     * @return the augmented broker properties
     */
    @Override
    protected Properties nodeProps() {
        Properties properties = super.nodeProps();
        properties.put("listeners", "INTERNAL://localhost:0,EXTERNAL://localhost:0,DEPRECATED://localhost:0");
        properties.put("listener.security.protocol.map", "INTERNAL:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT,DEPRECATED:PLAINTEXT,CONTROLLER:PLAINTEXT");
        properties.put("listener.name.deprecated.principal.builder.class", MockPrincipalBuilder.class.getName());
        properties.put("listener.name.deprecated.broker.interceptor.class", MultiTenantInterceptor.class.getName());
        properties.put("confluent.unique.deprecated.request.metrics.per.tenant", 100);
        return properties;
    }

    /**
     * Sends one PRODUCE request per deprecated version and asserts that, on
     * every broker, the unique-deprecated-request gauge reads 1.0 and the
     * deprecated-request total equals the number of requests sent.
     *
     * @param quorum quorum mode supplied by the parameter source (only
     *               {@code "kraft"}); it is used for the display name only
     * @throws Throwable if cluster setup, the socket exchange or an assertion fails
     */
    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft"})
    public void testDeprecatedProduceRequestMetrics(String quorum) throws Throwable {
        List<Short> deprecatedVersions = ApiKeys.PRODUCE.allVersions().stream()
                .filter(ApiKeys.PRODUCE::isVersionDeprecated)
                .toList();
        if (deprecatedVersions.isEmpty()) {
            // Nothing to exercise: return before any cluster setup is performed.
            return;
        }
        this.setUp(1, Collections.emptyList());
        this.createPhysicalAndLogicalClusters(this.nodeProps());
        String producerClientId = "producer-0";
        String topic = "test-topic";
        // The physical topic carries the tenant prefix; the request below uses the
        // unprefixed name (presumably the interceptor adds the prefix - TODO confirm).
        String prefixedTopic1 = LKC_TENANT1 + "_" + topic;
        this.physicalCluster.kafkaCluster().createTopic(prefixedTopic1, 1, 1);
        MemoryRecords records = this.createRecords(Time.SYSTEM.milliseconds(), Compression.gzip().build());
        SocketServer socketServer = this.physicalCluster.kafkaCluster().kafkaBrokers().get(0).socketServer();
        try (Socket socket = RequestUtils.connect(socketServer, ListenerName.normalised(DEPRECATED_LISTENER))) {
            int correlationId = 0;
            for (short version : deprecatedVersions) {
                ProduceRequest produceRequest = this.buildProduceRequest(topic, records, version);
                AbstractResponse response = RequestUtils.sendAndReceive((AbstractRequest)produceRequest, socket, producerClientId, correlationId);
                Assertions.assertEquals(ProduceResponse.class, response.getClass());
                ++correlationId;
            }
        }
        Map<String, String> deprecatedProduceTags = this.deprecatedRequestTags(ApiKeys.PRODUCE, LKC_TENANT1, SA1, producerClientId);
        Map<String, String> uniqueDeprecatedRequestMetricsTags = this.uniqueDeprecatedRequestMetricsTags(LKC_TENANT1);
        List<KafkaBroker> brokers = this.physicalCluster.kafkaCluster().kafkaBrokers();
        for (KafkaBroker broker : brokers) {
            Metrics metrics = broker.metrics();

            MetricName uniqueDeprecatedRequestMetricsName = metrics.metricName(UNIQUE_DEPRECATED_REQUEST_METRICS_NAME, TENANT_METRICS_GROUP, uniqueDeprecatedRequestMetricsTags);
            KafkaMetric uniqueDeprecatedRequestMetric = metrics.metric(uniqueDeprecatedRequestMetricsName);
            Assertions.assertNotNull(uniqueDeprecatedRequestMetric, "Expect unique deprecated request metric to exist");
            // All requests used the same API key, tenant, user and client id, so a
            // single unique combination is expected.
            Assertions.assertEquals(1.0, (double)uniqueDeprecatedRequestMetric.measurableValue());

            MetricName deprecatedProduceName = MetricUtils.totalMetricName(metrics, TENANT_METRICS_GROUP, deprecatedProduceTags, DEPRECATED_REQUEST_NAME, DEPRECATED_REQUEST_NAME);
            KafkaMetric deprecatedProduceMetric = metrics.metric(deprecatedProduceName);
            Assertions.assertNotNull(deprecatedProduceMetric, "Expect deprecated request metric to exist");
            Assertions.assertEquals((double)deprecatedVersions.size(), (double)deprecatedProduceMetric.measurableValue());
        }
    }

    /**
     * Builds a PRODUCE request with acks {@code -1} that writes the given
     * records to partition 0 of a single topic.
     *
     * @param topic   topic name placed in the request
     * @param records record batch to send
     * @param version protocol version to build the request at
     * @return the built request
     */
    private ProduceRequest buildProduceRequest(String topic, MemoryRecords records, short version) {
        ProduceRequestData.PartitionProduceData partitionData = new ProduceRequestData.PartitionProduceData()
                .setIndex(0)
                .setRecords((BaseRecords)records);
        ProduceRequestData.TopicProduceData topicData = new ProduceRequestData.TopicProduceData()
                .setName(topic)
                .setPartitionData(Collections.singletonList(partitionData));
        ProduceRequestData data = new ProduceRequestData()
                .setAcks((short)-1)
                .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(topicData).iterator()));
        return ProduceRequest.builder(data).build(version);
    }

    /**
     * Builds the tag map used to look up the deprecated-request total metric.
     * Insertion order is preserved by the returned map.
     *
     * @param apiKey         API whose {@code name} becomes the {@code request} tag
     * @param tenantName     value of the {@code tenant} tag
     * @param userResourceId value of the {@code user-resource-id} tag
     * @param clientId       value of the {@code client-id} tag
     * @return the ordered tag map
     */
    private Map<String, String> deprecatedRequestTags(ApiKeys apiKey, String tenantName, String userResourceId, String clientId) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("io-confluent-jmx-ignore", "");
        tags.put("request", apiKey.name);
        tags.put("tenant", tenantName);
        tags.put("user-resource-id", userResourceId);
        tags.put("client-id", clientId);
        return tags;
    }

    /**
     * Builds the tag map used to look up the unique-deprecated-request metric.
     *
     * @param tenantName value of the {@code tenant} tag
     * @return the ordered tag map
     */
    private Map<String, String> uniqueDeprecatedRequestMetricsTags(String tenantName) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("io-confluent-jmx-ignore", "");
        tags.put("tenant", tenantName);
        return tags;
    }

    /**
     * Creates a magic-v2 record batch holding three key-less records at
     * offsets 0, 1 and 2, all stamped with the same create-time timestamp.
     *
     * @param timestamp timestamp applied to every record
     * @param codec     compression used for the batch
     * @return the built records
     */
    private MemoryRecords createRecords(long timestamp, Compression codec) {
        ByteBuffer buf = ByteBuffer.allocate(512);
        MemoryRecordsBuilder builder = MemoryRecords.builder(buf, (byte)2, codec, TimestampType.CREATE_TIME, 0L);
        // Encode explicitly as UTF-8 so the payload does not depend on the platform default charset.
        builder.appendWithOffset(0L, timestamp, null, "hello".getBytes(StandardCharsets.UTF_8));
        builder.appendWithOffset(1L, timestamp, null, "there".getBytes(StandardCharsets.UTF_8));
        builder.appendWithOffset(2L, timestamp, null, "beautiful".getBytes(StandardCharsets.UTF_8));
        return builder.build();
    }

    /**
     * Principal builder that ignores the authentication context and always
     * returns the principal {@code user1} bound to tenant
     * {@code lkc-tenant1} with user resource id {@code sa_1}.
     */
    public static class MockPrincipalBuilder
    extends MultiTenantPrincipalBuilder {
        /**
         * @param context authentication context; not inspected
         * @return a fixed {@link MultiTenantPrincipal} for the test tenant
         */
        @Override
        public KafkaPrincipal build(AuthenticationContext context) {
            TenantMetadata tenantMetadata = new TenantMetadata.Builder(LKC_TENANT1, SA1).build();
            return new MultiTenantPrincipal("user1", tenantMetadata);
        }
    }
}

