/*
 * Decompiled with CFR 0.152.
 */
package kafka.network;

import java.net.InetAddress;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import kafka.network.ConnectionQuotas$;
import kafka.network.DataPlaneAcceptor;
import kafka.network.Processor$;
import kafka.network.SocketServer$;
import kafka.server.BaseRequestTest;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.Logging;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.MockTime;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import scala.Tuple2;
import scala.jdk.CollectionConverters$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0005\u0005md\u0001\u0002\u000e\u001c\u0001\u0001BQa\n\u0001\u0005\u0002!Bqa\u000b\u0001C\u0002\u0013\u0005A\u0006\u0003\u0004:\u0001\u0001\u0006I!\f\u0005\bu\u0001\u0011\r\u0011\"\u0001<\u0011\u0019!\u0005\u0001)A\u0005y!9Q\t\u0001b\u0001\n\u00031\u0005BB'\u0001A\u0003%q\tC\u0004O\u0001\t\u0007I\u0011\u0001$\t\r=\u0003\u0001\u0015!\u0003H\u0011\u001d\u0001\u0006A1A\u0005\u0002ECa!\u0017\u0001!\u0002\u0013\u0011\u0006b\u0002.\u0001\u0005\u0004%\ta\u0017\u0005\u0007?\u0002\u0001\u000b\u0011\u0002/\t\u000f\u0001\u0004!\u0019!C\u0001C\"1\u0001\u000e\u0001Q\u0001\n\tDQ!\u001b\u0001\u0005B\u0019CQA\u001b\u0001\u0005B-DQA\u001e\u0001\u0005B]Dq!!\u0001\u0001\t\u0003\t\u0019\u0001C\u0004\u0002\u001c\u0001!\t!a\u0001\t\u000f\u0005}\u0001\u0001\"\u0003\u0002\"!9\u0011q\b\u0001\u0005\n\u0005\u0005\u0003bBA$\u0001\u0011%\u0011\u0011\n\u0005\n\u0003C\u0002\u0011\u0013!C\u0005\u0003GBq!!\u001f\u0001\t\u0013\t\u0019A\u0001\u0017D_:4G.^3oi\u000e{gN\\3di&|g.U;pi\u0006\u001c(+Z2p]\u001aLw-\u001e:bi&|g\u000eV3ti*\u0011A$H\u0001\b]\u0016$xo\u001c:l\u0015\u0005q\u0012!B6bM.\f7\u0001A\n\u0003\u0001\u0005\u0002\"AI\u0013\u000e\u0003\rR!\u0001J\u000f\u0002\rM,'O^3s\u0013\t13EA\bCCN,'+Z9vKN$H+Z:u\u0003\u0019a\u0014N\\5u}Q\t\u0011\u0006\u0005\u0002+\u00015\t1$\u0001\u0005mSN$XM\\3s+\u0005i\u0003C\u0001\u00188\u001b\u0005y#B\u0001\u000f1\u0015\t\t$'\u0001\u0004d_6lwN\u001c\u0006\u0003=MR!\u0001N\u001b\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u00051\u0014aA8sO&\u0011\u0001h\f\u0002\r\u0019&\u001cH/\u001a8fe:\u000bW.Z\u0001\nY&\u001cH/\u001a8fe\u0002\nA\u0002\\8dC2\fE\r\u001a:fgN,\u0012\u0001\u0010\t\u0003{\tk\u0011A\u0010\u0006\u0003\u007f\u0001\u000b1A\\3u\u0015\u0005\t\u0015\u0001\u00026bm\u0006L!a\u0011 \u0003\u0017%sW\r^!eIJ,7o]\u0001\u000eY>\u001c\u0017\r\\!eIJ,7o\u001d\u0011\u0002;Ad\u0017-\u001b8uKb$H*[:uK:,'\u000fR3gCVdG/U;pi\u0006,\u0012a\u0012\t\u0003\u0011.k\u0011!\u0013\u0006\u0002\u0015\u0006)1oY1mC&\u0011A*\u0013\u0002\u0004\u0013:$\u0018A\b9mC&tG/\u001a=u\u0019&\u001cH/\u001a8fe\u0012+g-Y;miF+x\u000e^1!\u0003u\u0001H.Y5oi\u0016DH\u000fT5ti\u0016tWM\u001d#fM\u0006,H\u000e\u001e$m_>\u0014\u0018A\b9mC&tG/\u001a=u\u0019&\u001cH/\u001a8fe\u0012+g-Y;mi\u001acwn\u001c:!\u0003\u0011!\u0018.\\3\u0016\u0003I\u0003\"aU,\u000e\u0003QS!!\u0016,\u0002\tU$\u0018\u000e\u001c\u0006\u0003IIJ!\u0001\u0017+\u0003\u00115{7m\u001b+j[\u0016\fQ\u0001^5nK\u0002\nqa\u001d7fKBl5/F\u0001]!\tAU,\u0003\u0002_\u0013\n!Aj\u001c8h\u0003!\u0019H.Z3q\u001bN\u0004\u0013aF2p]:,7\r^5p]J\u000bG/\u001a$m_>\u0014\bK]8q+\u0005\u0011\u0007CA2g\u001b\u0005!'BA3A\u0003\u0011a\u0017M\\4\n\u0005\u001d$'AB*ue&tw-\u0001\rd_:tWm\u0019;j_:\u0014\u0016\r^3GY>|'\u000f\u0015:pa\u0002\n1B\u0019:pW\u0016\u00148i\\;oi\u00069\"M]8lKJ\u0004&o\u001c9feRLxJ^3se&$Wm\u001d\u000b\u0003Y>\u0004\"\u0001S7\n\u00059L%\u0001B+oSRDQ\u0001]\tA\u0002E\f!\u0002\u001d:pa\u0016\u0014H/[3t!\t\u0011H/D\u0001t\u0015\t)\u0006)\u0003\u0002vg\nQ\u0001K]8qKJ$\u0018.Z:\u0002\u0015\t\u0014xn[3s)&lW\r\u0006\u0002y}B\u0011\u0011\u0010`\u0007\u0002u*\u00111\u0010M\u0001\u0006kRLGn]\u0005\u0003{j\u0014A\u0001V5nK\")qP\u0005a\u0001\u000f\u0006A!M]8lKJLE-\u0001\u0016uKN$H)\u001f8b[&\u001c7i\u001c8oK\u000e$\u0018n\u001c8SCR,G+\u001e8j]\u001e\u0014VmY8oM&<WO]3\u0015\u00031D3aEA\u0004!\u0011\tI!a\u0006\u000e\u0005\u0005-!\u0002BA\u0007\u0003\u001f\t1!\u00199j\u0015\u0011\t\t\"a\u0005\u0002\u000f),\b/\u001b;fe*\u0019\u0011QC\u001b\u0002\u000b),h.\u001b;\n\t\u0005e\u00111\u0002\u0002\u0005)\u0016\u001cH/\u0001\u0014uKN$H)\u001f8b[&\u001c7i\u001c8oK\u000e$\u0018n\u001c8SCR,G+\u001e8j]\u001e$\u0015n]1cY\u0016D3\u0001FA\u0004\u0003Y1XM]5gs\u000e{gNZ5h!J|\u0007/Y4bi\u0016$G#\u00027\u0002$\u0005m\u0002bBA\u0013+\u0001\u0007\u0011qE\u0001\u0005aJ|\u0007\u000f\u0005\u0003\u0002*\u0005]b\u0002BA\u0016\u0003g\u00012!!\fJ\u001b\t\tyCC\u0002\u00022}\ta\u0001\u0010:p_Rt\u0014bAA\u001b\u0013\u00061\u0001K]3eK\u001aL1aZA\u001d\u0015\r\t)$\u0013\u0005\b\u0003{)\u0002\u0019AA\u0014\u0003\u00151\u0018\r\\;f\u0003-\tG\u000e^3s\u0007>tg-[4\u0015\u000b1\f\u0019%!\u0012\t\u000f\u0005\u0015b\u00031\u0001\u0002(!9\u0011Q\b\fA\u0002\u0005\u001d\u0012AI<bSR4uN\u001d'jgR,g.\u001a:D_:tWm\u0019;j_:\u0014\u0016\r^3Rk>$\u0018\rF\u0004m\u0003\u0017\nI&!\u0018\t\u000f\u00055s\u00031\u0001\u0002P\u00059Q.\u001a;sS\u000e\u001c\b\u0003BA)\u0003+j!!a\u0015\u000b\u0007\u00055\u0003'\u0003\u0003\u0002X\u0005M#aB'fiJL7m\u001d\u0005\u0007\u00037:\u0002\u0019A$\u0002%\u0015D\b/Z2uK\u0012\fVo\u001c;b\u0019&l\u0017\u000e\u001e\u0005\t\u0003?:\u0002\u0013!a\u00019\u0006IQ.\u0019=XC&$Xj]\u0001-o\u0006LGOR8s\u0019&\u001cH/\u001a8fe\u000e{gN\\3di&|gNU1uKF+x\u000e^1%I\u00164\u0017-\u001e7uIM*\"!!\u001a+\u0007q\u000b9g\u000b\u0002\u0002jA!\u00111NA;\u001b\t\tiG\u0003\u0003\u0002p\u0005E\u0014!C;oG\",7m[3e\u0015\r\t\u0019(S\u0001\u000bC:tw\u000e^1uS>t\u0017\u0002BA<\u0003[\u0012\u0011#\u001e8dQ\u0016\u001c7.\u001a3WCJL\u0017M\\2f\u0003y1wN]2f\u0007>tg.Z2uS>t'+\u0019;f)Vt\u0017N\\4DQ\u0016\u001c7\u000e")
public class ConfluentConnectionQuotasReconfigurationTest
extends BaseRequestTest {
    private final ListenerName listener = ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT);
    private final InetAddress localAddress = InetAddress.getByName("127.0.0.1");
    private final int plaintextListenerDefaultQuota;
    private final int plaintextListenerDefaultFloor;
    private final MockTime time = new MockTime();
    private final long sleepMs = ConnectionQuotas$.MODULE$.ConnectionBackpressureCheckIntervalMs() + 1L;
    private final String connectionRateFloorProp = new StringBuilder(0).append(this.listener().configPrefix()).append(KafkaConfig$.MODULE$.MaxConnectionCreationRateFloorProp()).toString();

    public ListenerName listener() {
        return this.listener;
    }

    public InetAddress localAddress() {
        return this.localAddress;
    }

    public int plaintextListenerDefaultQuota() {
        return this.plaintextListenerDefaultQuota;
    }

    public int plaintextListenerDefaultFloor() {
        return this.plaintextListenerDefaultFloor;
    }

    public MockTime time() {
        return this.time;
    }

    public long sleepMs() {
        return this.sleepMs;
    }

    public String connectionRateFloorProp() {
        return this.connectionRateFloorProp;
    }

    @Override
    public int brokerCount() {
        return 1;
    }

    @Override
    public void brokerPropertyOverrides(Properties properties) {
        properties.put(new StringBuilder(0).append(this.listener().configPrefix()).append(KafkaConfig$.MODULE$.MaxConnectionCreationRateProp()).toString(), Integer.toString(this.plaintextListenerDefaultQuota()));
        properties.put(this.connectionRateFloorProp(), Integer.toString(this.plaintextListenerDefaultFloor()));
    }

    @Override
    public Time brokerTime(int brokerId) {
        return this.time();
    }

    @Test
    public void testDynamicConnectionRateTuningReconfigure() {
        int updatedFloor = 20;
        this.forceConnectionRateTuningCheck();
        this.waitForListenerConnectionRateQuota(((KafkaServer)this.servers().head()).metrics(), this.plaintextListenerDefaultFloor(), 15000L);
        this.forceConnectionRateTuningCheck();
        Assertions.assertThrows(AssertionError.class, () -> this.waitForListenerConnectionRateQuota(((KafkaServer)this.servers().head()).metrics(), 20, TimeUnit.SECONDS.toMillis(1L)));
        this.alterConfig(this.connectionRateFloorProp(), Integer.toString(updatedFloor));
        this.forceConnectionRateTuningCheck();
        this.waitForListenerConnectionRateQuota(((KafkaServer)this.servers().head()).metrics(), 25, 15000L);
        this.forceConnectionRateTuningCheck();
        this.waitForListenerConnectionRateQuota(((KafkaServer)this.servers().head()).metrics(), updatedFloor, 15000L);
    }

    @Test
    public void testDynamicConnectionRateTuningDisable() {
        this.forceConnectionRateTuningCheck();
        this.waitForListenerConnectionRateQuota(((KafkaServer)this.servers().head()).metrics(), 25, 15000L);
        this.alterConfig(this.connectionRateFloorProp(), Integer.toString(this.plaintextListenerDefaultQuota()));
        this.forceConnectionRateTuningCheck();
        Assertions.assertThrows(AssertionError.class, () -> this.waitForListenerConnectionRateQuota(((KafkaServer)this.servers().head()).metrics(), 25, TimeUnit.SECONDS.toMillis(1L)));
    }

    /*
     * WARNING - void declaration
     */
    private void verifyConfigPropagated(String prop, String value) {
        long l = 15000L;
        TestUtils$ retry_this = TestUtils$.MODULE$;
        long l2 = 1L;
        long retry_startTime = System.currentTimeMillis();
        while (true) {
            try {
                ConfluentConnectionQuotasReconfigurationTest.$anonfun$verifyConfigPropagated$1(this, value, prop);
            }
            catch (AssertionError retry_e) {
                void retry_maxWaitMs;
                if (System.currentTimeMillis() - retry_startTime > retry_maxWaitMs) {
                    throw retry_e;
                }
                if (retry_this.logger().underlying().isInfoEnabled()) {
                    String msgWithLogIdent_msg = new StringBuilder(49).append("Attempt failed, sleeping for ").append(l2).append(", and then retrying.").toString();
                    Object var9_8 = null;
                    retry_this.logger().underlying().info(Logging.msgWithLogIdent$((Logging)retry_this, (String)msgWithLogIdent_msg));
                }
                Thread.sleep(l2);
                l2 += package$.MODULE$.min(l2, 1000L);
                continue;
            }
            break;
        }
    }

    private void alterConfig(String prop, String value) {
        Properties props = new Properties();
        props.put(prop, value);
        ConfluentAdmin adminClient = this.createConfluentAdminClient(this.createConfluentAdminClient$default$1());
        TestUtils$.MODULE$.incrementalAlterConfigs(this.servers(), (Admin)adminClient, props, false, AlterConfigOp.OpType.SET).all().get();
        this.verifyConfigPropagated(prop, value);
        adminClient.close();
    }

    /*
     * WARNING - void declaration
     */
    private void waitForListenerConnectionRateQuota(Metrics metrics, int expectedQuotaLimit, long maxWaitMs) {
        long l = 15000L;
        TestUtils$ retry_this = TestUtils$.MODULE$;
        long l2 = 1L;
        long retry_startTime = System.currentTimeMillis();
        while (true) {
            try {
                ConfluentConnectionQuotasReconfigurationTest.$anonfun$waitForListenerConnectionRateQuota$1(this, metrics, expectedQuotaLimit);
            }
            catch (AssertionError retry_e) {
                void retry_maxWaitMs;
                if (System.currentTimeMillis() - retry_startTime > retry_maxWaitMs) {
                    throw retry_e;
                }
                if (retry_this.logger().underlying().isInfoEnabled()) {
                    String msgWithLogIdent_msg = new StringBuilder(49).append("Attempt failed, sleeping for ").append(l2).append(", and then retrying.").toString();
                    Object var11_9 = null;
                    retry_this.logger().underlying().info(Logging.msgWithLogIdent$((Logging)retry_this, (String)msgWithLogIdent_msg));
                }
                Thread.sleep(l2);
                l2 += package$.MODULE$.min(l2, 1000L);
                continue;
            }
            break;
        }
    }

    private long waitForListenerConnectionRateQuota$default$3() {
        return 15000L;
    }

    private void forceConnectionRateTuningCheck() {
        DataPlaneAcceptor acceptor = (DataPlaneAcceptor)((Tuple2)CollectionConverters$.MODULE$.ConcurrentMapHasAsScala((ConcurrentMap)((KafkaServer)this.servers().head()).socketServer().dataPlaneAcceptors()).asScala().head())._2();
        this.time().sleep(this.sleepMs());
        acceptor.processorQueueSizeSensor().record(16.0, this.time().milliseconds());
        acceptor.wakeup();
    }

    public static final /* synthetic */ void $anonfun$verifyConfigPropagated$1(ConfluentConnectionQuotasReconfigurationTest $this, String value$1, String prop$1) {
        Assertions.assertEquals((Object)value$1, ((KafkaServer)$this.servers().head()).config().originals().get(prop$1));
    }

    public static final /* synthetic */ void $anonfun$waitForListenerConnectionRateQuota$1(ConfluentConnectionQuotasReconfigurationTest $this, Metrics metrics$1, int expectedQuotaLimit$1) {
        KafkaMetric rateMetric = metrics$1.metric(metrics$1.metricName("connection-tokens", SocketServer$.MODULE$.MetricsGroup(), Collections.singletonMap(Processor$.MODULE$.ListenerMetricTag(), $this.listener().value())));
        KafkaMetric rateLimitMetric = metrics$1.metric(metrics$1.metricName("connection-accept-limit", SocketServer$.MODULE$.MetricsGroup(), Collections.singletonMap(Processor$.MODULE$.ListenerMetricTag(), $this.listener().value())));
        Assertions.assertEquals((double)expectedQuotaLimit$1, (double)rateMetric.config().quota().bound(), (double)0.01);
        Assertions.assertEquals((double)expectedQuotaLimit$1, (double)BoxesRunTime.unboxToDouble((Object)rateLimitMetric.metricValue()), (double)0.01);
    }

    public ConfluentConnectionQuotasReconfigurationTest() {
        this.plaintextListenerDefaultQuota = 30;
        this.plaintextListenerDefaultFloor = 25;
    }
}

