package org.apache.kafka.server.link;

import java.net.InetAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.KafkaChannelTest;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.ProduceConsumeAuditLogTracker;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.PathAwareSniHostName;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.interceptor.BrokerInterceptor;
import org.apache.kafka.server.interceptor.DefaultBrokerInterceptor;
import org.apache.kafka.server.metrics.ApiSensorBuilder;
import org.apache.kafka.server.metrics.ApiSensors;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Tests for the metrics recorded on the source side of a cluster link.
 *
 * <p>{@link #testMetrics(boolean)} drives requests and responses through {@link RequestContext}
 * instances created by a {@link BrokerInterceptor} and checks the metrics published in the
 * {@code cluster-link-source-metrics} group. {@link #testApiSensorsReinitialized()} exercises
 * {@link ClusterLinkSourceMetrics} directly and checks that removed sensors are recreated.
 *
 * <p>The {@code protected} members are hooks that subclasses may override to run the same checks
 * with a different interceptor, principal or response topic.
 */
public class ClusterLinkSourceMetricsTest {
    private static final String METRIC_GROUP = "cluster-link-source-metrics";
    private static final String LINK_ID_TAG = "link-id";
    private static final String REQUEST_TAG = "request";
    private static final String ERROR_TAG = "error";
    private static final String METRICS_REDUCTION_CONFIG = "confluent.cluster.link.enable.metrics.reduction";
    private static final double DELTA = 0.001d;

    protected Metrics metrics;
    private Time time;
    private BrokerInterceptor interceptor;

    /** Creates a fresh metrics registry and a mock clock starting at zero for every test. */
    @BeforeEach
    public void setUp() {
        this.metrics = new Metrics();
        this.time = new MockTime(0L);
    }

    /** Closes the metrics registry created in {@link #setUp()}. */
    @AfterEach
    public void tearDown() {
        this.metrics.close();
    }

    /**
     * Verifies request, response and error metrics recorded for a link while METADATA requests
     * and responses flow through the interceptor's request contexts.
     *
     * @param metricsReductionEnabled value used for the
     *     {@code confluent.cluster.link.enable.metrics.reduction} config; it also selects which
     *     {@code link-id} tag value the metrics are expected to carry
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testMetrics(boolean metricsReductionEnabled) {
        this.interceptor = createInterceptor(this.time, metricsReductionEnabled);
        UUID randomUuid = UUID.randomUUID();
        Uuid linkId = new Uuid(randomUuid.getMostSignificantBits(), randomUuid.getLeastSignificantBits());
        ApiKeys apiKey = ApiKeys.METADATA;
        short version = ApiKeys.METADATA.latestVersion();

        // First request: the header carries the cluster link id.
        RequestHeaderData linkHeader = new RequestHeaderData()
            .setClusterLinkId(linkId)
            .setClientId("clientId")
            .setCorrelationId(1)
            .setRequestApiKey(apiKey.id)
            .setRequestApiVersion(version);
        RequestContext firstContext = requestContext(linkHeader);
        MetadataRequest metadataRequest = new MetadataRequest(new MetadataRequestData(), version);
        firstContext.parseRequest(metadataRequest.serialize());
        double firstRequestBytes = verifyRequestMetrics(1.0d, 0.0d, linkId, apiKey, metricsReductionEnabled);

        MetadataResponseData responseData = new MetadataResponseData()
            .setClusterId("clusterId")
            .setControllerId(0)
            .setBrokers(new MetadataResponseData.MetadataResponseBrokerCollection());
        MetadataResponse metadataResponse = new MetadataResponse(responseData, responseData.highestSupportedVersion());
        this.time.sleep(1L);
        Assertions.assertEquals(0, firstContext.buildResponseSend(metadataResponse).getDelayedActions().size());
        double firstResponseBytes = verifyResponseMetrics(0.0d, 1L, linkId, apiKey, metricsReductionEnabled);

        // Second request: no cluster link id in the header, yet the assertions below expect it
        // to be counted against the same link.
        // NOTE(review): presumably the link is remembered for the channel - confirm against the interceptor.
        RequestHeaderData plainHeader = new RequestHeaderData()
            .setClientId("clientId")
            .setCorrelationId(2)
            .setRequestApiKey(apiKey.id)
            .setRequestApiVersion(version);
        RequestContext secondContext = requestContext(plainHeader);
        secondContext.parseRequest(metadataRequest.serialize());
        verifyRequestMetrics(2.0d, firstRequestBytes, linkId, apiKey, metricsReductionEnabled);

        this.time.sleep(2L);
        Assertions.assertEquals(0, secondContext.buildResponseSend(metadataResponse).getDelayedActions().size());
        double secondResponseBytes = verifyResponseMetrics(firstResponseBytes, 2L, linkId, apiKey, metricsReductionEnabled);

        // Third exchange: only a response is built, and it reports a topic-level authorization error.
        RequestContext thirdContext = requestContext(linkHeader);
        MetadataResponseData.MetadataResponseTopicCollection topics = new MetadataResponseData.MetadataResponseTopicCollection();
        topics.add(new MetadataResponseData.MetadataResponseTopic()
            .setName(responseTopic())
            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code()));
        MetadataResponseData errorResponseData = responseData.setTopics(topics);
        MetadataResponse errorResponse = new MetadataResponse(errorResponseData, errorResponseData.highestSupportedVersion());
        this.time.sleep(3L);
        Assertions.assertEquals(0, thirdContext.buildResponseSend(errorResponse).getDelayedActions().size());
        verifyResponseMetrics(secondResponseBytes, 3L, linkId, apiKey, metricsReductionEnabled);

        double errorTotal = requireLinkMetric("error-total", "Error", linkId, apiKey, metricsReductionEnabled);
        Assertions.assertEquals(1.0d, errorTotal, DELTA);
        verifyMetrics(linkId, metricsReductionEnabled);
    }

    /**
     * Verifies that {@link ClusterLinkSourceMetrics} hands out a new {@link ApiSensors} instance,
     * with live request and response sensors, after the previous sensors were removed from the
     * metrics registry.
     */
    @Test
    public void testApiSensorsReinitialized() {
        ClusterLinkSourceMetrics linkMetrics = new ClusterLinkSourceMetrics(this.metrics, Uuid.randomUuid(), false);
        MockTime mockTime = new MockTime();
        ApiKeys apiKey = ApiKeys.API_VERSIONS;

        linkMetrics.recordResponse(apiKey, 100L, mockTime.nanoseconds(), Collections.emptyMap(), mockTime.milliseconds());
        Map<Errors, Integer> errorCounts = Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 1);
        Set<Errors> errors = errorCounts.keySet();
        linkMetrics.recordResponse(apiKey, 100L, mockTime.nanoseconds(), errorCounts, mockTime.milliseconds());

        ApiSensors initialSensors = linkMetrics.apiSensors(apiKey);
        Assertions.assertEquals(Collections.emptySet(), initialSensors.errorsWithoutSensors(this.metrics, errors));

        // Capture the sensor names up front, while all sensors are still registered. Keeping the
        // call chains fully generic lets the compiler infer the sensor type; a raw Map here would
        // type the lambda parameter as Object and break the name() call.
        ApiSensorBuilder sensorBuilder = new ApiSensorBuilder(this.metrics, linkMetrics.builderContext(), apiKey);
        Set<String> suffixedSensorNames = sensorBuilder.getOrCreateSuffixedSensors().values().stream()
            .map(sensor -> sensor.name())
            .collect(Collectors.toSet());
        Set<String> errorSensorNames = sensorBuilder.getOrCreateErrorSensors(errors).values().stream()
            .map(sensor -> sensor.name())
            .collect(Collectors.toSet());
        Assertions.assertFalse(initialSensors.requestSensorsExpired(this.metrics));

        // Removing only the error sensors must be reported as errors without sensors.
        errorSensorNames.forEach(name -> this.metrics.removeSensor(name));
        Assertions.assertEquals(errors, linkMetrics.apiSensors(apiKey).errorsWithoutSensors(this.metrics, errors));

        // Removing the remaining sensors expires both the request and the response sensors.
        suffixedSensorNames.forEach(name -> this.metrics.removeSensor(name));
        Assertions.assertTrue(initialSensors.requestSensorsExpired(this.metrics));
        Assertions.assertTrue(initialSensors.responseSensorsExpired(this.metrics));

        // Recording again must replace the expired ApiSensors instance with a live one.
        linkMetrics.recordResponse(apiKey, 100L, mockTime.nanoseconds(), Collections.emptyMap(), mockTime.milliseconds());
        ApiSensors recreatedSensors = linkMetrics.apiSensors(apiKey);
        Assertions.assertNotEquals(initialSensors, recreatedSensors);
        Assertions.assertFalse(recreatedSensors.requestSensorsExpired(this.metrics));
        Assertions.assertFalse(recreatedSensors.responseSensorsExpired(this.metrics));
    }

    /**
     * Creates and configures the interceptor under test.
     *
     * @param time clock handed to the interceptor
     * @param metricsReductionEnabled value for the
     *     {@code confluent.cluster.link.enable.metrics.reduction} config
     * @return a configured {@link DefaultBrokerInterceptor}
     */
    protected BrokerInterceptor createInterceptor(Time time, boolean metricsReductionEnabled) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(METRICS_REDUCTION_CONFIG, metricsReductionEnabled);
        DefaultBrokerInterceptor brokerInterceptor = new DefaultBrokerInterceptor(time);
        brokerInterceptor.configure(configs);
        return brokerInterceptor;
    }

    /**
     * Returns the principal attached to every request context created by this test.
     *
     * @return a principal of type {@code User} named {@code user}
     */
    protected KafkaPrincipal createPrincipal() {
        return new KafkaPrincipal("User", "user");
    }

    /**
     * Returns the topic name placed in the metadata response that carries an error.
     *
     * @return the topic name
     */
    protected String responseTopic() {
        return "topic";
    }

    /**
     * Checks the tags of every registered metric that has a {@code link-id} tag: metrics with a
     * {@code request} tag must carry the expected link id and the METADATA request name, and
     * metrics with an {@code error} tag must name {@code TOPIC_AUTHORIZATION_FAILED}.
     *
     * @param uuid id of the cluster link used by the test
     * @param metricsReductionEnabled whether metrics reduction was enabled for the interceptor
     */
    protected void verifyMetrics(Uuid uuid, boolean metricsReductionEnabled) {
        Set<MetricName> linkMetricNames = this.metrics.metrics().keySet().stream()
            .filter(name -> name.tags().containsKey(LINK_ID_TAG))
            .collect(Collectors.toSet());
        String expectedLinkId = expectedLinkIdTagValue(uuid, metricsReductionEnabled);
        for (MetricName name : linkMetricNames) {
            Map<String, String> tags = name.tags();
            String request = tags.get(REQUEST_TAG);
            if (request != null) {
                Assertions.assertEquals(expectedLinkId, tags.get(LINK_ID_TAG));
                Assertions.assertEquals(ApiKeys.METADATA.name, request);
            }
            String error = tags.get(ERROR_TAG);
            if (error != null) {
                Assertions.assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.name(), error);
            }
        }
    }

    /**
     * Asserts the link's request count and that its request byte total grew past a previous value.
     *
     * @return the current {@code request-byte-total} value
     */
    private double verifyRequestMetrics(double expectedRequestTotal,
                                        double previousRequestBytes,
                                        Uuid linkId,
                                        ApiKeys apiKey,
                                        boolean metricsReductionEnabled) {
        double requestTotal = requireLinkMetric("request-total", "Request", linkId, apiKey, metricsReductionEnabled);
        Assertions.assertEquals(expectedRequestTotal, requestTotal, DELTA);
        double requestBytes = requireLinkMetric("request-byte-total", "Request bytes", linkId, apiKey, metricsReductionEnabled);
        Assertions.assertTrue(requestBytes > previousRequestBytes, "Request bytes not recorded");
        return requestBytes;
    }

    /**
     * Asserts that the link's response byte total grew past a previous value and that the maximum
     * response time equals the given number of milliseconds.
     *
     * @return the current {@code response-byte-total} value
     */
    private double verifyResponseMetrics(double previousResponseBytes,
                                         long expectedMaxResponseTimeMs,
                                         Uuid linkId,
                                         ApiKeys apiKey,
                                         boolean metricsReductionEnabled) {
        double responseBytes = requireLinkMetric("response-byte-total", "Response bytes", linkId, apiKey, metricsReductionEnabled);
        Assertions.assertTrue(responseBytes > previousResponseBytes, "Response bytes not recorded");
        double maxResponseTimeNs = requireLinkMetric("response-time-ns-max", "Response time", linkId, apiKey, metricsReductionEnabled);
        Assertions.assertEquals(TimeUnit.MILLISECONDS.toNanos(expectedMaxResponseTimeMs), maxResponseTimeNs, DELTA);
        return responseBytes;
    }

    /**
     * Looks up a link metric and fails the test with "{@code <description> metrics not found}"
     * when it is not registered.
     *
     * @return the metric's current value
     */
    private double requireLinkMetric(String metricName,
                                     String description,
                                     Uuid linkId,
                                     ApiKeys apiKey,
                                     boolean metricsReductionEnabled) {
        Double value = linkMetricValue(metricName, linkId, apiKey, metricsReductionEnabled);
        Assertions.assertNotNull(value, description + " metrics not found");
        return value;
    }

    /**
     * Creates a request context for the given header through the interceptor under test, using
     * the loopback address, a PLAINTEXT listener and the principal from {@link #createPrincipal()}.
     */
    private RequestContext requestContext(RequestHeaderData requestHeaderData) {
        return this.interceptor.newContext(
            new RequestHeader(requestHeaderData, (short) 2),
            KafkaChannelTest.CHANNEL_ID,
            -1L,
            InetAddress.getLoopbackAddress(),
            createPrincipal(),
            ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
            SecurityProtocol.PLAINTEXT,
            (ClientInformation) null,
            this.metrics,
            (PathAwareSniHostName) null,
            false,
            Optional.empty(),
            (AuthenticationContext) null,
            (ProduceConsumeAuditLogTracker) null,
            false);
    }

    /**
     * Finds the metric with the given name in the {@code cluster-link-source-metrics} group whose
     * {@code link-id} and {@code request} tags match the expected link id and the given API.
     *
     * @return the metric value, or {@code null} when no such metric is registered
     */
    private Double linkMetricValue(String name, Uuid uuid, ApiKeys apiKeys, boolean metricsReductionEnabled) {
        String expectedLinkId = expectedLinkIdTagValue(uuid, metricsReductionEnabled);
        for (Map.Entry<MetricName, KafkaMetric> entry : this.metrics.metrics().entrySet()) {
            MetricName metricName = entry.getKey();
            Map<String, String> tags = metricName.tags();
            boolean matches = METRIC_GROUP.equals(metricName.group())
                && name.equals(metricName.name())
                && expectedLinkId.equals(tags.get(LINK_ID_TAG))
                && apiKeys.name.equals(tags.get(REQUEST_TAG));
            if (matches) {
                return (Double) entry.getValue().metricValue();
            }
        }
        return null;
    }

    /**
     * Returns the {@code link-id} tag value metrics are expected to carry: the shared
     * metric-reduction value when reduction is enabled, otherwise the link's own id.
     */
    private static String expectedLinkIdTagValue(Uuid uuid, boolean metricsReductionEnabled) {
        return metricsReductionEnabled ? ClusterLinkMetricsUtils.METRIC_REDUCTION_LINK_ID_TAG_VALUE : uuid.toString();
    }
}
