/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.server.link;

import java.net.InetAddress;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.interceptor.BrokerInterceptor;
import org.apache.kafka.server.interceptor.DefaultBrokerInterceptor;
import org.apache.kafka.server.link.ClusterLinkSourceMetrics;
import org.apache.kafka.server.metrics.ApiSensorBuilder;
import org.apache.kafka.server.metrics.ApiSensors;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class ClusterLinkSourceMetricsTest {
    protected Metrics metrics;
    private Time time;
    private BrokerInterceptor interceptor;

    @BeforeEach
    public void setUp() {
        this.metrics = new Metrics();
        this.time = new MockTime(0L);
        this.interceptor = this.createInterceptor(this.time);
    }

    @AfterEach
    public void tearDown() {
        this.metrics.close();
    }

    @Test
    public void testMetrics() {
        UUID linkId = UUID.randomUUID();
        Uuid kafkaLinkUUID = new Uuid(linkId.getMostSignificantBits(), linkId.getLeastSignificantBits());
        ApiKeys apiKey = ApiKeys.METADATA;
        short requestVersion = ApiKeys.METADATA.latestVersion();
        RequestHeaderData headerData = new RequestHeaderData().setClusterLinkId(kafkaLinkUUID).setClientId("clientId").setCorrelationId(1).setRequestApiKey(apiKey.id).setRequestApiVersion(requestVersion);
        RequestContext context = this.requestContext(headerData);
        MetadataRequest request = new MetadataRequest(new MetadataRequestData(), requestVersion);
        context.parseRequest(request.serialize());
        Double requestTotal = this.linkMetricValue("request-total", linkId, apiKey);
        Assertions.assertNotNull((Object)requestTotal, (String)"Request metrics not found");
        Assertions.assertEquals((double)1.0, (double)requestTotal, (double)0.001);
        Double requestByteTotal = this.linkMetricValue("request-byte-total", linkId, apiKey);
        Assertions.assertNotNull((Object)requestByteTotal, (String)"Request bytes metrics not found");
        Assertions.assertTrue((requestByteTotal > 0.0 ? 1 : 0) != 0, (String)"Request bytes not recorded");
        MetadataResponseData responseData = new MetadataResponseData().setClusterId("clusterId").setControllerId(0).setBrokers(new MetadataResponseData.MetadataResponseBrokerCollection());
        MetadataResponse response = new MetadataResponse(responseData, responseData.highestSupportedVersion());
        this.time.sleep(1L);
        context.buildResponseSend((AbstractResponse)response);
        Double responseByteTotal = this.linkMetricValue("response-byte-total", linkId, apiKey);
        Assertions.assertNotNull((Object)responseByteTotal, (String)"Response bytes metrics not found");
        Assertions.assertTrue((responseByteTotal > 0.0 ? 1 : 0) != 0, (String)"Response bytes not recorded");
        Double responseTimeMax = this.linkMetricValue("response-time-ns-max", linkId, apiKey);
        Assertions.assertNotNull((Object)responseTimeMax, (String)"Response time metrics not found");
        Assertions.assertEquals((double)TimeUnit.MILLISECONDS.toNanos(1L), (double)responseTimeMax, (double)0.001);
        RequestHeaderData headerData2 = new RequestHeaderData().setClientId("clientId").setCorrelationId(2).setRequestApiKey(apiKey.id).setRequestApiVersion(requestVersion);
        RequestContext context2 = this.requestContext(headerData2);
        context2.parseRequest(request.serialize());
        Double requestTotal2 = this.linkMetricValue("request-total", linkId, apiKey);
        Assertions.assertNotNull((Object)requestTotal2, (String)"Request metrics not found");
        Assertions.assertEquals((double)2.0, (double)requestTotal2, (double)0.001);
        Double requestByteTotal2 = this.linkMetricValue("request-byte-total", linkId, apiKey);
        Assertions.assertNotNull((Object)requestByteTotal2, (String)"Request bytes metrics not found");
        Assertions.assertTrue((requestByteTotal2 > requestByteTotal ? 1 : 0) != 0, (String)"Request bytes not recorded");
        this.time.sleep(2L);
        context2.buildResponseSend((AbstractResponse)response);
        Double responseByteTotal2 = this.linkMetricValue("response-byte-total", linkId, apiKey);
        Assertions.assertNotNull((Object)responseByteTotal2, (String)"Response bytes metrics not found");
        Assertions.assertTrue((responseByteTotal2 > responseByteTotal ? 1 : 0) != 0, (String)"Response bytes not recorded");
        Double responseTimeMax2 = this.linkMetricValue("response-time-ns-max", linkId, apiKey);
        Assertions.assertNotNull((Object)responseTimeMax2, (String)"Response time metrics not found");
        Assertions.assertEquals((double)TimeUnit.MILLISECONDS.toNanos(2L), (double)responseTimeMax2, (double)0.001);
        RequestContext context3 = this.requestContext(headerData);
        MetadataResponseData.MetadataResponseTopicCollection topics = new MetadataResponseData.MetadataResponseTopicCollection();
        topics.add(new MetadataResponseData.MetadataResponseTopic().setName(this.responseTopic()).setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code()));
        MetadataResponseData errorResponseData = responseData.setTopics(topics);
        MetadataResponse errorResponse = new MetadataResponse(errorResponseData, errorResponseData.highestSupportedVersion());
        this.time.sleep(3L);
        context3.buildResponseSend((AbstractResponse)errorResponse);
        Double responseByteTotal3 = this.linkMetricValue("response-byte-total", linkId, apiKey);
        Assertions.assertNotNull((Object)responseByteTotal3, (String)"Response bytes metrics not found");
        Assertions.assertTrue((responseByteTotal3 > responseByteTotal2 ? 1 : 0) != 0, (String)"Response bytes not recorded");
        Double responseTimeMax3 = this.linkMetricValue("response-time-ns-max", linkId, apiKey);
        Assertions.assertNotNull((Object)responseTimeMax3, (String)"Response time metrics not found");
        Assertions.assertEquals((double)TimeUnit.MILLISECONDS.toNanos(3L), (double)responseTimeMax3, (double)0.001);
        Double errorTotal = this.linkMetricValue("error-total", linkId, apiKey);
        Assertions.assertNotNull((Object)errorTotal, (String)"Error metrics not found");
        Assertions.assertEquals((double)1.0, (double)errorTotal, (double)0.001);
        this.verifyMetrics();
    }

    @Test
    public void testApiSensorsReinitialized() {
        UUID linkId = UUID.randomUUID();
        ClusterLinkSourceMetrics linkMetrics = new ClusterLinkSourceMetrics(this.metrics, linkId);
        MockTime time = new MockTime();
        ApiKeys apiKey = ApiKeys.API_VERSIONS;
        linkMetrics.recordResponse(apiKey, 100L, time.nanoseconds(), Collections.emptyMap(), time.milliseconds());
        Map<Errors, Integer> errors = Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 1);
        linkMetrics.recordResponse(apiKey, 100L, time.nanoseconds(), errors, time.milliseconds());
        ApiSensors apiSensors = linkMetrics.apiSensors(apiKey);
        Assertions.assertEquals(Collections.emptySet(), (Object)apiSensors.errorsWithoutSensors(this.metrics, errors.keySet()));
        ApiSensorBuilder sensorBuilder = new ApiSensorBuilder(this.metrics, linkMetrics.builderContext(), apiKey);
        Map responseSensors = sensorBuilder.getOrCreateSuffixedSensors();
        Map errorSensors = sensorBuilder.getOrCreateErrorSensors(errors.keySet());
        Assertions.assertFalse((boolean)apiSensors.requestSensorsExpired(this.metrics));
        errorSensors.values().forEach(sensor -> this.metrics.removeSensor(sensor.name()));
        Assertions.assertEquals(errors.keySet(), (Object)linkMetrics.apiSensors(apiKey).errorsWithoutSensors(this.metrics, errors.keySet()));
        responseSensors.values().forEach(sensor -> this.metrics.removeSensor(sensor.name()));
        Assertions.assertTrue((boolean)apiSensors.requestSensorsExpired(this.metrics));
        Assertions.assertTrue((boolean)apiSensors.responseSensorsExpired(this.metrics));
        linkMetrics.recordResponse(apiKey, 100L, time.nanoseconds(), Collections.emptyMap(), time.milliseconds());
        ApiSensors newSensors = linkMetrics.apiSensors(apiKey);
        Assertions.assertNotEquals((Object)apiSensors, (Object)newSensors);
        Assertions.assertFalse((boolean)newSensors.requestSensorsExpired(this.metrics));
        Assertions.assertFalse((boolean)newSensors.responseSensorsExpired(this.metrics));
    }

    protected BrokerInterceptor createInterceptor(Time time) {
        return new DefaultBrokerInterceptor(time);
    }

    protected KafkaPrincipal createPrincipal() {
        return new KafkaPrincipal("User", "user");
    }

    protected String responseTopic() {
        return "topic";
    }

    protected void verifyMetrics() {
        Set metricNames = this.metrics.metrics().keySet().stream().filter(m -> m.tags().containsKey("link-id")).collect(Collectors.toSet());
        for (MetricName metricName : metricNames) {
            String error;
            String requestType = (String)metricName.tags().get("request");
            if (requestType != null) {
                Assertions.assertEquals((Object)ApiKeys.METADATA.name, (Object)requestType);
            }
            if ((error = (String)metricName.tags().get("error")) == null) continue;
            Assertions.assertEquals((Object)Errors.TOPIC_AUTHORIZATION_FAILED.name(), (Object)error);
        }
    }

    private RequestContext requestContext(RequestHeaderData headerData) {
        return this.interceptor.newContext(new RequestHeader(headerData, 2), "0", InetAddress.getLoopbackAddress(), this.createPrincipal(), ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, null, this.metrics, null, false, Optional.empty(), null);
    }

    private Double linkMetricValue(String name, UUID linkId, ApiKeys apiKey) {
        for (Map.Entry entry : this.metrics.metrics().entrySet()) {
            MetricName metricName = (MetricName)entry.getKey();
            if (!"cluster-link-source-metrics".equals(metricName.group()) || !name.equals(metricName.name()) || !Utils.toKafkaUuid((UUID)linkId).toString().equals(metricName.tags().get("link-id")) || !apiKey.name.equals(metricName.tags().get("request"))) continue;
            return (double)((Double)((KafkaMetric)entry.getValue()).metricValue());
        }
        return null;
    }
}

