package kafka.zk;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import kafka.server.ConfigType$;
import kafka.server.ControllerServer;
import kafka.server.KafkaConfig$;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterConfigProperty;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import kafka.test.junit.ZkClusterInvocationContext;
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;
import kafka.utils.Logging;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterClientQuotasResult;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.ProducerIdsBlock;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Seq$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.jdk.CollectionConverters$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

/* compiled from: ZkMigrationIntegrationTest.scala */
@ExtendWith({ClusterTestExtensions.class})
@ScalaSignature(bytes = "\u0006\u0005\u0005=h\u0001\u0002\f\u0018\u0001qAQa\t\u0001\u0005\u0002\u0011Bqa\n\u0001C\u0002\u0013\u0005\u0001\u0006\u0003\u00042\u0001\u0001\u0006I!\u000b\u0004\u0005e\u0001\u00011\u0007C\u0003$\t\u0011\u0005A\u0007C\u00048\t\t\u0007I\u0011\u0001\u001d\t\r\t#\u0001\u0015!\u0003:\u0011\u001d\u0019E\u00011A\u0005\u0002\u0011Cq\u0001\u0013\u0003A\u0002\u0013\u0005\u0011\n\u0003\u0004P\t\u0001\u0006K!\u0012\u0005\u0006!\u0012!\t!\u0015\u0005\u0006I\u0012!\t!\u001a\u0005\u0006]\u0002!\ta\u001c\u0005\b\u00037\u0001A\u0011AA\u000f\u0011\u001d\t9\u0006\u0001C\u0001\u00033Bq!!\u001e\u0001\t\u0003\t9\bC\u0004\u0002\n\u0002!\t!a#\t\u000f\u0005\u0015\u0006\u0001\"\u0001\u0002(\"9\u0011\u0011\u0017\u0001\u0005\u0002\u0005M\u0006bBA\\\u0001\u0011\u0005\u0011\u0011\u0018\u0005\b\u0003{\u0003A\u0011AA`\u0005iQ6.T5he\u0006$\u0018n\u001c8J]R,wM]1uS>tG+Z:u\u0015\tA\u0012$\u0001\u0002{W*\t!$A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001i\u0002C\u0001\u0010\"\u001b\u0005y\"\"\u0001\u0011\u0002\u000bM\u001c\u0017\r\\1\n\u0005\tz\"AB!osJ+g-\u0001\u0004=S:LGO\u0010\u000b\u0002KA\u0011a\u0005A\u0007\u0002/\u0005\u0019An\\4\u0016\u0003%\u0002\"AK\u0018\u000e\u0003-R!\u0001L\u0017\u0002\u000bMdg\r\u000e6\u000b\u00039\n1a\u001c:h\u0013\t\u00014F\u0001\u0004M_\u001e<WM]\u0001\u0005Y><\u0007EA\u000bNKR\fG-\u0019;b\t\u0016dG/\u0019,fe&4\u0017.\u001a:\u0014\u0005\u0011iB#A\u001b\u0011\u0005Y\"Q\"\u0001\u0001\u0002\u001b5,G/\u00193bi\u0006$U\r\u001c;b+\u0005I\u0004C\u0001\u001eA\u001b\u0005Y$B\u0001\u001f>\u0003\u0015IW.Y4f\u0015\tQbH\u0003\u0002@[\u00051\u0011\r]1dQ\u0016L!!Q\u001e\u0003\u001b5+G/\u00193bi\u0006$U\r\u001c;b\u00039iW\r^1eCR\fG)\u001a7uC\u0002\naa\u001c4gg\u0016$X#A#\u0011\u0005y1\u0015BA$ 
\u0005\rIe\u000e^\u0001\u000b_\u001a47/\u001a;`I\u0015\fHC\u0001&N!\tq2*\u0003\u0002M?\t!QK\\5u\u0011\u001dq\u0015\"!AA\u0002\u0015\u000b1\u0001\u001f\u00132\u0003\u001dygMZ:fi\u0002\na!Y2dKB$HC\u0001&S\u0011\u0015\u00196\u00021\u0001U\u0003\u0015\u0011\u0017\r^2i!\r)&\fX\u0007\u0002-*\u0011q\u000bW\u0001\u0005kRLGNC\u0001Z\u0003\u0011Q\u0017M^1\n\u0005m3&\u0001\u0002'jgR\u0004\"!\u00182\u000e\u0003yS!a\u00181\u0002\r\r|W.\\8o\u0015\t\tW(\u0001\u0004tKJ4XM]\u0005\u0003Gz\u0013A#\u00119j\u001b\u0016\u001c8/Y4f\u0003:$g+\u001a:tS>t\u0017A\u0002<fe&4\u0017\u0010\u0006\u0002KM\")q\r\u0004a\u0001Q\u0006Aa/\u001a:jM&,'\u000f\u0005\u0003\u001fS.T\u0015B\u00016 \u0005%1UO\\2uS>t\u0017\u0007\u0005\u0002;Y&\u0011Qn\u000f\u0002\u000e\u001b\u0016$\u0018\rZ1uC&k\u0017mZ3\u0002\u0017Q,7\u000f^'jOJ\fG/\u001a\u000b\u0003\u0015BDQ!]\u0007A\u0002I\fqb\u00197vgR,'/\u00138ti\u0006t7-\u001a\t\u0003gZl\u0011\u0001\u001e\u0006\u0003kf\tA\u0001^3ti&\u0011q\u000f\u001e\u0002\u0010\u00072,8\u000f^3s\u0013:\u001cH/\u00198dK\"jQ\"_@\u0002\u0002\u0005\r\u0011QAA\b\u0003#\u0001\"A_?\u000e\u0003mT!\u0001 
;\u0002\u0015\u0005tgn\u001c;bi&|g.\u0003\u0002\u007fw\nY1\t\\;ti\u0016\u0014H+Z:u\u0003\u001d\u0011'o\\6feNl\u0012aA\u0001\fG2,8\u000f^3s)f\u0004X\r\n\u0002\u0002\b%!\u0011\u0011BA\u0006\u0003\tQ6JC\u0002\u0002\u000em\fA\u0001V=qK\u0006yQ.\u001a;bI\u0006$\u0018MV3sg&|g\u000e\n\u0002\u0002\u0014%!\u0011QCA\f\u0003-I%\tU04?Rz\u0016J\u0016\u0019\u000b\u0007\u0005ea,A\bNKR\fG-\u0019;b-\u0016\u00148/[8o\u00035!Xm\u001d;Ek\u0006dwK]5uKR\u0019!*a\b\t\r\u0005\u0005b\u00021\u0001s\u0003%Q8n\u00117vgR,'\u000fK\t\u000fs\u0006\r\u0011QA@\u0002\u0002\u0005=\u0011\u0011CA\u0013\u0003O\t\u0001c]3sm\u0016\u0014\bK]8qKJ$\u0018.Z:-\u0011\u0005%\u0012QHA$\u0003\u001bZ#\"a\u000b\u00022\u0005M\u0012qGA\u001d!\rQ\u0018QF\u0005\u0004\u0003_Y(!F\"mkN$XM]\"p]\u001aLw\r\u0015:pa\u0016\u0014H/_\u0001\u0004W\u0016L\u0018EAA\u001b\u0003iIg\u000e^3s]\t\u0014xn[3s]1L7\u000f^3oKJtc.Y7f\u0003\u00151\u0018\r\\;fC\t\tY$\u0001\u0005F1R+%KT!MW)\tY#!\r\u0002@\u0005]\u00121I\u0011\u0003\u0003\u0003\n\u0011\u0002\\5ti\u0016tWM]:\"\u0005\u0005\u0015\u0013A\f)M\u0003&sE+\u0012-Uu=zCn\\2bY\"|7\u000f\u001e\u001e1Y\u0015CF+\u0012*O\u00032Stf\f7pG\u0006d\u0007n\\:uuAZ#\"a\u000b\u00022\u0005%\u0013qGA\"C\t\tY%\u0001\u000bbIZ,'\u000f^5tK\u0012tC.[:uK:,'o]\u0016\u000b\u0003W\t\t$a\u0014\u00028\u0005M\u0013EAA)\u0003ya\u0017n\u001d;f]\u0016\u0014hf]3dkJLG/\u001f\u0018qe>$xnY8m]5\f\u0007/\t\u0002\u0002V\u00051S\t\u0017+F%:\u000bEJ\u000f)M\u0003&sE+\u0012-UYAc\u0015)\u0013(U\u000bb#&\b\u0015'B\u0013:#V\t\u0017+\u0002%\u0005dGn\\2bi\u0016\u0004&o\u001c3vG\u0016\u0014\u0018\n\u001a\u000b\u0004\u0015\u0006m\u0003bBA/\u001f\u0001\u0007\u0011qL\u0001\u0011E>|Go\u001d;sCB\u001cVM\u001d<feN\u0004B!!\u0019\u0002p9!\u00111MA6!\r\t)gH\u0007\u0003\u0003OR1!!\u001b\u001c\u0003\u0019a$o\\8u}%\u0019\u0011QN\u0010\u0002\rA\u0013X\rZ3g\u0013\u0011\t\t(a\u001d\u0003\rM#(/\u001b8h\u0015\r\tigH\u0001\u0014e\u0016\fG\r\u0015:pIV\u001cWM]%e\u00052|7m\u001b\u000b\u0005\u0003s\ny\bE\u0002^\u0003wJ1!! 
_\u0005A\u0001&o\u001c3vG\u0016\u0014\u0018\nZ:CY>\u001c7\u000eC\u0004\u0002\u0002B\u0001\r!a!\u0002\u0011i\\7\t\\5f]R\u00042AJAC\u0013\r\t9i\u0006\u0002\u000e\u0017\u000647.\u0019.l\u00072LWM\u001c;\u0002!\u0005dG/\u001a:U_BL7mQ8oM&<G\u0003BAG\u0003;\u0003B!a$\u0002\u001a6\u0011\u0011\u0011\u0013\u0006\u0005\u0003'\u000b)*A\u0003bI6LgNC\u0002\u0002\u0018v\nqa\u00197jK:$8/\u0003\u0003\u0002\u001c\u0006E%AE!mi\u0016\u00148i\u001c8gS\u001e\u001c(+Z:vYRDq!a%\u0012\u0001\u0004\ty\n\u0005\u0003\u0002\u0010\u0006\u0005\u0016\u0002BAR\u0003#\u0013Q!\u00113nS:\f\u0011#\u00197uKJ\u001cE.[3oiF+x\u000e^1t)\u0011\tI+a,\u0011\t\u0005=\u00151V\u0005\u0005\u0003[\u000b\tJA\fBYR,'o\u00117jK:$\u0018+^8uCN\u0014Vm];mi\"9\u00111\u0013\nA\u0002\u0005}\u0015A\u0005<fe&4\u0017\u0010V8qS\u000e\u001cuN\u001c4jON$2ASA[\u0011\u001d\t\ti\u0005a\u0001\u0003\u0007\u000b!C^3sS\u001aL8\t\\5f]R\fVo\u001c;bgR\u0019!*a/\t\u000f\u0005\u0005E\u00031\u0001\u0002\u0004\u0006\u0001b/\u001a:jMf\u0004&o\u001c3vG\u0016\u0014\u0018\n\u001a\u000b\u0006\u0015\u0006\u0005\u0017Q\u0019\u0005\b\u0003\u0007,\u0002\u0019AA=\u0003Q1\u0017N]:u!J|G-^2fe&#'\t\\8dW\"9\u0011\u0011Q\u000bA\u0002\u0005\r\u0005f\u0002\u0001\u0002J\u0006]\u0012\u0011\u001d\t\u0005\u0003\u0017\fi.\u0004\u0002\u0002N*!\u0011qZAi\u0003%)\u0007\u0010^3og&|gN\u0003\u0003\u0002T\u0006U\u0017aA1qS*!\u0011q[Am\u0003\u001dQW\u000f]5uKJT1!a7.\u0003\u0015QWO\\5u\u0013\u0011\ty.!4\u0003\u0015\u0015CH/\u001a8e/&$\b\u000e\f\u0002\u0002d\u000e\u0012\u0011Q\u001d\t\u0005\u0003O\fY/\u0004\u0002\u0002j*\u0019\u00111\u001c;\n\t\u00055\u0018\u0011\u001e\u0002\u0016\u00072,8\u000f^3s)\u0016\u001cH/\u0012=uK:\u001c\u0018n\u001c8t\u0001")
/* loaded from: input_file:kafka/zk/ZkMigrationIntegrationTest.class */
public class ZkMigrationIntegrationTest {
    // SLF4J logger bound to this test class; the rest of the class reads it through log().
    private final Logger log = LoggerFactory.getLogger(ZkMigrationIntegrationTest.class);

    /* compiled from: ZkMigrationIntegrationTest.scala */
    /* loaded from: input_file:kafka/zk/ZkMigrationIntegrationTest$MetadataDeltaVerifier.class */
    /**
     * Replays batches of metadata records into a {@link MetadataDelta} that starts from
     * {@link MetadataImage#EMPTY}, counting one offset per replayed record, and hands the
     * resulting image to a caller-supplied check.
     */
    public class MetadataDeltaVerifier {
        private final MetadataDelta metadataDelta;
        private int offset;
        public final /* synthetic */ ZkMigrationIntegrationTest $outer;

        /**
         * @param zkMigrationIntegrationTest the enclosing test instance
         * @throws NullPointerException if the enclosing instance is {@code null}
         */
        public MetadataDeltaVerifier(ZkMigrationIntegrationTest zkMigrationIntegrationTest) {
            if (zkMigrationIntegrationTest == null) {
                throw new NullPointerException();
            }
            this.$outer = zkMigrationIntegrationTest;
            this.metadataDelta = new MetadataDelta(MetadataImage.EMPTY);
            this.offset = 0;
        }

        /** Returns the delta that records are replayed into. */
        public MetadataDelta metadataDelta() {
            return metadataDelta;
        }

        /** Returns the number of records replayed so far. */
        public int offset() {
            return offset;
        }

        /** Scala-style setter for {@link #offset()}. */
        public void offset_$eq(int i) {
            offset = i;
        }

        /**
         * Replays every record of the batch, in order, bumping the offset once per record.
         *
         * @param list the batch of records to replay
         */
        public void accept(List<ApiMessageAndVersion> list) {
            for (ApiMessageAndVersion entry : list) {
                metadataDelta().replay(entry.message());
                offset_$eq(offset() + 1);
            }
        }

        /**
         * Materializes the image from the delta (provenance built from the current offset) and
         * passes it to the given check.
         *
         * @param function1 the check to run against the resulting image
         */
        public void verify(Function1<MetadataImage, BoxedUnit> function1) {
            MetadataDelta delta = metadataDelta();
            MetadataImage image = delta.apply(new MetadataProvenance(offset(), 0, 0L));
            function1.apply(image);
        }

        /** Synthetic accessor for the enclosing test instance. */
        public /* synthetic */ ZkMigrationIntegrationTest kafka$zk$ZkMigrationIntegrationTest$MetadataDeltaVerifier$$$outer() {
            return $outer;
        }
    }

    /** Returns the SLF4J logger used by this test class. */
    public Logger log() {
        final Logger logger = this.log;
        return logger;
    }

    /**
     * Creates three topics and three client-quota entities on a ZK cluster, then reads all
     * metadata back through {@link ZkMigrationClient} and checks the resulting image.
     *
     * <p>Both admin operations are awaited before ZooKeeper is read: the final check asserts the
     * number of quota entities, so an un-awaited quota alteration would race with the read. The
     * admin client is closed as soon as the setup is done.
     *
     * @param clusterInstance the ZK-backed cluster supplied by the test extension
     */
    @ClusterTest(brokers = 3, clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_4_IV0)
    public void testMigrate(ClusterInstance clusterInstance) {
        try (Admin admin = clusterInstance.createAdminClient()) {
            List<NewTopic> newTopics = new ArrayList<>();
            newTopics.add(new NewTopic("test-topic-1", 2, (short) 3).configs(CollectionConverters$.MODULE$.MapHasAsJava((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("segment.bytes"), "102400"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("segment.ms"), "300000")}))).asJava()));
            newTopics.add(new NewTopic("test-topic-2", 1, (short) 3));
            newTopics.add(new NewTopic("test-topic-3", 10, (short) 3));
            admin.createTopics(newTopics).all().get(60L, TimeUnit.SECONDS);

            // alterClientQuotas(Admin) submits the same three alterations this test used to build
            // inline; wait for the result so the quotas are in place before they are read back.
            alterClientQuotas(admin).all().get(60L, TimeUnit.SECONDS);
        }

        ZkMigrationClient zkMigrationClient = new ZkMigrationClient(((ZkClusterInvocationContext.ZkClusterInstance) clusterInstance).getUnderlying().zkClient());
        ZkMigrationLeadershipState leadershipState = zkMigrationClient.claimControllerLeadership(
                zkMigrationClient.getOrCreateMigrationRecoveryState(ZkMigrationLeadershipState.EMPTY).withNewKRaftController(3000, 42));

        HashSet<Integer> brokerIds = new HashSet<>();
        MetadataDeltaVerifier verifier = new MetadataDeltaVerifier(this);
        zkMigrationClient.readAllMetadata(batch -> verifier.accept(batch), brokerId -> brokerIds.add(brokerId));

        Assertions.assertEquals(Seq$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{0, 1, 2})), CollectionConverters$.MODULE$.SetHasAsScala(brokerIds).asScala().toSeq());
        verifier.verify(metadataImage -> {
            $anonfun$testMigrate$3(metadataImage);
            return BoxedUnit.UNIT;
        });
        zkMigrationClient.releaseControllerLeadership(leadershipState);
    }

    /**
     * Exercises dual-write during a ZK migration:
     * <ol>
     *   <li>creates topic {@code test} on the ZK cluster and checks its configs in ZooKeeper;</li>
     *   <li>starts a single-controller KRaft test kit with migration enabled against the same ZK;</li>
     *   <li>restarts the brokers with the migration settings and waits until ZooKeeper reports
     *       controller id 3000;</li>
     *   <li>alters topic configs and client quotas through the admin client and verifies the
     *       changes are visible in ZooKeeper, and that the producer-id block has advanced.</li>
     * </ol>
     *
     * <p>Every admin client is closed via try-with-resources, and the KRaft test kit is closed
     * even if stopping the ZK cluster throws.
     *
     * @param clusterInstance the ZK-backed cluster supplied by the test extension
     */
    @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = {@ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"), @ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), @ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), @ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")})
    public void testDualWrite(ClusterInstance clusterInstance) {
        // Closed on every path, including a failed or timed-out createTopics.
        try (Admin admin = clusterInstance.createAdminClient()) {
            List<NewTopic> newTopics = new ArrayList<>();
            newTopics.add(new NewTopic("test", 2, (short) 3).configs(CollectionConverters$.MODULE$.MapHasAsJava((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("segment.bytes"), "102400"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("segment.ms"), "300000")}))).asJava()));
            admin.createTopics(newTopics).all().get(60L, TimeUnit.SECONDS);
        }

        KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) clusterInstance).getUnderlying().zkClient();
        Properties entityConfigs = zkClient.getEntityConfigs(ConfigType$.MODULE$.Topic(), "test");
        Assertions.assertEquals("102400", entityConfigs.getProperty("segment.bytes"));
        Assertions.assertEquals("300000", entityConfigs.getProperty("segment.ms"));

        KafkaClusterTestKit build = new KafkaClusterTestKit.Builder(new TestKitNodes.Builder().setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).setClusterId(Uuid.fromString(clusterInstance.clusterId())).setNumBrokerNodes(0).setNumControllerNodes(1).build()).setConfigProp(KafkaConfig$.MODULE$.MigrationEnabledProp(), "true").setConfigProp(KafkaConfig$.MODULE$.ZkConnectProp(), ((ZkClusterInvocationContext.ZkClusterInstance) clusterInstance).getUnderlying().zkConnect()).build();
        try {
            build.format();
            build.startup();
            CompletableFuture<?> readyFuture = ((ControllerServer) CollectionConverters$.MODULE$.CollectionHasAsScala(build.controllers().values()).asScala().head()).controller().waitForReadyBrokers(3);

            // Capture the producer-id block before migration so it can be compared afterwards.
            allocateProducerId(clusterInstance.bootstrapServers());
            ProducerIdsBlock producerIdBlockBefore = readProducerIdBlock(zkClient);

            log().info("Restart brokers in migration mode");
            Object quorumVoters = build.controllerClientProperties().get("controller.quorum.voters");
            clusterInstance.config().serverProperties().put(KafkaConfig$.MODULE$.MigrationEnabledProp(), "true");
            clusterInstance.config().serverProperties().put("controller.quorum.voters", quorumVoters);
            clusterInstance.config().serverProperties().put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(), "CONTROLLER");
            clusterInstance.config().serverProperties().put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(), "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT");
            clusterInstance.rollingBrokerRestart();
            clusterInstance.waitForReadyBrokers();
            readyFuture.get(30L, TimeUnit.SECONDS);

            log().info("Waiting for ZK migration to begin");
            final long maxWaitMs = 15000L;
            final long pauseMs = 100L;
            long startMs = System.currentTimeMillis();
            while (!$anonfun$testDualWrite$1(zkClient)) {
                if (System.currentTimeMillis() > startMs + maxWaitMs) {
                    Assertions.fail("Timed out waiting for KRaft controller to take over");
                }
                Thread.sleep(Math.min(maxWaitMs, pauseMs));
            }

            log().info("Updating metadata with AdminClient");
            // This second client was previously never closed.
            try (Admin migrationAdmin = clusterInstance.createAdminClient()) {
                alterTopicConfig(migrationAdmin).all().get(60L, TimeUnit.SECONDS);
                alterClientQuotas(migrationAdmin).all().get(60L, TimeUnit.SECONDS);
            }

            log().info("Verifying metadata changes with ZK");
            verifyTopicConfigs(zkClient);
            verifyClientQuotas(zkClient);
            allocateProducerId(clusterInstance.bootstrapServers());
            verifyProducerId(producerIdBlockBefore, zkClient);
        } finally {
            // Nested so a failure while stopping the ZK cluster cannot leak the KRaft test kit.
            try {
                clusterInstance.stop();
            } finally {
                build.close();
            }
        }
    }

    /**
     * Runs one committed transaction (a single record to topic {@code test}) with a transactional
     * producer using transactional id {@code some-transaction-id}.
     *
     * <p>Presumably this forces the cluster to hand out a producer id; the callers in this class
     * read the producer-id block from ZooKeeper around calls to this method.
     *
     * <p>The producer is closed via try-with-resources so it is released even when one of the
     * transactional calls throws.
     *
     * @param str the bootstrap servers to connect to
     */
    public void allocateProducerId(String str) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        properties.put("transactional.id", "some-transaction-id");
        try (KafkaProducer<String, String> producer =
                     new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer())) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("test", "", "one"));
            producer.commitTransaction();
            producer.flush();
        }
    }

    /**
     * Reads the {@code /latest_producer_id_block} znode and parses its payload.
     *
     * @param kafkaZkClient the ZooKeeper client to read through
     * @return the parsed producer-id block
     * @throws MatchError if the client returns {@code null} instead of a (data, version) pair
     * @throws java.util.NoSuchElementException if the znode has no data ({@code None.get})
     */
    public ProducerIdsBlock readProducerIdBlock(KafkaZkClient kafkaZkClient) {
        ProducerIdBlockZNode$ blockZNode = ProducerIdBlockZNode$.MODULE$;
        Tuple2 zkResult = kafkaZkClient.getDataAndVersion("/latest_producer_id_block");
        if (zkResult == null) {
            throw new MatchError((Object) null);
        }
        Option maybeData = (Option) zkResult._1();
        return (ProducerIdsBlock) maybeData.map(bArr -> {
            return blockZNode.parseProducerIdBlockData(bArr);
        }).get();
    }

    /**
     * Submits an incremental config change for topic {@code test}: sets {@code segment.bytes} to
     * {@code 204800} and deletes {@code segment.ms}.
     *
     * @param admin the admin client to submit through
     * @return the pending result; the caller decides whether to wait on it
     */
    public AlterConfigsResult alterTopicConfig(Admin admin) {
        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "test");
        AlterConfigOp setSegmentBytes = new AlterConfigOp(new ConfigEntry("segment.bytes", "204800"), AlterConfigOp.OpType.SET);
        AlterConfigOp deleteSegmentMs = new AlterConfigOp(new ConfigEntry("segment.ms", (String) null), AlterConfigOp.OpType.DELETE);

        // resource -> [ops], built as a Scala pair and map and converted at the API boundary.
        Tuple2 resourceToOps = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(topicResource),
                CollectionConverters$.MODULE$.IterableHasAsJava(new $colon.colon(setSegmentBytes, new $colon.colon(deleteSegmentMs, Nil$.MODULE$))).asJavaCollection());
        Map scalaChanges = (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{resourceToOps}));
        return admin.incrementalAlterConfigs(CollectionConverters$.MODULE$.MapHasAsJava(scalaChanges).asJava());
    }

    /**
     * Submits three client-quota alterations:
     * <ul>
     *   <li>user {@code user1}: {@code consumer_byte_rate = 1000.0}</li>
     *   <li>user {@code user1} + client-id {@code clientA}: {@code consumer_byte_rate = 800.0},
     *       {@code producer_byte_rate = 100.0}</li>
     *   <li>ip {@code 8.8.8.8}: {@code connection_creation_rate = 10.0}</li>
     * </ul>
     *
     * @param admin the admin client to submit through
     * @return the pending result; the caller decides whether to wait on it
     */
    public AlterClientQuotasResult alterClientQuotas(Admin admin) {
        // user=user1
        ClientQuotaEntity userEntity = new ClientQuotaEntity(CollectionConverters$.MODULE$.MapHasAsJava((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("user"), "user1")}))).asJava());
        ClientQuotaAlteration userAlteration = new ClientQuotaAlteration(userEntity, CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(new ClientQuotaAlteration.Op("consumer_byte_rate", Predef$.MODULE$.double2Double(1000.0d)), Nil$.MODULE$)).asJava());

        // user=user1, client-id=clientA
        ClientQuotaEntity userClientEntity = new ClientQuotaEntity(CollectionConverters$.MODULE$.MapHasAsJava((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("user"), "user1"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("client-id"), "clientA")}))).asJava());
        ClientQuotaAlteration userClientAlteration = new ClientQuotaAlteration(userClientEntity, CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(new ClientQuotaAlteration.Op("consumer_byte_rate", Predef$.MODULE$.double2Double(800.0d)), new $colon.colon(new ClientQuotaAlteration.Op("producer_byte_rate", Predef$.MODULE$.double2Double(100.0d)), Nil$.MODULE$))).asJava());

        // ip=8.8.8.8
        ClientQuotaEntity ipEntity = new ClientQuotaEntity(CollectionConverters$.MODULE$.MapHasAsJava((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("ip"), "8.8.8.8")}))).asJava());
        ClientQuotaAlteration ipAlteration = new ClientQuotaAlteration(ipEntity, CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(new ClientQuotaAlteration.Op("connection_creation_rate", Predef$.MODULE$.double2Double(10.0d)), Nil$.MODULE$)).asJava());

        ArrayList alterations = new ArrayList();
        alterations.add(userAlteration);
        alterations.add(userClientAlteration);
        alterations.add(ipAlteration);
        return admin.alterClientQuotas(alterations);
    }

    /**
     * Retries until topic {@code test} shows the altered configs in ZooKeeper, or rethrows the
     * last assertion failure once the retry window has elapsed.
     *
     * @param kafkaZkClient the ZooKeeper client to read through
     */
    public void verifyTopicConfigs(KafkaZkClient kafkaZkClient) {
        retryOnAssertionError(() -> $anonfun$verifyTopicConfigs$1(kafkaZkClient));
    }

    /**
     * Retries until the altered client quotas are visible in ZooKeeper, or rethrows the last
     * assertion failure once the retry window has elapsed.
     *
     * @param kafkaZkClient the ZooKeeper client to read through
     */
    public void verifyClientQuotas(KafkaZkClient kafkaZkClient) {
        retryOnAssertionError(() -> $anonfun$verifyClientQuotas$1(kafkaZkClient));
    }

    /**
     * Retries until the producer-id block in ZooKeeper starts after the given earlier block, or
     * rethrows the last assertion failure once the retry window has elapsed.
     *
     * @param producerIdsBlock the block read before the change under test
     * @param kafkaZkClient the ZooKeeper client to read through
     */
    public void verifyProducerId(ProducerIdsBlock producerIdsBlock, KafkaZkClient kafkaZkClient) {
        retryOnAssertionError(() -> $anonfun$verifyProducerId$1(this, kafkaZkClient, producerIdsBlock));
    }

    /**
     * Shared retry loop for the verify methods above, which previously each carried their own
     * copy of it.
     *
     * <p>Runs {@code check}; on {@link AssertionError} it sleeps and tries again, rethrowing the
     * error once more than 10 seconds have passed since the first attempt. The sleep starts at
     * 1 ms and grows by {@code min(current, 1000)} ms after each failed attempt. Any other
     * throwable propagates immediately.
     */
    private static void retryOnAssertionError(Runnable check) {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        long backoffMs = 1;
        long startMs = System.currentTimeMillis();
        while (true) {
            try {
                check.run();
                return;
            } catch (AssertionError e) {
                if (System.currentTimeMillis() - startMs > 10000) {
                    throw e;
                }
                if (testUtils.logger().underlying().isInfoEnabled()) {
                    testUtils.logger().underlying().info(Logging.msgWithLogIdent$(testUtils, "Attempt failed, sleeping for " + backoffMs + ", and then retrying."));
                }
                Thread.sleep(backoffMs);
                backoffMs += Math.min(backoffMs, 1000L);
            }
        }
    }

    /**
     * Checks the migrated image: each expected topic exists with its expected partition count,
     * and exactly three client-quota entities are present.
     */
    public static final /* synthetic */ void $anonfun$testMigrate$3(MetadataImage metadataImage) {
        String[] topicNames = {"test-topic-1", "test-topic-2", "test-topic-3"};
        int[] expectedPartitionCounts = {2, 1, 10};
        for (int i = 0; i < topicNames.length; i++) {
            Assertions.assertNotNull(metadataImage.topics().getTopic(topicNames[i]));
            Assertions.assertEquals(expectedPartitionCounts[i], metadataImage.topics().getTopic(topicNames[i]).partitions().size());
        }
        Assertions.assertEquals(3, metadataImage.clientQuotas().entities().size());
    }

    /** Returns true once ZooKeeper reports controller id 3000. */
    public static final /* synthetic */ boolean $anonfun$testDualWrite$1(KafkaZkClient kafkaZkClient) {
        Object expectedControllerId = BoxesRunTime.boxToInteger(3000);
        boolean controllerMatches = kafkaZkClient.getControllerId().contains(expectedControllerId);
        return controllerMatches;
    }

    /** Supplies the failure message for the controller-takeover wait in testDualWrite. */
    public static final /* synthetic */ String $anonfun$testDualWrite$2() {
        final String timeoutMessage = "Timed out waiting for KRaft controller to take over";
        return timeoutMessage;
    }

    /**
     * Asserts that topic {@code test} in ZooKeeper has {@code segment.bytes = 204800} and no
     * {@code segment.ms} entry.
     */
    public static final /* synthetic */ void $anonfun$verifyTopicConfigs$1(KafkaZkClient kafkaZkClient) {
        Properties topicProps = kafkaZkClient.getEntityConfigs(ConfigType$.MODULE$.Topic(), "test");
        String segmentBytes = topicProps.getProperty("segment.bytes");
        Assertions.assertEquals("204800", segmentBytes);
        boolean hasSegmentMs = topicProps.containsKey("segment.ms");
        Assertions.assertFalse(hasSegmentMs);
    }

    /**
     * Asserts the quota values stored in ZooKeeper for user {@code user1}, for client
     * {@code clientA} under that user, and for ip {@code 8.8.8.8}. Each value is read fresh
     * immediately before it is asserted.
     */
    public static final /* synthetic */ void $anonfun$verifyClientQuotas$1(KafkaZkClient kafkaZkClient) {
        Properties userQuotas = kafkaZkClient.getEntityConfigs(ConfigType$.MODULE$.User(), "user1");
        Assertions.assertEquals("1000.0", userQuotas.getProperty("consumer_byte_rate"));

        Properties clientQuotasForConsume = kafkaZkClient.getEntityConfigs("users/user1/clients", "clientA");
        Assertions.assertEquals("800.0", clientQuotasForConsume.getProperty("consumer_byte_rate"));

        Properties clientQuotasForProduce = kafkaZkClient.getEntityConfigs("users/user1/clients", "clientA");
        Assertions.assertEquals("100.0", clientQuotasForProduce.getProperty("producer_byte_rate"));

        Properties ipQuotas = kafkaZkClient.getEntityConfigs(ConfigType$.MODULE$.Ip(), "8.8.8.8");
        Assertions.assertEquals("10.0", ipQuotas.getProperty("connection_creation_rate"));
    }

    /**
     * Asserts that the producer-id block currently in ZooKeeper starts strictly after the given
     * earlier block.
     */
    public static final /* synthetic */ void $anonfun$verifyProducerId$1(ZkMigrationIntegrationTest zkMigrationIntegrationTest, KafkaZkClient kafkaZkClient, ProducerIdsBlock producerIdsBlock) {
        long earlierFirstId = producerIdsBlock.firstProducerId();
        long currentFirstId = zkMigrationIntegrationTest.readProducerIdBlock(kafkaZkClient).firstProducerId();
        Assertions.assertTrue(earlierFirstId < currentFirstId);
    }
}
