/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.multitenant.integration.test.IntegrationTestHarness;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.server.plugins.policy.AlterConfigPolicy;
import java.io.File;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import kafka.server.BrokerBackpressureConfig;
import kafka.server.ClientQuotaManager;
import kafka.server.DiskUsageBasedThrottlingConfig;
import kafka.server.DiskUsageBasedThrottlingConfig$;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.server.ReplicationQuotaManager;
import kafka.server.ThreadUsageMetrics;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import scala.collection.JavaConverters;
import scala.collection.Seq;

@Category(value={IntegrationTest.class})
public class BrokerBackpressureTest {
    private static final int BROKER_COUNT = 1;
    private final Integer numIoThreads = 8;
    private final Integer numNetworkThreads = 4;
    private final Integer maxQueueSize = 500;
    private final AlterConfigsOptions configsOptions = new AlterConfigsOptions().timeoutMs(Integer.valueOf(30000));
    private final ConfigResource defaultBrokerConfigResource = new ConfigResource(ConfigResource.Type.BROKER, "");
    private IntegrationTestHarness testHarness;

    @Before
    public void setUp() {
        this.testHarness = new IntegrationTestHarness(1);
    }

    @After
    public void tearDown() throws Exception {
        this.testHarness.shutdown();
    }

    private Properties brokerProps() {
        Properties props = new Properties();
        props.put(KafkaConfig$.MODULE$.AlterConfigPolicyClassNameProp(), AlterConfigPolicy.class.getName());
        props.put(KafkaConfig$.MODULE$.NumNetworkThreadsProp(), this.numNetworkThreads.toString());
        props.put(KafkaConfig$.MODULE$.NumIoThreadsProp(), this.numIoThreads.toString());
        props.put(KafkaConfig$.MODULE$.QueuedMaxRequestsProp(), this.maxQueueSize.toString());
        props.put("confluent.plugins.topic.policy.replication.factor", "1");
        return props;
    }

    private Properties brokerPropsWithTenantQuotas() {
        Properties props = this.brokerProps();
        props.put(KafkaConfig$.MODULE$.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        props.put("confluent.multitenant.listener.names", "EXTERNAL");
        return props;
    }

    private Properties brokerPropsWithInvalidMultitenantListenerName() {
        Properties props = this.brokerProps();
        props.put(KafkaConfig$.MODULE$.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        props.put("confluent.multitenant.listener.names", "INVALID");
        return props;
    }

    @Test
    public void testNoTenantQuotasNoBackpressureConfig() throws Exception {
        PhysicalCluster physicalCluster = this.testHarness.start(this.brokerProps());
        KafkaServer broker = physicalCluster.kafkaCluster().brokers().get(0);
        Assert.assertFalse((String)"Expected consume backpressure to be disabled", (boolean)broker.quotaManagers().fetch().backpressureEnabled());
        Assert.assertFalse((String)"Expected produce backpressure to be disabled", (boolean)broker.quotaManagers().produce().backpressureEnabled());
        Assert.assertFalse((String)"Expected request backpressure to be disabled", (boolean)broker.quotaManagers().request().backpressureEnabled());
        Assert.assertFalse((boolean)broker.quotaManagers().fetch().tenantLevelQuotasEnabled());
        Assert.assertFalse((boolean)broker.quotaManagers().produce().tenantLevelQuotasEnabled());
        Assert.assertFalse((boolean)broker.quotaManagers().request().tenantLevelQuotasEnabled());
        Assert.assertEquals((double)((double)this.numIoThreads.intValue() * 100.0), (double)ThreadUsageMetrics.ioThreadsCapacity((Metrics)broker.metrics()), (double)1.0);
        Assert.assertEquals((double)((double)this.numNetworkThreads.intValue() * 100.0), (double)ThreadUsageMetrics.networkThreadsCapacity((Metrics)broker.metrics(), (Seq)JavaConverters.asScalaBuffer(Collections.singletonList("EXTERNAL"))), (double)1.0);
    }

    @Test
    public void testNoBackpressureConfig() throws Exception {
        PhysicalCluster physicalCluster = this.testHarness.start(this.brokerPropsWithTenantQuotas());
        KafkaServer broker = physicalCluster.kafkaCluster().brokers().get(0);
        Assert.assertFalse((String)"Expected consume backpressure to be disabled", (boolean)broker.quotaManagers().fetch().backpressureEnabled());
        Assert.assertFalse((String)"Expected produce backpressure to be disabled", (boolean)broker.quotaManagers().produce().backpressureEnabled());
        Assert.assertFalse((String)"Expected request backpressure to be disabled", (boolean)broker.quotaManagers().request().backpressureEnabled());
        Assert.assertTrue((boolean)broker.quotaManagers().fetch().tenantLevelQuotasEnabled());
        Assert.assertTrue((boolean)broker.quotaManagers().produce().tenantLevelQuotasEnabled());
        Assert.assertTrue((boolean)broker.quotaManagers().request().tenantLevelQuotasEnabled());
    }

    @Test
    public void testFetchBackpressureOnlyConfig() throws Exception {
        Properties props = this.brokerPropsWithTenantQuotas();
        props.put("confluent.backpressure.types", "fetch");
        PhysicalCluster physicalCluster = this.testHarness.start(props);
        KafkaServer broker = physicalCluster.kafkaCluster().brokers().get(0);
        Assert.assertTrue((String)"Expected consume backpressure to be enabled", (boolean)broker.quotaManagers().fetch().backpressureEnabled());
        Assert.assertFalse((String)"Expected produce backpressure to be disabled", (boolean)broker.quotaManagers().produce().backpressureEnabled());
        Assert.assertFalse((String)"Expected request backpressure to be disabled", (boolean)broker.quotaManagers().request().backpressureEnabled());
        Assert.assertEquals((double)((double)this.numIoThreads.intValue() * 100.0), (double)ThreadUsageMetrics.ioThreadsCapacity((Metrics)broker.metrics()), (double)1.0);
        Assert.assertEquals((double)((double)this.numNetworkThreads.intValue() * 100.0), (double)ThreadUsageMetrics.networkThreadsCapacity((Metrics)broker.metrics(), (Seq)JavaConverters.asScalaBuffer(Collections.singletonList("EXTERNAL"))), (double)1.0);
    }

    @Test
    public void testFetchAndProduceBackpressureOnlyConfig() throws Exception {
        Properties props = this.brokerPropsWithTenantQuotas();
        props.put("confluent.backpressure.types", "fetch,produce");
        PhysicalCluster physicalCluster = this.testHarness.start(props);
        KafkaServer broker = physicalCluster.kafkaCluster().brokers().get(0);
        Assert.assertTrue((String)"Expected consume backpressure to be enabled", (boolean)broker.quotaManagers().fetch().backpressureEnabled());
        Assert.assertTrue((String)"Expected produce backpressure to be enabled", (boolean)broker.quotaManagers().produce().backpressureEnabled());
        Assert.assertFalse((String)"Expected request backpressure to be disabled", (boolean)broker.quotaManagers().request().backpressureEnabled());
    }

    @Test
    public void testFetchAndProduceAndRequestBackpressureConfig() throws Exception {
        Properties props = this.brokerPropsWithTenantQuotas();
        props.put("confluent.backpressure.types", "fetch,produce,request");
        PhysicalCluster physicalCluster = this.testHarness.start(props);
        KafkaServer broker = physicalCluster.kafkaCluster().brokers().get(0);
        Assert.assertTrue((String)"Expected consume backpressure to be enabled", (boolean)broker.quotaManagers().fetch().backpressureEnabled());
        Assert.assertTrue((String)"Expected produce backpressure to be enabled", (boolean)broker.quotaManagers().produce().backpressureEnabled());
        Assert.assertTrue((String)"Expected request backpressure to be enabled", (boolean)broker.quotaManagers().request().backpressureEnabled());
    }

    @Test
    public void testRequestBackpressureConfig() throws Exception {
        Properties props = this.brokerPropsWithTenantQuotas();
        props.put("confluent.backpressure.types", "request");
        PhysicalCluster physicalCluster = this.testHarness.start(props);
        KafkaServer broker = physicalCluster.kafkaCluster().brokers().get(0);
        Assert.assertFalse((String)"Expected consume backpressure to be disabled", (boolean)broker.quotaManagers().fetch().backpressureEnabled());
        Assert.assertFalse((String)"Expected produce backpressure to be disabled", (boolean)broker.quotaManagers().produce().backpressureEnabled());
        Assert.assertTrue((String)"Expected request backpressure to be enabled", (boolean)broker.quotaManagers().request().backpressureEnabled());
        Assert.assertEquals((double)this.maxQueueSize.intValue(), (double)broker.quotaManagers().request().dynamicBackpressureConfig().maxQueueSize(), (double)0.0);
        Assert.assertEquals((double)ConfluentConfigs.BACKPRESSURE_REQUEST_MIN_BROKER_LIMIT_DEFAULT.doubleValue(), (double)broker.quotaManagers().request().dynamicBackpressureConfig().minBrokerRequestQuota(), (double)0.0);
        Assert.assertEquals((Object)"p95", (Object)broker.quotaManagers().request().dynamicBackpressureConfig().queueSizePercentile());
        Assert.assertEquals((double)((double)this.numIoThreads.intValue() * 100.0), (double)ThreadUsageMetrics.ioThreadsCapacity((Metrics)broker.metrics()), (double)1.0);
        Assert.assertEquals((double)((double)this.numNetworkThreads.intValue() * 100.0), (double)ThreadUsageMetrics.networkThreadsCapacity((Metrics)broker.metrics(), (Seq)JavaConverters.asScalaBuffer(Collections.singletonList("EXTERNAL"))), (double)1.0);
    }

    @Test
    public void testNonDefaultRequestBackpressureConfig() throws Exception {
        Properties props = this.brokerPropsWithTenantQuotas();
        props.put("confluent.backpressure.types", "request");
        props.put("confluent.backpressure.request.min.broker.limit", "150");
        props.put("confluent.backpressure.request.queue.size.percentile", "p99");
        PhysicalCluster physicalCluster = this.testHarness.start(props);
        KafkaServer broker = physicalCluster.kafkaCluster().brokers().get(0);
        Assert.assertTrue((String)"Expected request backpressure to be enabled", (boolean)broker.quotaManagers().request().backpressureEnabled());
        Assert.assertEquals((double)this.maxQueueSize.intValue(), (double)broker.quotaManagers().request().dynamicBackpressureConfig().maxQueueSize(), (double)0.0);
        Assert.assertEquals((double)150.0, (double)broker.quotaManagers().request().dynamicBackpressureConfig().minBrokerRequestQuota(), (double)0.0);
        Assert.assertEquals((Object)"p99", (Object)broker.quotaManagers().request().dynamicBackpressureConfig().queueSizePercentile());
    }

    @Test
    public void testRequestBackpressureConfigWithInvalidValuesSetsAcceptedValues() throws Exception {
        Properties props = this.brokerPropsWithTenantQuotas();
        props.put("confluent.backpressure.types", "request");
        props.put("confluent.backpressure.request.min.broker.limit", "0");
        props.put("confluent.backpressure.request.queue.size.percentile", "100");
        PhysicalCluster physicalCluster = this.testHarness.start(props);
        KafkaServer broker = physicalCluster.kafkaCluster().brokers().get(0);
        Assert.assertTrue((String)"Expected request backpressure to be enabled", (boolean)broker.quotaManagers().request().backpressureEnabled());
        Assert.assertEquals((double)this.maxQueueSize.intValue(), (double)broker.quotaManagers().request().dynamicBackpressureConfig().maxQueueSize(), (double)0.0);
        Assert.assertEquals((double)BrokerBackpressureConfig.MinBrokerRequestQuota(), (double)broker.quotaManagers().request().dynamicBackpressureConfig().minBrokerRequestQuota(), (double)0.0);
        Assert.assertEquals((Object)"p95", (Object)broker.quotaManagers().request().dynamicBackpressureConfig().queueSizePercentile());
    }

    @Test
    public void testDynamicRequestBackpressureConfig() throws Exception {
        Properties props = this.brokerPropsWithTenantQuotas();
        props.put("confluent.backpressure.types", "request");
        PhysicalCluster physicalCluster = this.testHarness.start(props);
        KafkaServer broker0 = physicalCluster.kafkaCluster().brokers().get(0);
        Assert.assertTrue((String)"Expected request backpressure to be enabled", (boolean)broker0.quotaManagers().request().backpressureEnabled());
        Assert.assertEquals((double)ConfluentConfigs.BACKPRESSURE_REQUEST_MIN_BROKER_LIMIT_DEFAULT.longValue(), (double)broker0.quotaManagers().request().dynamicBackpressureConfig().minBrokerRequestQuota(), (double)0.0);
        Assert.assertEquals((Object)"p95", (Object)broker0.quotaManagers().request().dynamicBackpressureConfig().queueSizePercentile());
        AdminClient adminClient = physicalCluster.superAdminClient();
        adminClient.incrementalAlterConfigs(this.backpressureConfig("confluent.backpressure.request.min.broker.limit", "100"), this.configsOptions).all().get();
        for (KafkaServer broker : physicalCluster.kafkaCluster().brokers()) {
            TestUtils.waitForCondition(() -> broker.quotaManagers().request().dynamicBackpressureConfig().minBrokerRequestQuota() == 100.0, (String)("Expected min broker request limit to be updated to 100 on broker " + broker.config().brokerId()));
            Assert.assertEquals((Object)"p95", (Object)broker.quotaManagers().request().dynamicBackpressureConfig().queueSizePercentile());
        }
        adminClient.incrementalAlterConfigs(this.backpressureConfig("confluent.backpressure.request.queue.size.percentile", "p99"), this.configsOptions).all().get();
        for (KafkaServer broker : physicalCluster.kafkaCluster().brokers()) {
            TestUtils.waitForCondition(() -> broker.quotaManagers().request().dynamicBackpressureConfig().queueSizePercentile().equals("p99"), (String)("Expected queue size percentile to be updated to `p99` on broker " + broker.config().brokerId()));
            Assert.assertEquals((double)100.0, (double)broker.quotaManagers().request().dynamicBackpressureConfig().minBrokerRequestQuota(), (double)0.0);
        }
        adminClient.incrementalAlterConfigs(this.backpressureConfig("confluent.backpressure.request.queue.size.percentile", "p101"), this.configsOptions).all().get();
        for (KafkaServer broker : physicalCluster.kafkaCluster().brokers()) {
            TestUtils.waitForCondition(() -> broker.quotaManagers().request().dynamicBackpressureConfig().queueSizePercentile().equals("p95"), (String)("Expected queue size percentile to be updated to `p95` on broker " + broker.config().brokerId()));
        }
        adminClient.incrementalAlterConfigs(this.backpressureConfig("confluent.backpressure.request.min.broker.limit", "-1"), this.configsOptions).all().get();
        for (KafkaServer broker : physicalCluster.kafkaCluster().brokers()) {
            TestUtils.waitForCondition(() -> broker.quotaManagers().request().dynamicBackpressureConfig().minBrokerRequestQuota() == BrokerBackpressureConfig.MinBrokerRequestQuota(), (String)("Expected min broker request limit to be updated to 10 on broker " + broker.config().brokerId()));
        }
    }

    @Test
    public void testRequestBackpressureConfigWithInvalidTenantListener() throws Exception {
        Properties props = this.brokerPropsWithInvalidMultitenantListenerName();
        props.put("confluent.backpressure.types", "fetch,produce,request");
        PhysicalCluster physicalCluster = this.testHarness.start(props);
        KafkaServer broker = physicalCluster.kafkaCluster().brokers().get(0);
        Assert.assertFalse((String)"Expected request backpressure to be disabled", (boolean)broker.quotaManagers().request().backpressureEnabled());
        Assert.assertTrue((String)"Expected produce backpressure to be enabled", (boolean)broker.quotaManagers().produce().backpressureEnabled());
        Assert.assertTrue((String)"Expected consume backpressure to be enabled", (boolean)broker.quotaManagers().fetch().backpressureEnabled());
    }

    @Test
    public void testProduceBackpressureConfig() throws Exception {
        Properties props = this.brokerPropsWithTenantQuotas();
        props.put("confluent.backpressure.types", "produce");
        PhysicalCluster physicalCluster = this.testHarness.start(props);
        KafkaServer broker = physicalCluster.kafkaCluster().brokers().get(0);
        Assert.assertFalse((String)"Expected consume backpressure to be disabled", (boolean)broker.quotaManagers().fetch().backpressureEnabled());
        Assert.assertTrue((String)"Expected produce backpressure to be enabled", (boolean)broker.quotaManagers().produce().backpressureEnabled());
        Assert.assertFalse((String)"Expected request backpressure to be disabled", (boolean)broker.quotaManagers().request().backpressureEnabled());
    }

    @Test
    public void testBackpressureDisabledWhenTenantQuotasDisabled() throws Exception {
        Properties props = this.brokerProps();
        props.put("confluent.backpressure.types", "fetch,produce,request");
        PhysicalCluster physicalCluster = this.testHarness.start(props);
        KafkaServer broker = physicalCluster.kafkaCluster().brokers().get(0);
        Assert.assertFalse((String)"Expected consume backpressure to be disabled", (boolean)broker.quotaManagers().fetch().backpressureEnabled());
        Assert.assertFalse((String)"Expected produce backpressure to be disabled", (boolean)broker.quotaManagers().produce().backpressureEnabled());
        Assert.assertFalse((String)"Expected request backpressure to be disabled", (boolean)broker.quotaManagers().request().backpressureEnabled());
    }

    @Test
    public void testInvalidBackressureTypesAreIgnored() throws Exception {
        Properties props = this.brokerPropsWithTenantQuotas();
        props.put("confluent.backpressure.types", "randomtype,produce,LeaderReplication");
        PhysicalCluster physicalCluster = this.testHarness.start(props);
        KafkaServer broker = physicalCluster.kafkaCluster().brokers().get(0);
        Assert.assertFalse((String)"Expected consume backpressure to be disabled", (boolean)broker.quotaManagers().fetch().backpressureEnabled());
        Assert.assertTrue((String)"Expected produce backpressure to be enabled", (boolean)broker.quotaManagers().produce().backpressureEnabled());
        Assert.assertFalse((String)"Expected request backpressure to be disabled", (boolean)broker.quotaManagers().request().backpressureEnabled());
    }

    @Test
    public void testDynamicBackpressureConfig() throws Exception {
        PhysicalCluster physicalCluster = this.testHarness.start(this.brokerPropsWithTenantQuotas());
        for (KafkaServer broker : physicalCluster.kafkaCluster().brokers()) {
            Assert.assertFalse((String)("Expected consume backpressure to be disabled on broker " + broker.config().brokerId()), (boolean)broker.quotaManagers().fetch().backpressureEnabled());
            Assert.assertFalse((String)("Expected produce backpressure to be disabled on broker " + broker.config().brokerId()), (boolean)broker.quotaManagers().produce().backpressureEnabled());
            Assert.assertFalse((String)("Expected request backpressure to be disabled on broker " + broker.config().brokerId()), (boolean)broker.quotaManagers().request().backpressureEnabled());
        }
        AdminClient adminClient = physicalCluster.superAdminClient();
        adminClient.incrementalAlterConfigs(this.backpressureConfig("confluent.backpressure.types", "fetch,produce,request"), this.configsOptions).all().get();
        for (KafkaServer broker : physicalCluster.kafkaCluster().brokers()) {
            TestUtils.waitForCondition(() -> broker.quotaManagers().fetch().backpressureEnabled(), (String)("Expected consume backpressure to be enabled on broker " + broker.config().brokerId()));
            TestUtils.waitForCondition(() -> broker.quotaManagers().produce().backpressureEnabled(), (String)("Expected produce backpressure to be enabled on broker " + broker.config().brokerId()));
            TestUtils.waitForCondition(() -> broker.quotaManagers().request().backpressureEnabled(), (String)("Expected request backpressure to be enabled on broker " + broker.config().brokerId()));
            Assert.assertEquals((double)this.maxQueueSize.intValue(), (double)broker.quotaManagers().request().dynamicBackpressureConfig().maxQueueSize(), (double)0.0);
        }
        adminClient.incrementalAlterConfigs(this.backpressureConfig("confluent.backpressure.types", "fetch,produce"), this.configsOptions).all().get();
        for (KafkaServer broker : physicalCluster.kafkaCluster().brokers()) {
            Assert.assertTrue((String)("Expected consume backpressure to be enabled on broker " + broker.config().brokerId()), (boolean)broker.quotaManagers().fetch().backpressureEnabled());
            Assert.assertTrue((String)("Expected produce backpressure to be enabled on broker " + broker.config().brokerId()), (boolean)broker.quotaManagers().produce().backpressureEnabled());
            TestUtils.waitForCondition(() -> !broker.quotaManagers().request().backpressureEnabled(), (String)("Expected request backpressure to be disabled on broker " + broker.config().brokerId()));
        }
        adminClient.incrementalAlterConfigs(this.backpressureConfig("confluent.backpressure.types", ""), this.configsOptions).all().get();
        for (KafkaServer broker : physicalCluster.kafkaCluster().brokers()) {
            TestUtils.waitForCondition(() -> !broker.quotaManagers().fetch().backpressureEnabled(), (String)("Expected consume backpressure to be disabled on broker " + broker.config().brokerId()));
            TestUtils.waitForCondition(() -> !broker.quotaManagers().produce().backpressureEnabled(), (String)("Expected produce backpressure to be disabled on broker " + broker.config().brokerId()));
            TestUtils.waitForCondition(() -> !broker.quotaManagers().request().backpressureEnabled(), (String)("Expected request backpressure to be disabled on broker " + broker.config().brokerId()));
        }
    }

    @Test
    public void testDynamicDiskThrottlingConfig() throws Exception {
        Properties props = this.brokerPropsWithTenantQuotas();
        PhysicalCluster physicalCluster = this.testHarness.start(props);
        KafkaServer broker = physicalCluster.kafkaCluster().brokers().get(0);
        ClientQuotaManager quotaManager = broker.quotaManagers().produce();
        List logDirs = JavaConverters.seqAsJavaList((Seq)broker.logManager().liveLogDirs());
        List fileStores = logDirs.stream().map(File::getAbsolutePath).collect(Collectors.toList());
        DiskUsageBasedThrottlingConfig defaultConfig = DiskUsageBasedThrottlingConfig$.MODULE$.apply(0x500000000L, 131072L, (Seq)JavaConverters.asScalaBuffer(fileStores).toSeq(), false, DiskUsageBasedThrottlingConfig$.MODULE$.DefaultDiskCheckFrequencyMs(), 1.5, false);
        Assert.assertEquals((Object)defaultConfig, (Object)quotaManager.getCurrentDiskThrottlingConfig());
        long diskThreshold = 0x1900000000L;
        long throughput = 65536L;
        double recoveryFactor = 1.5;
        AdminClient adminClient = physicalCluster.superAdminClient();
        adminClient.incrementalAlterConfigs(this.backpressureConfig("confluent.backpressure.disk.enable", "true"), this.configsOptions).all().get();
        TestUtils.waitForCondition(() -> quotaManager.getCurrentDiskThrottlingConfig().enableDiskBasedThrottling(), (String)("Expected confluent.backpressure.disk.enable to be set as true on " + broker.config().brokerId()));
        TestUtils.waitForCondition(() -> quotaManager.diskThrottlingEnabledInConfig(quotaManager.getCurrentDiskThrottlingConfig()), (String)("Expected diskThrottling() to be enabled on " + broker.config().brokerId()));
        adminClient.incrementalAlterConfigs(this.backpressureConfig("confluent.backpressure.disk.free.threshold.bytes", String.valueOf(0x1900000000L)), this.configsOptions).all().get();
        TestUtils.waitForCondition(() -> quotaManager.getCurrentDiskThrottlingConfig().freeDiskThresholdBytes() == 0x1900000000L, (String)("Expected confluent.backpressure.disk.free.threshold.bytes to be set as 107374182400 on " + broker.config().brokerId()));
        adminClient.incrementalAlterConfigs(this.backpressureConfig("confluent.backpressure.disk.produce.bytes.per.second", String.valueOf(65536L)), this.configsOptions).all().get();
        TestUtils.waitForCondition(() -> quotaManager.getCurrentDiskThrottlingConfig().throttledProduceThroughput() == 65536L, (String)("Expected {} confluent.backpressure.disk.produce.bytes.per.second to be set as 65536 on " + broker.config().brokerId()));
        adminClient.incrementalAlterConfigs(this.backpressureConfig("confluent.backpressure.disk.threshold.recovery.factor", String.valueOf(1.5)), this.configsOptions).all().get();
        TestUtils.waitForCondition(() -> quotaManager.getCurrentDiskThrottlingConfig().freeDiskThresholdBytesRecoveryFactor() == 1.5, (String)("Expected {} confluent.backpressure.disk.threshold.recovery.factor to be set as 1.5 on " + broker.config().brokerId()));
    }

    @Test
    public void testDynamicDiskThrottlingConfigWithClusterLinking() throws Exception {
        Properties props = this.brokerPropsWithTenantQuotas();
        long thresholdBytes = Long.MAX_VALUE;
        props.put("confluent.cluster.link.enable", "true");
        props.put("confluent.backpressure.disk.enable", "true");
        props.put("confluent.backpressure.disk.free.threshold.bytes", String.valueOf(thresholdBytes));
        PhysicalCluster physicalCluster = this.testHarness.start(props);
        KafkaServer broker = physicalCluster.kafkaCluster().brokers().get(0);
        ClientQuotaManager produceQuotaManager = broker.quotaManagers().produce();
        ReplicationQuotaManager linkQuotaManager = broker.quotaManagers().clusterLink();
        ReplicationQuotaManager followerQuotaManager = broker.quotaManagers().follower();
        List logDirs = JavaConverters.seqAsJavaList((Seq)broker.logManager().liveLogDirs());
        List fileStores = logDirs.stream().map(File::getAbsolutePath).collect(Collectors.toList());
        long defaultThroughput = 131072L;
        DiskUsageBasedThrottlingConfig throttlingConfig = DiskUsageBasedThrottlingConfig$.MODULE$.apply(thresholdBytes, 131072L, (Seq)JavaConverters.asScalaBuffer(fileStores).toSeq(), true, DiskUsageBasedThrottlingConfig$.MODULE$.DefaultDiskCheckFrequencyMs(), 1.5, true);
        Assert.assertEquals((Object)throttlingConfig, (Object)produceQuotaManager.getCurrentDiskThrottlingConfig());
        TestUtils.waitForCondition(() -> produceQuotaManager.getBrokerQuotaLimit() == 131072.0, () -> "Expected throughput 131072, got " + produceQuotaManager.getBrokerQuotaLimit());
        TestUtils.waitForCondition(() -> linkQuotaManager.getBrokerQuotaLimit() == 131072.0, () -> "Expected throughput 131072, got " + linkQuotaManager.getBrokerQuotaLimit());
        TestUtils.waitForCondition(() -> followerQuotaManager.getBrokerQuotaLimit() == 524288.0, () -> "Expected throughput 131072, got " + followerQuotaManager.getBrokerQuotaLimit());
        long diskThreshold = 0x1900000000L;
        long throughput = 65536L;
        AdminClient adminClient = physicalCluster.superAdminClient();
        adminClient.incrementalAlterConfigs(this.backpressureConfig("confluent.backpressure.disk.free.threshold.bytes", String.valueOf(0x1900000000L)), this.configsOptions).all().get();
        TestUtils.waitForCondition(() -> produceQuotaManager.getCurrentDiskThrottlingConfig().freeDiskThresholdBytes() == 0x1900000000L, () -> "Expected threshold 214748364800, got " + produceQuotaManager.getCurrentDiskThrottlingConfig().freeDiskThresholdBytes());
        adminClient.incrementalAlterConfigs(this.backpressureConfig("confluent.backpressure.disk.produce.bytes.per.second", String.valueOf(65536L)), this.configsOptions).all().get();
        TestUtils.waitForCondition(() -> produceQuotaManager.getCurrentDiskThrottlingConfig().throttledProduceThroughput() == 65536L, () -> "Expected throughput 65536, got " + produceQuotaManager.getCurrentDiskThrottlingConfig().throttledProduceThroughput());
    }

    @Test(expected=ExecutionException.class)
    public void testDynamicEnableRequestBackpressureFailsWithoutMultitenantListener() throws Exception {
        PhysicalCluster physicalCluster = this.testHarness.start(this.brokerPropsWithInvalidMultitenantListenerName());
        AdminClient adminClient = physicalCluster.superAdminClient();
        adminClient.incrementalAlterConfigs(this.backpressureConfig("confluent.backpressure.types", "request"), this.configsOptions).all().get();
    }

    @Test(expected=ExecutionException.class)
    public void testDynamicEnableBackpressureFailsWithoutTenantQuotasEnabled() throws Exception {
        PhysicalCluster physicalCluster = this.testHarness.start(this.brokerProps());
        AdminClient adminClient = physicalCluster.superAdminClient();
        adminClient.incrementalAlterConfigs(this.backpressureConfig("confluent.backpressure.types", "fetch,produce,request"), this.configsOptions).all().get();
    }

    private Map<ConfigResource, Collection<AlterConfigOp>> backpressureConfig(String configKey, String configValue) {
        ConfigEntry backpressureCfg = new ConfigEntry(configKey, configValue);
        List<AlterConfigOp> brokerConfigs = Collections.singletonList(new AlterConfigOp(backpressureCfg, AlterConfigOp.OpType.SET));
        return Collections.singletonMap(this.defaultBrokerConfigResource, brokerConfigs);
    }
}

