package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.multitenant.metrics.TenantMetricsTestUtils;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.server.plugins.policy.AlterConfigPolicy;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import kafka.server.ClientQuotaManager;
import kafka.server.ClusterLinkReplicationQuotaManager;
import kafka.server.KafkaBroker;
import kafka.server.ReplicationQuotaManager;
import kafka.server.ThreadUsageMetrics;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.server.config.DiskUsageBasedThrottlingConfig;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.collection.JavaConverters;

/**
 * Integration tests that start a small cluster through {@link IntegrationTestHarness} and verify
 * which quota managers report backpressure as enabled for various static and dynamically
 * altered {@code confluent.backpressure.*} broker settings.
 */
@Tags({@Tag("integration"), @Tag("bazel:size:medium")})
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/BrokerBackpressureTest.class */
public class BrokerBackpressureTest {
    // Number of brokers handed to the test harness in setUp().
    private static final int BROKER_COUNT = 1;
    // Value written to "num.io.threads"; also used to derive the expected IO thread capacity.
    private final Integer numIoThreads = 8;
    // Value written to "num.network.threads"; also used to derive the expected network thread capacity.
    private final Integer numNetworkThreads = 4;
    // Value written to "queued.max.requests"; compared against the request quota manager's maxQueueSize.
    private final Integer maxQueueSize = 500;
    // Options (30000 ms timeout) applied to every incrementalAlterConfigs call in this class.
    private final AlterConfigsOptions configsOptions = new AlterConfigsOptions().timeoutMs(30000);
    // BROKER resource with an empty name; per the field name this targets the default broker config.
    private final ConfigResource defaultBrokerConfigResource = new ConfigResource(ConfigResource.Type.BROKER, "");
    // Harness created before and shut down after each test.
    private IntegrationTestHarness testHarness;

    /**
     * Builds a new harness for {@code BROKER_COUNT} broker(s) before every test.
     *
     * @param testInfo JUnit-provided metadata about the running test, forwarded to the harness
     */
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        IntegrationTestHarness harness = new IntegrationTestHarness(testInfo, BROKER_COUNT);
        testHarness = harness;
    }

    /**
     * Shuts the harness down after every test.
     *
     * <p>The harness is only shut down when it was actually created: if its construction in
     * {@code setUp} throws, the field is still {@code null} and an unguarded call would raise a
     * {@link NullPointerException} on top of the real failure.
     *
     * @throws Exception if the harness fails to shut down
     */
    @AfterEach
    public void tearDown() throws Exception {
        if (this.testHarness != null) {
            this.testHarness.shutdown();
        }
    }

    /**
     * Base broker configuration shared by all tests: alter-config policy, thread counts,
     * request queue size and topic replication factor. No tenant quota callback is configured.
     *
     * @return a new, mutable {@link Properties} instance
     */
    private Properties brokerProps() {
        Properties props = new Properties();
        props.put("alter.config.policy.class.name", AlterConfigPolicy.class.getName());
        props.put("num.network.threads", String.valueOf(numNetworkThreads));
        props.put("num.io.threads", String.valueOf(numIoThreads));
        props.put("queued.max.requests", String.valueOf(maxQueueSize));
        props.put("confluent.plugins.topic.policy.replication.factor", "1");
        return props;
    }

    /**
     * Base broker configuration plus the tenant quota callback and the {@code EXTERNAL}
     * multitenant listener name.
     *
     * @return a new, mutable {@link Properties} instance
     */
    private Properties brokerPropsWithTenantQuotas() {
        Properties props = brokerProps();
        String quotaCallbackClass = TenantQuotaCallback.class.getName();
        props.put("client.quota.callback.class", quotaCallbackClass);
        props.put("confluent.multitenant.listener.names", "EXTERNAL");
        return props;
    }

    /**
     * Base broker configuration plus the tenant quota callback, but with the multitenant
     * listener name set to {@code INVALID}.
     *
     * @return a new, mutable {@link Properties} instance
     */
    private Properties brokerPropsWithInvalidMultitenantListenerName() {
        Properties props = brokerProps();
        String quotaCallbackClass = TenantQuotaCallback.class.getName();
        props.put("client.quota.callback.class", quotaCallbackClass);
        props.put("confluent.multitenant.listener.names", "INVALID");
        return props;
    }

    /**
     * With the base broker properties only, no quota manager reports backpressure or
     * tenant-level quotas as enabled, and thread capacities match the configured thread counts.
     *
     * @param str quorum label supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testNoTenantQuotasNoBackpressureConfig(String str) throws Exception {
        PhysicalCluster cluster = testHarness.start(brokerProps());
        KafkaBroker broker = cluster.kafkaCluster().kafkaBrokers().get(0);

        // Backpressure must be off for every quota type.
        Assertions.assertFalse(broker.quotaManagers().fetch().backpressureEnabled(), "Expected consume backpressure to be disabled");
        Assertions.assertFalse(broker.quotaManagers().produce().backpressureEnabled(), "Expected produce backpressure to be disabled");
        Assertions.assertFalse(broker.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be disabled");
        Assertions.assertFalse(broker.quotaManagers().clusterLinkRequest().backpressureEnabled(), "Expected clusterLinkRequest backpressure to be disabled");

        // Tenant-level quotas must be off as well.
        Assertions.assertFalse(broker.quotaManagers().fetch().tenantLevelQuotasEnabled());
        Assertions.assertFalse(broker.quotaManagers().produce().tenantLevelQuotasEnabled());
        Assertions.assertFalse(broker.quotaManagers().request().tenantLevelQuotasEnabled());
        Assertions.assertFalse(broker.quotaManagers().clusterLinkRequest().tenantLevelQuotasEnabled());

        // Capacity is expected to be 100 per configured thread, within a tolerance of 1.
        double expectedIoCapacity = numIoThreads * 100.0d;
        double expectedNetworkCapacity = numNetworkThreads * 100.0d;
        Assertions.assertEquals(expectedIoCapacity, ThreadUsageMetrics.ioThreadsCapacity(broker.metrics()), 1.0d);
        Assertions.assertEquals(expectedNetworkCapacity, ThreadUsageMetrics.networkThreadsCapacity(broker.metrics(), JavaConverters.asScalaBuffer(Collections.singletonList("EXTERNAL"))), 1.0d);
    }

    /**
     * With tenant quotas configured but no backpressure types, backpressure stays disabled
     * while tenant-level quotas are reported as enabled on every quota manager checked.
     *
     * @param str quorum label supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testNoBackpressureConfig(String str) throws Exception {
        PhysicalCluster cluster = testHarness.start(brokerPropsWithTenantQuotas());
        KafkaBroker broker = cluster.kafkaCluster().kafkaBrokers().get(0);

        Assertions.assertFalse(broker.quotaManagers().fetch().backpressureEnabled(), "Expected consume backpressure to be disabled");
        Assertions.assertFalse(broker.quotaManagers().produce().backpressureEnabled(), "Expected produce backpressure to be disabled");
        Assertions.assertFalse(broker.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be disabled");
        Assertions.assertFalse(broker.quotaManagers().clusterLinkRequest().backpressureEnabled(), "Expected clusterLinkRequest backpressure to be disabled");

        Assertions.assertTrue(broker.quotaManagers().fetch().tenantLevelQuotasEnabled());
        Assertions.assertTrue(broker.quotaManagers().produce().tenantLevelQuotasEnabled());
        Assertions.assertTrue(broker.quotaManagers().request().tenantLevelQuotasEnabled());
        Assertions.assertTrue(broker.quotaManagers().clusterLinkRequest().tenantLevelQuotasEnabled());
    }

    /**
     * Setting {@code confluent.backpressure.types} to {@code fetch} enables only fetch
     * backpressure; thread capacities still match the configured thread counts.
     *
     * @param str quorum label supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testFetchBackpressureOnlyConfig(String str) throws Exception {
        Properties props = brokerPropsWithTenantQuotas();
        props.put("confluent.backpressure.types", "fetch");
        PhysicalCluster cluster = testHarness.start(props);
        KafkaBroker broker = cluster.kafkaCluster().kafkaBrokers().get(0);

        Assertions.assertTrue(broker.quotaManagers().fetch().backpressureEnabled(), "Expected consume backpressure to be enabled");
        Assertions.assertFalse(broker.quotaManagers().produce().backpressureEnabled(), "Expected produce backpressure to be disabled");
        Assertions.assertFalse(broker.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be disabled");

        double expectedIoCapacity = numIoThreads * 100.0d;
        double expectedNetworkCapacity = numNetworkThreads * 100.0d;
        Assertions.assertEquals(expectedIoCapacity, ThreadUsageMetrics.ioThreadsCapacity(broker.metrics()), 1.0d);
        Assertions.assertEquals(expectedNetworkCapacity, ThreadUsageMetrics.networkThreadsCapacity(broker.metrics(), JavaConverters.asScalaBuffer(Collections.singletonList("EXTERNAL"))), 1.0d);
    }

    /**
     * Setting {@code confluent.backpressure.types} to {@code fetch,produce} enables those two
     * types and leaves request and clusterLinkRequest backpressure disabled.
     *
     * @param str quorum label supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testFetchAndProduceBackpressureOnlyConfig(String str) throws Exception {
        Properties props = brokerPropsWithTenantQuotas();
        props.put("confluent.backpressure.types", "fetch,produce");
        PhysicalCluster cluster = testHarness.start(props);
        KafkaBroker broker = cluster.kafkaCluster().kafkaBrokers().get(0);

        Assertions.assertTrue(broker.quotaManagers().fetch().backpressureEnabled(), "Expected consume backpressure to be enabled");
        Assertions.assertTrue(broker.quotaManagers().produce().backpressureEnabled(), "Expected produce backpressure to be enabled");
        Assertions.assertFalse(broker.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be disabled");
        Assertions.assertFalse(broker.quotaManagers().clusterLinkRequest().backpressureEnabled(), "Expected clusterLinkRequest backpressure to be disabled");
    }

    /**
     * Listing all four types ({@code fetch,produce,request,clusterlinkrequest}) enables
     * backpressure on every corresponding quota manager.
     *
     * @param str quorum label supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testFetchAndProduceAndRequestBackpressureConfig(String str) throws Exception {
        Properties props = brokerPropsWithTenantQuotas();
        props.put("confluent.backpressure.types", "fetch,produce,request,clusterlinkrequest");
        PhysicalCluster cluster = testHarness.start(props);
        KafkaBroker broker = cluster.kafkaCluster().kafkaBrokers().get(0);

        Assertions.assertTrue(broker.quotaManagers().fetch().backpressureEnabled(), "Expected consume backpressure to be enabled");
        Assertions.assertTrue(broker.quotaManagers().produce().backpressureEnabled(), "Expected produce backpressure to be enabled");
        Assertions.assertTrue(broker.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be enabled");
        Assertions.assertTrue(broker.quotaManagers().clusterLinkRequest().backpressureEnabled(), "Expected clusterLinkRequest backpressure to be enabled");
    }

    /**
     * Enabling only the request backpressure type turns on request backpressure alone and
     * exposes the default dynamic backpressure settings (queue size from
     * {@code queued.max.requests}, default minimum broker limit, {@code p95} percentile).
     *
     * @param str quorum label supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testRequestBackpressureConfig(String str) throws Exception {
        Properties brokerPropsWithTenantQuotas = brokerPropsWithTenantQuotas();
        brokerPropsWithTenantQuotas.put("confluent.backpressure.types", TenantMetricsTestUtils.REQUEST_TAG);
        KafkaBroker kafkaBroker = this.testHarness.start(brokerPropsWithTenantQuotas).kafkaCluster().kafkaBrokers().get(0);
        Assertions.assertFalse(kafkaBroker.quotaManagers().fetch().backpressureEnabled(), "Expected consume backpressure to be disabled");
        Assertions.assertFalse(kafkaBroker.quotaManagers().produce().backpressureEnabled(), "Expected produce backpressure to be disabled");
        Assertions.assertTrue(kafkaBroker.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be enabled");
        // Failure message now matches the assertFalse (it previously claimed "enabled").
        Assertions.assertFalse(kafkaBroker.quotaManagers().clusterLinkRequest().backpressureEnabled(), "Expected clusterLinkRequest backpressure to be disabled");
        Assertions.assertEquals(this.maxQueueSize.intValue(), kafkaBroker.quotaManagers().request().dynamicBackpressureConfig().maxQueueSize, 0.0d);
        Assertions.assertEquals(ConfluentConfigs.BACKPRESSURE_REQUEST_MIN_BROKER_LIMIT_DEFAULT.doubleValue(), kafkaBroker.quotaManagers().request().dynamicBackpressureConfig().minBrokerRequestQuota, 0.0d);
        Assertions.assertEquals("p95", kafkaBroker.quotaManagers().request().dynamicBackpressureConfig().queueSizePercentile);
        // Capacity is expected to be 100 per configured thread, within a tolerance of 1.
        Assertions.assertEquals(this.numIoThreads.intValue() * 100.0d, ThreadUsageMetrics.ioThreadsCapacity(kafkaBroker.metrics()), 1.0d);
        Assertions.assertEquals(this.numNetworkThreads.intValue() * 100.0d, ThreadUsageMetrics.networkThreadsCapacity(kafkaBroker.metrics(), JavaConverters.asScalaBuffer(Collections.singletonList("EXTERNAL"))), 1.0d);
    }

    /**
     * Non-default static values for the minimum broker limit ({@code 150}) and the queue size
     * percentile ({@code p99}) are reflected in the request quota manager's dynamic config.
     *
     * @param str quorum label supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testNonDefaultRequestBackpressureConfig(String str) throws Exception {
        Properties props = brokerPropsWithTenantQuotas();
        props.put("confluent.backpressure.types", TenantMetricsTestUtils.REQUEST_TAG);
        props.put("confluent.backpressure.request.min.broker.limit", "150");
        props.put("confluent.backpressure.request.queue.size.percentile", "p99");
        PhysicalCluster cluster = testHarness.start(props);
        KafkaBroker broker = cluster.kafkaCluster().kafkaBrokers().get(0);

        Assertions.assertTrue(broker.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be enabled");
        Assertions.assertEquals(maxQueueSize.intValue(), broker.quotaManagers().request().dynamicBackpressureConfig().maxQueueSize, 0.0d);
        Assertions.assertEquals(150.0d, broker.quotaManagers().request().dynamicBackpressureConfig().minBrokerRequestQuota, 0.0d);
        Assertions.assertEquals("p99", broker.quotaManagers().request().dynamicBackpressureConfig().queueSizePercentile);
    }

    /**
     * Out-of-range static values (limit {@code 0}, percentile {@code 100}) are expected to be
     * replaced by accepted ones: a minimum broker limit of 10 and the {@code p95} percentile.
     *
     * @param str quorum label supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testRequestBackpressureConfigWithInvalidValuesSetsAcceptedValues(String str) throws Exception {
        Properties props = brokerPropsWithTenantQuotas();
        props.put("confluent.backpressure.types", TenantMetricsTestUtils.REQUEST_TAG);
        props.put("confluent.backpressure.request.min.broker.limit", "0");
        props.put("confluent.backpressure.request.queue.size.percentile", "100");
        PhysicalCluster cluster = testHarness.start(props);
        KafkaBroker broker = cluster.kafkaCluster().kafkaBrokers().get(0);

        Assertions.assertTrue(broker.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be enabled");
        Assertions.assertEquals(maxQueueSize.intValue(), broker.quotaManagers().request().dynamicBackpressureConfig().maxQueueSize, 0.0d);
        Assertions.assertEquals(10.0d, broker.quotaManagers().request().dynamicBackpressureConfig().minBrokerRequestQuota, 0.0d);
        Assertions.assertEquals("p95", broker.quotaManagers().request().dynamicBackpressureConfig().queueSizePercentile);
    }

    /**
     * Dynamically alters the request backpressure minimum broker limit and queue size
     * percentile and waits for every broker to pick up each change. Invalid dynamic values
     * ({@code p101}, {@code -1}) are expected to result in {@code p95} and {@code 10}.
     *
     * @param str quorum label supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDynamicRequestBackpressureConfig(String str) throws Exception {
        Properties props = brokerPropsWithTenantQuotas();
        props.put("confluent.backpressure.types", TenantMetricsTestUtils.REQUEST_TAG);
        PhysicalCluster cluster = testHarness.start(props);

        // Starting point: request backpressure on, with default limit and percentile.
        KafkaBroker firstBroker = cluster.kafkaCluster().kafkaBrokers().get(0);
        Assertions.assertTrue(firstBroker.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be enabled");
        Assertions.assertEquals(ConfluentConfigs.BACKPRESSURE_REQUEST_MIN_BROKER_LIMIT_DEFAULT.longValue(), firstBroker.quotaManagers().request().dynamicBackpressureConfig().minBrokerRequestQuota, 0.0d);
        Assertions.assertEquals("p95", firstBroker.quotaManagers().request().dynamicBackpressureConfig().queueSizePercentile);

        AdminClient admin = cluster.superAdminClient();

        // Step 1: raise the minimum broker limit; the percentile must stay untouched.
        admin.incrementalAlterConfigs(backpressureConfig("confluent.backpressure.request.min.broker.limit", "100"), configsOptions).all().get();
        for (KafkaBroker broker : cluster.kafkaCluster().kafkaBrokers()) {
            TestUtils.waitForCondition(
                () -> broker.quotaManagers().request().dynamicBackpressureConfig().minBrokerRequestQuota == 100.0d,
                "Expected min broker request limit to be updated to 100 on broker " + broker.config().brokerId());
            Assertions.assertEquals("p95", broker.quotaManagers().request().dynamicBackpressureConfig().queueSizePercentile);
        }

        // Step 2: change the percentile; the limit from step 1 must stay untouched.
        admin.incrementalAlterConfigs(backpressureConfig("confluent.backpressure.request.queue.size.percentile", "p99"), configsOptions).all().get();
        for (KafkaBroker broker : cluster.kafkaCluster().kafkaBrokers()) {
            TestUtils.waitForCondition(
                () -> broker.quotaManagers().request().dynamicBackpressureConfig().queueSizePercentile.equals("p99"),
                "Expected queue size percentile to be updated to `p99` on broker " + broker.config().brokerId());
            Assertions.assertEquals(100.0d, broker.quotaManagers().request().dynamicBackpressureConfig().minBrokerRequestQuota, 0.0d);
        }

        // Step 3: an invalid percentile is expected to end up as p95.
        admin.incrementalAlterConfigs(backpressureConfig("confluent.backpressure.request.queue.size.percentile", "p101"), configsOptions).all().get();
        for (KafkaBroker broker : cluster.kafkaCluster().kafkaBrokers()) {
            TestUtils.waitForCondition(
                () -> broker.quotaManagers().request().dynamicBackpressureConfig().queueSizePercentile.equals("p95"),
                "Expected queue size percentile to be updated to `p95` on broker " + broker.config().brokerId());
        }

        // Step 4: an invalid limit is expected to end up as 10.
        admin.incrementalAlterConfigs(backpressureConfig("confluent.backpressure.request.min.broker.limit", "-1"), configsOptions).all().get();
        for (KafkaBroker broker : cluster.kafkaCluster().kafkaBrokers()) {
            TestUtils.waitForCondition(
                () -> broker.quotaManagers().request().dynamicBackpressureConfig().minBrokerRequestQuota == 10.0d,
                "Expected min broker request limit to be updated to 10 on broker " + broker.config().brokerId());
        }
    }

    /**
     * With the multitenant listener name set to {@code INVALID}, request backpressure stays
     * disabled even though it is listed, while produce and fetch backpressure are enabled.
     *
     * @param str quorum label supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testRequestBackpressureConfigWithInvalidTenantListener(String str) throws Exception {
        Properties props = brokerPropsWithInvalidMultitenantListenerName();
        props.put("confluent.backpressure.types", "fetch,produce,request");
        PhysicalCluster cluster = testHarness.start(props);
        KafkaBroker broker = cluster.kafkaCluster().kafkaBrokers().get(0);

        Assertions.assertFalse(broker.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be disabled");
        Assertions.assertTrue(broker.quotaManagers().produce().backpressureEnabled(), "Expected produce backpressure to be enabled");
        Assertions.assertTrue(broker.quotaManagers().fetch().backpressureEnabled(), "Expected consume backpressure to be enabled");
    }

    /**
     * Setting {@code confluent.backpressure.types} to {@code produce} enables produce
     * backpressure only; fetch and request backpressure stay disabled.
     *
     * @param str quorum label supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testProduceBackpressureConfig(String str) throws Exception {
        Properties props = brokerPropsWithTenantQuotas();
        props.put("confluent.backpressure.types", "produce");
        PhysicalCluster cluster = testHarness.start(props);
        KafkaBroker broker = cluster.kafkaCluster().kafkaBrokers().get(0);

        Assertions.assertFalse(broker.quotaManagers().fetch().backpressureEnabled(), "Expected consume backpressure to be disabled");
        Assertions.assertTrue(broker.quotaManagers().produce().backpressureEnabled(), "Expected produce backpressure to be enabled");
        Assertions.assertFalse(broker.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be disabled");
    }

    /**
     * Without the tenant quota callback, listing every backpressure type still leaves all
     * four quota managers with backpressure disabled.
     *
     * @param str quorum label supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testBackpressureDisabledWhenTenantQuotasDisabled(String str) throws Exception {
        Properties props = brokerProps();
        props.put("confluent.backpressure.types", "fetch,produce,request,clusterlinkrequest");
        PhysicalCluster cluster = testHarness.start(props);
        KafkaBroker broker = cluster.kafkaCluster().kafkaBrokers().get(0);

        Assertions.assertFalse(broker.quotaManagers().fetch().backpressureEnabled(), "Expected consume backpressure to be disabled");
        Assertions.assertFalse(broker.quotaManagers().produce().backpressureEnabled(), "Expected produce backpressure to be disabled");
        Assertions.assertFalse(broker.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be disabled");
        Assertions.assertFalse(broker.quotaManagers().clusterLinkRequest().backpressureEnabled(), "Expected clusterLinkRequest backpressure to be disabled");
    }

    /**
     * Unrecognised entries in {@code confluent.backpressure.types} ({@code randomtype},
     * {@code LeaderReplication}) are ignored: only the valid {@code produce} entry takes effect.
     *
     * @param str quorum label supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testInvalidBackressureTypesAreIgnored(String str) throws Exception {
        Properties props = brokerPropsWithTenantQuotas();
        props.put("confluent.backpressure.types", "randomtype,produce,LeaderReplication");
        PhysicalCluster cluster = testHarness.start(props);
        KafkaBroker broker = cluster.kafkaCluster().kafkaBrokers().get(0);

        Assertions.assertFalse(broker.quotaManagers().fetch().backpressureEnabled(), "Expected consume backpressure to be disabled");
        Assertions.assertTrue(broker.quotaManagers().produce().backpressureEnabled(), "Expected produce backpressure to be enabled");
        Assertions.assertFalse(broker.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be disabled");
        Assertions.assertFalse(broker.quotaManagers().clusterLinkRequest().backpressureEnabled(), "Expected clusterLinkRequest backpressure to be disabled");
    }

    /**
     * Walks {@code confluent.backpressure.types} through three dynamic updates (all four types,
     * then {@code fetch,produce}, then empty) and checks on every broker that the enabled set
     * follows each update.
     *
     * @param str quorum label supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDynamicBackpressureConfig(String str) throws Exception {
        PhysicalCluster cluster = testHarness.start(brokerPropsWithTenantQuotas());

        // Initially nothing is enabled.
        for (KafkaBroker broker : cluster.kafkaCluster().kafkaBrokers()) {
            Assertions.assertFalse(broker.quotaManagers().fetch().backpressureEnabled(), "Expected consume backpressure to be disabled on broker " + broker.config().brokerId());
            Assertions.assertFalse(broker.quotaManagers().produce().backpressureEnabled(), "Expected produce backpressure to be disabled on broker " + broker.config().brokerId());
            Assertions.assertFalse(broker.quotaManagers().request().backpressureEnabled(), "Expected request backpressure to be disabled on broker " + broker.config().brokerId());
            Assertions.assertFalse(broker.quotaManagers().clusterLinkRequest().backpressureEnabled(), "Expected clusterLinkRequest backpressure to be disabled on broker " + broker.config().brokerId());
        }

        AdminClient admin = cluster.superAdminClient();

        // Update 1: enable all four types.
        admin.incrementalAlterConfigs(backpressureConfig("confluent.backpressure.types", "fetch,produce,request,clusterlinkrequest"), configsOptions).all().get();
        for (KafkaBroker broker : cluster.kafkaCluster().kafkaBrokers()) {
            TestUtils.waitForCondition(
                () -> broker.quotaManagers().fetch().backpressureEnabled(),
                "Expected consume backpressure to be enabled on broker " + broker.config().brokerId());
            TestUtils.waitForCondition(
                () -> broker.quotaManagers().produce().backpressureEnabled(),
                "Expected produce backpressure to be enabled on broker " + broker.config().brokerId());
            TestUtils.waitForCondition(
                () -> broker.quotaManagers().request().backpressureEnabled(),
                "Expected request backpressure to be enabled on broker " + broker.config().brokerId());
            TestUtils.waitForCondition(
                () -> broker.quotaManagers().clusterLinkRequest().backpressureEnabled(),
                "Expected clusterLinkRequest backpressure to be enabled on broker " + broker.config().brokerId());
            Assertions.assertEquals(maxQueueSize.intValue(), broker.quotaManagers().request().dynamicBackpressureConfig().maxQueueSize, 0.0d);
        }

        // Update 2: keep fetch and produce, drop request and clusterLinkRequest.
        admin.incrementalAlterConfigs(backpressureConfig("confluent.backpressure.types", "fetch,produce"), configsOptions).all().get();
        for (KafkaBroker broker : cluster.kafkaCluster().kafkaBrokers()) {
            // fetch/produce were already enabled by update 1, so they are asserted without waiting.
            Assertions.assertTrue(broker.quotaManagers().fetch().backpressureEnabled(), "Expected consume backpressure to be enabled on broker " + broker.config().brokerId());
            Assertions.assertTrue(broker.quotaManagers().produce().backpressureEnabled(), "Expected produce backpressure to be enabled on broker " + broker.config().brokerId());
            TestUtils.waitForCondition(
                () -> !broker.quotaManagers().request().backpressureEnabled(),
                "Expected request backpressure to be disabled on broker " + broker.config().brokerId());
            TestUtils.waitForCondition(
                () -> !broker.quotaManagers().clusterLinkRequest().backpressureEnabled(),
                "Expected clusterLinkRequest backpressure to be disabled on broker " + broker.config().brokerId());
        }

        // Update 3: an empty type list disables everything.
        admin.incrementalAlterConfigs(backpressureConfig("confluent.backpressure.types", ""), configsOptions).all().get();
        for (KafkaBroker broker : cluster.kafkaCluster().kafkaBrokers()) {
            TestUtils.waitForCondition(
                () -> !broker.quotaManagers().fetch().backpressureEnabled(),
                "Expected consume backpressure to be disabled on broker " + broker.config().brokerId());
            TestUtils.waitForCondition(
                () -> !broker.quotaManagers().produce().backpressureEnabled(),
                "Expected produce backpressure to be disabled on broker " + broker.config().brokerId());
            TestUtils.waitForCondition(
                () -> !broker.quotaManagers().request().backpressureEnabled(),
                "Expected request backpressure to be disabled on broker " + broker.config().brokerId());
            TestUtils.waitForCondition(
                () -> !broker.quotaManagers().clusterLinkRequest().backpressureEnabled(),
                "Expected clusterLinkRequest backpressure to be disabled on broker " + broker.config().brokerId());
        }
    }

    /**
     * Verifies the initial disk-usage throttling configuration of the produce quota manager and
     * then dynamically updates the enable flag, free-disk threshold, throttled produce
     * throughput and recovery factor, waiting for each to become visible.
     *
     * @param str quorum label supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDynamicDiskThrottlingConfig(String str) throws Exception {
        final long initialFreeDiskThresholdBytes = 21474836480L;
        final long initialThrottledProduceThroughput = 131072L;
        final long updatedFreeDiskThresholdBytes = 107374182400L;
        final long updatedThrottledProduceThroughput = 65536L;
        final double recoveryFactor = 1.5d;

        PhysicalCluster start = this.testHarness.start(brokerPropsWithTenantQuotas());
        KafkaBroker kafkaBroker = start.kafkaCluster().kafkaBrokers().get(0);
        ClientQuotaManager produce = kafkaBroker.quotaManagers().produce();

        // Typed list instead of a raw (List) cast, so the constructor call is checked.
        List<String> liveLogDirPaths = JavaConverters.seqAsJavaList(kafkaBroker.logManager().liveLogDirs()).stream()
            .map(dir -> dir.getAbsolutePath())
            .collect(Collectors.toList());
        DiskUsageBasedThrottlingConfig expectedInitialConfig = new DiskUsageBasedThrottlingConfig(
            initialFreeDiskThresholdBytes,
            initialThrottledProduceThroughput,
            liveLogDirPaths,
            false,
            DiskUsageBasedThrottlingConfig.DEFAULT_DISK_CHECK_FREQUENCY_MS,
            recoveryFactor,
            kafkaBroker.config().clusterLinkEnable().booleanValue());
        Assertions.assertEquals(expectedInitialConfig, produce.getCurrentDiskThrottlingConfig());

        AdminClient superAdminClient = start.superAdminClient();
        superAdminClient.incrementalAlterConfigs(backpressureConfig("confluent.backpressure.disk.enable", "true"), this.configsOptions).all().get();
        TestUtils.waitForCondition(
            () -> produce.getCurrentDiskThrottlingConfig().enableDiskBasedThrottling,
            "Expected confluent.backpressure.disk.enable to be set as true on " + kafkaBroker.config().brokerId());
        TestUtils.waitForCondition(
            () -> produce.diskThrottlingEnabledInConfig(produce.getCurrentDiskThrottlingConfig()),
            "Expected diskThrottling() to be enabled on " + kafkaBroker.config().brokerId());

        superAdminClient.incrementalAlterConfigs(backpressureConfig("confluent.backpressure.disk.free.threshold.bytes", String.valueOf(updatedFreeDiskThresholdBytes)), this.configsOptions).all().get();
        TestUtils.waitForCondition(
            () -> produce.getCurrentDiskThrottlingConfig().freeDiskThresholdBytes == updatedFreeDiskThresholdBytes,
            "Expected confluent.backpressure.disk.free.threshold.bytes to be set as 107374182400 on " + kafkaBroker.config().brokerId());

        // The two messages below previously carried a stray "{}" placeholder that is never substituted.
        superAdminClient.incrementalAlterConfigs(backpressureConfig("confluent.backpressure.disk.produce.bytes.per.second", String.valueOf(updatedThrottledProduceThroughput)), this.configsOptions).all().get();
        TestUtils.waitForCondition(
            () -> produce.getCurrentDiskThrottlingConfig().throttledProduceThroughput == updatedThrottledProduceThroughput,
            "Expected confluent.backpressure.disk.produce.bytes.per.second to be set as 65536 on " + kafkaBroker.config().brokerId());

        // NOTE(review): the recovery factor is set to the same 1.5 asserted in the initial config
        // above, so this wait is satisfied even if the update is never applied.
        superAdminClient.incrementalAlterConfigs(backpressureConfig("confluent.backpressure.disk.threshold.recovery.factor", String.valueOf(recoveryFactor)), this.configsOptions).all().get();
        TestUtils.waitForCondition(
            () -> produce.getCurrentDiskThrottlingConfig().freeDiskThresholdBytesRecoveryFactor == recoveryFactor,
            "Expected confluent.backpressure.disk.threshold.recovery.factor to be set as 1.5 on " + kafkaBroker.config().brokerId());
    }

    /**
     * Starts a broker with cluster linking and disk-based throttling enabled (free-disk
     * threshold {@code Long.MAX_VALUE}), checks the resulting broker quota limits on the
     * produce, cluster-link produce and follower quota managers, then dynamically lowers the
     * threshold and the throttled produce throughput.
     *
     * @param str quorum label supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDynamicDiskThrottlingConfigWithClusterLinking(String str) throws Exception {
        final long initialThrottledProduceThroughput = 131072L;
        final double expectedProduceLimit = 131072.0d;
        final double expectedFollowerLimit = 524288.0d;
        final long updatedFreeDiskThresholdBytes = 107374182400L;
        final long updatedThrottledProduceThroughput = 65536L;

        Properties brokerPropsWithTenantQuotas = brokerPropsWithTenantQuotas();
        brokerPropsWithTenantQuotas.put("confluent.cluster.link.enable", "true");
        brokerPropsWithTenantQuotas.put("confluent.backpressure.disk.enable", "true");
        brokerPropsWithTenantQuotas.put("confluent.backpressure.disk.free.threshold.bytes", String.valueOf(Long.MAX_VALUE));
        PhysicalCluster start = this.testHarness.start(brokerPropsWithTenantQuotas);
        KafkaBroker kafkaBroker = start.kafkaCluster().kafkaBrokers().get(0);
        ClientQuotaManager produce = kafkaBroker.quotaManagers().produce();
        ClusterLinkReplicationQuotaManager clusterLinkProduce = kafkaBroker.quotaManagers().clusterLinkProduce();
        ReplicationQuotaManager follower = kafkaBroker.quotaManagers().follower();

        // Typed list instead of a raw (List) cast, so the constructor call is checked.
        List<String> liveLogDirPaths = JavaConverters.seqAsJavaList(kafkaBroker.logManager().liveLogDirs()).stream()
            .map(dir -> dir.getAbsolutePath())
            .collect(Collectors.toList());
        DiskUsageBasedThrottlingConfig expectedInitialConfig = new DiskUsageBasedThrottlingConfig(
            Long.MAX_VALUE,
            initialThrottledProduceThroughput,
            liveLogDirPaths,
            true,
            DiskUsageBasedThrottlingConfig.DEFAULT_DISK_CHECK_FREQUENCY_MS,
            1.5d,
            true);
        Assertions.assertEquals(expectedInitialConfig, produce.getCurrentDiskThrottlingConfig());

        TestUtils.waitForCondition(
            () -> produce.getBrokerQuotaLimit() == expectedProduceLimit,
            () -> "Expected throughput 131072, got " + produce.getBrokerQuotaLimit());
        TestUtils.waitForCondition(
            () -> clusterLinkProduce.getBrokerQuotaLimit() == expectedProduceLimit,
            () -> "Expected throughput 131072, got " + clusterLinkProduce.getBrokerQuotaLimit());
        // Message now states the value actually checked (524288); it previously said 131072.
        TestUtils.waitForCondition(
            () -> follower.getBrokerQuotaLimit() == expectedFollowerLimit,
            () -> "Expected throughput 524288, got " + follower.getBrokerQuotaLimit());

        AdminClient superAdminClient = start.superAdminClient();
        superAdminClient.incrementalAlterConfigs(backpressureConfig("confluent.backpressure.disk.free.threshold.bytes", String.valueOf(updatedFreeDiskThresholdBytes)), this.configsOptions).all().get();
        // Message now states the value actually checked (107374182400); it previously said 214748364800.
        TestUtils.waitForCondition(
            () -> produce.getCurrentDiskThrottlingConfig().freeDiskThresholdBytes == updatedFreeDiskThresholdBytes,
            () -> "Expected threshold 107374182400, got " + produce.getCurrentDiskThrottlingConfig().freeDiskThresholdBytes);

        superAdminClient.incrementalAlterConfigs(backpressureConfig("confluent.backpressure.disk.produce.bytes.per.second", String.valueOf(updatedThrottledProduceThroughput)), this.configsOptions).all().get();
        TestUtils.waitForCondition(
            () -> produce.getCurrentDiskThrottlingConfig().throttledProduceThroughput == updatedThrottledProduceThroughput,
            () -> "Expected throughput 65536, got " + produce.getCurrentDiskThrottlingConfig().throttledProduceThroughput);
    }

    /**
     * Dynamically enabling request backpressure must fail when the configured multitenant
     * listener name is invalid.
     *
     * <p>The {@code assertThrows} body was empty, so nothing could throw and the test could
     * never pass. It now issues the alter-config call.
     *
     * @param str quorum label supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDynamicEnableRequestBackpressureFailsWithoutMultitenantListener(String str) throws Exception {
        AdminClient superAdminClient = this.testHarness.start(brokerPropsWithInvalidMultitenantListenerName()).superAdminClient();
        // NOTE(review): the exact config value is inferred from the test name; only the request
        // type is enabled so the failure can be attributed to the missing listener. Confirm.
        Assertions.assertThrows(ExecutionException.class, () ->
            superAdminClient.incrementalAlterConfigs(
                backpressureConfig("confluent.backpressure.types", TenantMetricsTestUtils.REQUEST_TAG),
                this.configsOptions).all().get());
    }

    /**
     * Dynamically enabling backpressure must fail when the broker was started without the
     * tenant quota callback.
     *
     * <p>The {@code assertThrows} body was empty, so nothing could throw and the test could
     * never pass. It now issues the alter-config call.
     *
     * @param str quorum label supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDynamicEnableBackpressureFailsWithoutTenantQuotasEnabled(String str) throws Exception {
        AdminClient superAdminClient = this.testHarness.start(brokerProps()).superAdminClient();
        // NOTE(review): the exact config value is inferred from the test name; the request type
        // is deliberately left out so a rejection cannot be caused by the absent multitenant
        // listener. Confirm.
        Assertions.assertThrows(ExecutionException.class, () ->
            superAdminClient.incrementalAlterConfigs(
                backpressureConfig("confluent.backpressure.types", "fetch,produce"),
                this.configsOptions).all().get());
    }

    /**
     * Builds the argument for {@code incrementalAlterConfigs}: a single {@code SET} operation
     * on the default broker config resource.
     *
     * @param str  name of the config to set
     * @param str2 value to assign to that config
     * @return a singleton map from the default broker resource to the single alter operation
     */
    private Map<ConfigResource, Collection<AlterConfigOp>> backpressureConfig(String str, String str2) {
        ConfigEntry entry = new ConfigEntry(str, str2);
        AlterConfigOp setOp = new AlterConfigOp(entry, AlterConfigOp.OpType.SET);
        Collection<AlterConfigOp> ops = Collections.singletonList(setOp);
        return Collections.singletonMap(defaultBrokerConfigResource, ops);
    }
}
