package io.confluent.kafka.multitenant.quota;

import io.confluent.kafka.multitenant.quota.TenantClientQuotaConsumer;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.protobuf.cloud.events.v1.ClientQuotaKey;
import io.confluent.protobuf.cloud.events.v1.ClientQuotaValue;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import kafka.server.KafkaConfig;
import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Answers;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

/* loaded from: input_file:io/confluent/kafka/multitenant/quota/TenantClientQuotaConsumerTest.class */
public class TenantClientQuotaConsumerTest {
    private static final String TOPIC = "_confluent-client_quota";
    private static final String SESSION_UUID = "session-uuid";
    private MockedStatic<TenantQuotaCallback> tenantQuotaCallback;

    @BeforeEach
    public void setUp() throws Exception {
        this.tenantQuotaCallback = Mockito.mockStatic(TenantQuotaCallback.class, Answers.RETURNS_SMART_NULLS);
    }

    @AfterEach
    public void tearDown() throws Exception {
        this.tenantQuotaCallback.close();
    }

    @Test
    public void testDisabledInConfig() {
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        hashMap.put(KafkaConfig.BrokerSessionUuidProp(), SESSION_UUID);
        hashMap.put("confluent.cdc.client.quotas.enable", new Boolean(false));
        TenantClientQuotaConsumer tenantClientQuotaConsumer = new TenantClientQuotaConsumer(hashMap2, new Metrics(), new MockTime());
        Assertions.assertEquals(TenantClientQuotaConsumer.State.NOT_STARTED, tenantClientQuotaConsumer.state.get());
        tenantClientQuotaConsumer.configure(hashMap);
        Assertions.assertEquals(TenantClientQuotaConsumer.State.NOT_ENABLED, tenantClientQuotaConsumer.state.get());
        tenantClientQuotaConsumer.close();
        Assertions.assertEquals(TenantClientQuotaConsumer.State.NOT_ENABLED, tenantClientQuotaConsumer.state.get());
        hashMap.put("confluent.cdc.client.quotas.enable", null);
        TenantClientQuotaConsumer tenantClientQuotaConsumer2 = new TenantClientQuotaConsumer(hashMap2, new Metrics(), new MockTime());
        Assertions.assertEquals(TenantClientQuotaConsumer.State.NOT_STARTED, tenantClientQuotaConsumer2.state.get());
        tenantClientQuotaConsumer2.configure(hashMap);
        Assertions.assertEquals(TenantClientQuotaConsumer.State.NOT_ENABLED, tenantClientQuotaConsumer2.state.get());
        tenantClientQuotaConsumer2.close();
        Assertions.assertEquals(TenantClientQuotaConsumer.State.NOT_ENABLED, tenantClientQuotaConsumer2.state.get());
    }

    private TenantClientQuotaConsumer createAndStartConsumerWithMockLog(Metrics metrics, Time time) {
        TenantClientQuotaConsumer tenantClientQuotaConsumer = new TenantClientQuotaConsumer((Map) null, metrics, time);
        tenantClientQuotaConsumer.configure((KafkaBasedLog) Mockito.mock(KafkaBasedLog.class), SESSION_UUID);
        tenantClientQuotaConsumer.start().join();
        return tenantClientQuotaConsumer;
    }

    private void closeConsumer(TenantClientQuotaConsumer tenantClientQuotaConsumer) {
        tenantClientQuotaConsumer.close();
        Assertions.assertEquals(TenantClientQuotaConsumer.State.CLOSED, tenantClientQuotaConsumer.state.get());
    }

    private byte[] encodeClientQuotaKey(String str, String str2) {
        return ClientQuotaKey.newBuilder().setClusterId(str).setPrincipal(str2).build().toByteArray();
    }

    private byte[] encodeClientQuotaValue(long j, long j2) {
        return ClientQuotaValue.newBuilder().setIngressBytesRate(j).setEgressBytesRate(j2).build().toByteArray();
    }

    private ConsumerRecord<byte[], byte[]> createRecord(long j, byte[] bArr, byte[] bArr2) {
        return new ConsumerRecord<>(TOPIC, 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, bArr, bArr2, KafkaTestUtils.createGoodSequenceIdRecordHeaders(j, true), Optional.empty());
    }

    @Test
    public void testIgnoreBadMessages() {
        TenantClientQuotaConsumer createAndStartConsumerWithMockLog = createAndStartConsumerWithMockLog(new Metrics(), new MockTime());
        createAndStartConsumerWithMockLog.consume(new ConsumerRecord(TOPIC, 0, 0L, encodeClientQuotaKey("lkc-something", "user-id"), encodeClientQuotaValue(100L, 100L)));
        this.tenantQuotaCallback.verify(() -> {
            TenantQuotaCallback.updateUserQuotas((String) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (QuotaConfig) ArgumentMatchers.any());
        }, Mockito.never());
        createAndStartConsumerWithMockLog.consume(createRecord(1L, new byte[0], encodeClientQuotaValue(100L, 100L)));
        this.tenantQuotaCallback.verify(() -> {
            TenantQuotaCallback.updateUserQuotas((String) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (QuotaConfig) ArgumentMatchers.any());
        }, Mockito.never());
        createAndStartConsumerWithMockLog.consume(createRecord(1L, encodeClientQuotaKey("lkc-something", "user-id"), new byte[0]));
        this.tenantQuotaCallback.verify(() -> {
            TenantQuotaCallback.updateUserQuotas((String) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (QuotaConfig) ArgumentMatchers.any());
        }, Mockito.never());
        closeConsumer(createAndStartConsumerWithMockLog);
    }

    @Test
    public void testConsumePrincipal() {
        TenantClientQuotaConsumer createAndStartConsumerWithMockLog = createAndStartConsumerWithMockLog(new Metrics(), new MockTime());
        byte[] encodeClientQuotaKey = encodeClientQuotaKey("lkc-some", "user-id");
        HashMap hashMap = new HashMap();
        hashMap.put("user-id", new QuotaConfig(100L, 100L, (Double) null, (Double) null, (Double) null, QuotaConfig.UNLIMITED_QUOTA));
        createAndStartConsumerWithMockLog.consume(createRecord(100L, encodeClientQuotaKey, encodeClientQuotaValue(100L, 100L)));
        this.tenantQuotaCallback.verify(() -> {
            TenantQuotaCallback.updateUserQuotas("lkc-some", hashMap, QuotaConfig.UNLIMITED_QUOTA);
        });
        createAndStartConsumerWithMockLog.consume(createRecord(99L, encodeClientQuotaKey, encodeClientQuotaValue(10L, 10L)));
        this.tenantQuotaCallback.verify(() -> {
            TenantQuotaCallback.updateUserQuotas((String) ArgumentMatchers.eq("lkc-some"), (Map) ArgumentMatchers.any(), (QuotaConfig) ArgumentMatchers.any());
        }, Mockito.times(1));
        HashMap hashMap2 = new HashMap();
        hashMap2.put("user-id", new QuotaConfig(50L, 50L, (Double) null, (Double) null, (Double) null, QuotaConfig.UNLIMITED_QUOTA));
        createAndStartConsumerWithMockLog.consume(createRecord(101L, encodeClientQuotaKey, encodeClientQuotaValue(50L, 50L)));
        this.tenantQuotaCallback.verify(() -> {
            TenantQuotaCallback.updateUserQuotas("lkc-some", hashMap2, QuotaConfig.UNLIMITED_QUOTA);
        });
        HashMap hashMap3 = new HashMap();
        hashMap3.put("user-id", QuotaConfig.UNLIMITED_QUOTA);
        createAndStartConsumerWithMockLog.consume(createRecord(102L, encodeClientQuotaKey, null));
        this.tenantQuotaCallback.verify(() -> {
            TenantQuotaCallback.updateUserQuotas("lkc-some", hashMap3, QuotaConfig.UNLIMITED_QUOTA);
        });
        createAndStartConsumerWithMockLog.consume(createRecord(101L, encodeClientQuotaKey, encodeClientQuotaValue(900L, 900L)));
        this.tenantQuotaCallback.verify(() -> {
            TenantQuotaCallback.updateUserQuotas("lkc-some", hashMap3, QuotaConfig.UNLIMITED_QUOTA);
        });
        closeConsumer(createAndStartConsumerWithMockLog);
    }

    @Test
    public void testConsumeDefaultQuota() {
        TenantClientQuotaConsumer createAndStartConsumerWithMockLog = createAndStartConsumerWithMockLog(new Metrics(), new MockTime());
        byte[] encodeClientQuotaKey = encodeClientQuotaKey("lkc-some", "<default>");
        byte[] encodeClientQuotaKey2 = encodeClientQuotaKey("lkc-some", "user-id");
        HashMap hashMap = new HashMap();
        hashMap.put("user-id", new QuotaConfig(100L, 100L, (Double) null, (Double) null, (Double) null, QuotaConfig.UNLIMITED_QUOTA));
        createAndStartConsumerWithMockLog.consume(createRecord(100L, encodeClientQuotaKey2, encodeClientQuotaValue(100L, 100L)));
        this.tenantQuotaCallback.verify(() -> {
            TenantQuotaCallback.updateUserQuotas("lkc-some", hashMap, QuotaConfig.UNLIMITED_QUOTA);
        });
        QuotaConfig quotaConfig = new QuotaConfig(200L, 200L, (Double) null, (Double) null, (Double) null, QuotaConfig.UNLIMITED_QUOTA);
        createAndStartConsumerWithMockLog.consume(createRecord(101L, encodeClientQuotaKey, encodeClientQuotaValue(200L, 200L)));
        this.tenantQuotaCallback.verify(() -> {
            TenantQuotaCallback.updateUserQuotas("lkc-some", hashMap, quotaConfig);
        });
        this.tenantQuotaCallback.reset();
        createAndStartConsumerWithMockLog.consume(createRecord(102L, encodeClientQuotaKey, null));
        this.tenantQuotaCallback.verify(() -> {
            TenantQuotaCallback.updateUserQuotas("lkc-some", hashMap, QuotaConfig.UNLIMITED_QUOTA);
        });
        createAndStartConsumerWithMockLog.consume(createRecord(101L, encodeClientQuotaKey, encodeClientQuotaValue(900L, 900L)));
        this.tenantQuotaCallback.verify(() -> {
            TenantQuotaCallback.updateUserQuotas("lkc-some", hashMap, QuotaConfig.UNLIMITED_QUOTA);
        });
        closeConsumer(createAndStartConsumerWithMockLog);
    }

    @Test
    public void testIllegalArgumentException() {
        TenantClientQuotaConsumer createAndStartConsumerWithMockLog = createAndStartConsumerWithMockLog(new Metrics(), new MockTime());
        byte[] encodeClientQuotaKey = encodeClientQuotaKey("lkc-some", "user-id");
        new HashMap().put("user-id", new QuotaConfig(100L, 100L, (Double) null, (Double) null, (Double) null, QuotaConfig.UNLIMITED_QUOTA));
        this.tenantQuotaCallback.when(() -> {
            TenantQuotaCallback.updateUserQuotas((String) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (QuotaConfig) ArgumentMatchers.any());
        }).thenThrow(new Throwable[]{new IllegalArgumentException("Your exception message")});
        createAndStartConsumerWithMockLog.consume(createRecord(100L, encodeClientQuotaKey, encodeClientQuotaValue(100L, 100L)));
        closeConsumer(createAndStartConsumerWithMockLog);
    }
}
