package kafka.server;

import java.io.File;
import java.net.Socket;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import kafka.utils.NotNothing$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import scala.$less$colon$less$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.mutable.Buffer;
import scala.jdk.CollectionConverters$;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

/* compiled from: FetchFromFollowerIntegrationTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005\u001dc\u0001\u0002\n\u0014\u0001aAQ!\b\u0001\u0005\u0002yAq\u0001\t\u0001C\u0002\u0013\u0005\u0011\u0005\u0003\u0004)\u0001\u0001\u0006IA\t\u0005\bS\u0001\u0011\r\u0011\"\u0001\"\u0011\u0019Q\u0003\u0001)A\u0005E!91\u0006\u0001b\u0001\n\u0003a\u0003BB\u001b\u0001A\u0003%Q\u0006C\u00047\u0001\t\u0007I\u0011A\u0011\t\r]\u0002\u0001\u0015!\u0003#\u0011\u001dA\u0004A1A\u0005\u0002\u0005Ba!\u000f\u0001!\u0002\u0013\u0011\u0003\"\u0002\u001e\u0001\t\u0003Y\u0004\"\u0002\"\u0001\t\u0003\u001a\u0005\"B'\u0001\t\u0003q\u0005bBA\u0004\u0001\u0011\u0005\u0011\u0011\u0002\u0005\b\u0003O\u0001A\u0011AA\u0015\u0011\u001d\t\u0019\u0004\u0001C\u0001\u0003k\u0011\u0001ER3uG\"4%o\\7G_2dwn^3s\u0013:$Xm\u001a:bi&|g\u000eV3ti*\u0011A#F\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003Y\tQa[1gW\u0006\u001c\u0001a\u0005\u0002\u00013A\u0011!dG\u0007\u0002'%\u0011Ad\u0005\u0002\u0015\u0005\u0006\u001cXMR3uG\"\u0014V-];fgR$Vm\u001d;\u0002\rqJg.\u001b;?)\u0005y\u0002C\u0001\u000e\u0001\u0003!qW/\u001c(pI\u0016\u001cX#\u0001\u0012\u0011\u0005\r2S\"\u0001\u0013\u000b\u0003\u0015\nQa]2bY\u0006L!a\n\u0013\u0003\u0007%sG/A\u0005ok6tu\u000eZ3tA\u0005Aa.^7QCJ$8/A\u0005ok6\u0004\u0016M\u001d;tA\u0005)Ao\u001c9jGV\tQ\u0006\u0005\u0002/g5\tqF\u0003\u00021c\u0005!A.\u00198h\u0015\u0005\u0011\u0014\u0001\u00026bm\u0006L!\u0001N\u0018\u0003\rM#(/\u001b8h\u0003\u0019!x\u000e]5dA\u0005qA.Z1eKJ\u0014%o\\6fe&#\u0017a\u00047fC\u0012,'O\u0011:pW\u0016\u0014\u0018\n\u001a\u0011\u0002!\u0019|G\u000e\\8xKJ\u0014%o\\6fe&#\u0017!\u00054pY2|w/\u001a:Ce>\\WM]%eA\u0005yqN^3se&$\u0017N\\4Qe>\u00048/F\u0001=!\ti\u0004)D\u0001?\u0015\ty\u0014'\u0001\u0003vi&d\u0017BA!?\u0005)\u0001&o\u001c9feRLWm]\u0001\u0010O\u0016tWM]1uK\u000e{gNZ5hgV\tA\tE\u0002F\u0011*k\u0011A\u0012\u0006\u0003\u000f\u0012\n!bY8mY\u0016\u001cG/[8o\u0013\tIeIA\u0002TKF\u0004\"AG&\n\u00051\u001b\"aC&bM.\f7i\u001c8gS\u001e\fq\u0006^3ti\u001a{G\u000e\\8xKJ\u001cu.\u001c9mKR,G)\u001a7bs\u0016$g)\u001a;dQ\u0016\u001cxJ\u001c*fa2L7-\u0019;j_:$2a\u0014*_!\t\u0019\u0003+\u0003\u0002RI\t!QK\\5u\u0011\u0015\u0019f\u00021\u0001U\u0003\u0019\tXo\u001c:v[B\u0011Q\u000b\u0018\b\u0003-j\u0003\"a\u0016\u0013\u000e\u0003aS!!W\f\u0002\rq\u0012xn\u001c;?\u0013\tYF%\u0001\u0004Qe\u0016$WMZ\u0005\u0003iuS!a\u0017\u0013\t\u000b}s\u0001\u0019\u0001+\u0002\u001fI,\u0007\u000f\\5dCRLwN\\'pI\u0016DCAD1n]B\u0011!m[\u0007\u0002G*\u0011A-Z\u0001\u0007a\u0006\u0014\u0018-\\:\u000b\u0005\u0019<\u0017a\u00026va&$XM\u001d\u0006\u0003Q&\fQA[;oSRT\u0011A[\u0001\u0004_J<\u0017B\u00017d\u0005E\u0001\u0016M]1nKR,'/\u001b>fIR+7\u000f^\u0001\u0005]\u0006lW-I\u0001p\u0003!ZH-[:qY\u0006Lh*Y7f{:\nXo\u001c:v[vZ\b' \u0018sKBd\u0017nY1uS>tWh_\u0019~Q\u0011q\u0011o\u001e=\u0011\u0005I,X\"A:\u000b\u0005Q\u001c\u0017\u0001\u00039s_ZLG-\u001a:\n\u0005Y\u001c(\u0001D'fi\"|GmU8ve\u000e,\u0017!\u0002<bYV,G&A=\"\u0003i\fA&\u00117m'V\u0004\bo\u001c:uK\u0012\fVo\u001c:v[\u0006sGMU3qY&\u001c\u0017\r^5p]\u000e{WNY5oCRLwN\\:)\u000b9ax/!\u0002\u0011\u0007u\f\t!D\u0001\u007f\u0015\tyX-A\u0002ba&L1!a\u0001\u007f\u0005\u001d!\u0016.\\3pkRt\u0012aD\u0001:i\u0016\u001cHOR3uG\"4%o\\7MK\u0006$WM],iS2,\u0007K]3gKJ\u0014X\r\u001a*fC\u0012\u0014V\r\u001d7jG\u0006L5/\u00168bm\u0006LG.\u00192mKR\u0019q*a\u0003\t\u000bM{\u0001\u0019\u0001+)\u000b=\tW.a\u0004\"\u0005\u0005E\u0011AI>eSN\u0004H.Y=OC6,WPL>be\u001e,X.\u001a8ug^KG\u000f\u001b(b[\u0016\u001cX\u0010K\u0004\u0010\u0003+\tY\"!\b\u0011\u0007I\f9\"C\u0002\u0002\u001aM\u00141BV1mk\u0016\u001cv.\u001e:dK\u000691\u000f\u001e:j]\u001e\u001cH\u0006BA\u0010\u0003G\t#!!\t\u0002\u0005i\\\u0017EAA\u0013\u0003\u0015Y'/\u00194u\u0003u!Xm\u001d;GKR\u001c\u0007N\u0012:p[\u001a{G\u000e\\8xKJ<\u0016\u000e\u001e5S_2dGcA(\u0002,!)1\u000b\u0005a\u0001)\"*\u0001#Y7\u0002\u0010!:\u0001#!\u0006\u0002\u001c\u0005EB\u0006BA\u0010\u0003G\t!\u0004^3tiJ\u000b7m[!xCJ,'+\u00198hK\u0006\u001b8/[4o_J$2aTA\u001c\u0011\u0015\u0019\u0016\u00031\u0001UQ\r\t\u00121\b\t\u0004{\u0006u\u0012bAA }\nAA)[:bE2,G\rK\u0003\u0012C6\fy\u0001K\u0004\u0012\u0003+\tY\"!\u0012-\t\u0005}\u00111\u0005")
/* loaded from: input_file:kafka/server/FetchFromFollowerIntegrationTest.class */
public class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
    private final int numNodes = 2;
    private final int numParts = 1;
    private final String topic = "test-fetch-from-follower";
    private final int leaderBrokerId = 0;
    private final int followerBrokerId = 1;

    public int numNodes() {
        return this.numNodes;
    }

    public int numParts() {
        return this.numParts;
    }

    public String topic() {
        return this.topic;
    }

    public int leaderBrokerId() {
        return this.leaderBrokerId;
    }

    public int followerBrokerId() {
        return this.followerBrokerId;
    }

    public Properties overridingProps() {
        Properties properties = new Properties();
        properties.put(KafkaConfig$.MODULE$.NumPartitionsProp(), Integer.toString(numParts()));
        properties.put(KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), Integer.toString(numNodes()));
        properties.put(KafkaConfig$.MODULE$.UnstableMetadataVersionsEnableProp(), "true");
        properties.put(KafkaConfig$.MODULE$.UnstableApiVersionsEnableProp(), "true");
        maybeEnablePushReplication(properties);
        return properties;
    }

    @Override // kafka.api.IntegrationTestHarness, kafka.integration.KafkaServerTestHarness
    /* renamed from: generateConfigs */
    public Seq<KafkaConfig> mo42generateConfigs() {
        int numNodes = numNodes();
        String zkConnectOrNull = zkConnectOrNull();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        Option<SecurityProtocol> option = None$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        Option<File> option2 = None$.MODULE$;
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        Option<Properties> option3 = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        Map<Object, String> map = (Map) Map$.MODULE$.apply(Nil$.MODULE$);
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        return (Seq) TestUtils$.MODULE$.createBrokerConfigs(numNodes, zkConnectOrNull, false, true, option, option2, option3, true, false, false, false, map, 1, false, 1, (short) 1, 0, true).map(properties -> {
            return KafkaConfig$.MODULE$.fromProps(properties, this.overridingProps(), true);
        });
    }

    @MethodSource({"AllSupportedQuorumAndReplicationCombinations"})
    @Timeout(15)
    @ParameterizedTest(name = "{displayName}.quorum={0}.replication={1}")
    public void testFollowerCompleteDelayedFetchesOnReplication(String str, String str2) {
        Admin createAdminClient = createAdminClient(createAdminClient$default$1(), createAdminClient$default$2());
        String str3 = topic();
        Buffer<KafkaBroker> brokers = brokers();
        Seq<ControllerServer> controllerServers = controllerServers();
        Map<Object, Seq<Object>> map = (scala.collection.immutable.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(0)), package$.MODULE$.Seq().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{leaderBrokerId(), followerBrokerId()})))}));
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        scala.collection.immutable.Map<Object, Object> createTopicWithAdmin = TestUtils$.MODULE$.createTopicWithAdmin(createAdminClient, str3, brokers, controllerServers, 1, 1, map, new Properties());
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        Buffer<KafkaBroker> brokers2 = brokers();
        TopicPartition topicPartition = new TopicPartition(topic(), 0);
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        testUtils$4.waitUntilLeaderIsKnown(brokers2, topicPartition, 15000L);
        Assertions.assertTrue(createTopicWithAdmin.values().forall(i -> {
            return i == this.leaderBrokerId();
        }));
        short latestVersion = ApiKeys.FETCH.latestVersion();
        TopicPartition topicPartition2 = new TopicPartition(topic(), 0);
        scala.collection.immutable.Map<TopicPartition, Object> map2 = (scala.collection.immutable.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition2), BoxesRunTime.boxToLong(0L))}));
        maybeWaitUntilReplicasInPushMode(topicPartition2);
        FetchRequest createConsumerFetchRequest = createConsumerFetchRequest(1000, 1000, new $colon.colon(topicPartition2, Nil$.MODULE$), map2, latestVersion, 20000, 1, createConsumerFetchRequest$default$8());
        Socket connect = connect(brokerSocketServer(followerBrokerId()), connect$default$2());
        try {
            send(createConsumerFetchRequest, connect, send$default$3(), send$default$4());
            TestUtils$ testUtils$6 = TestUtils$.MODULE$;
            Buffer<KafkaBroker> brokers3 = brokers();
            String str4 = topic();
            TestUtils$ testUtils$7 = TestUtils$.MODULE$;
            testUtils$6.generateAndProduceMessages(brokers3, str4, 1, -1);
            FetchResponse receive = receive(connect, ApiKeys.FETCH, latestVersion, ClassTag$.MODULE$.apply(FetchResponse.class), NotNothing$.MODULE$.notNothingEvidence($less$colon$less$.MODULE$.refl()));
            Assertions.assertEquals(Errors.NONE, receive.error());
            Assertions.assertEquals(CollectionConverters$.MODULE$.MapHasAsJava((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(Errors.NONE), BoxesRunTime.boxToInteger(2))}))).asJava(), receive.errorCounts());
        } finally {
            connect.close();
        }
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.{argumentsWithNames}")
    public void testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable(String str) {
        Admin createAdminClient = createAdminClient(createAdminClient$default$1(), createAdminClient$default$2());
        String str2 = topic();
        Buffer<KafkaBroker> brokers = brokers();
        Seq<ControllerServer> controllerServers = controllerServers();
        Map<Object, Seq<Object>> map = (scala.collection.immutable.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(0)), package$.MODULE$.Seq().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{leaderBrokerId(), followerBrokerId()})))}));
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        TestUtils$.MODULE$.createTopicWithAdmin(createAdminClient, str2, brokers, controllerServers, 1, 1, map, new Properties());
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        Buffer<KafkaBroker> brokers2 = brokers();
        String str3 = topic();
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        testUtils$4.generateAndProduceMessages(brokers2, str3, 10, -1);
        Assertions.assertEquals(1, getPreferredReplica(topic(), leaderBrokerId(), followerBrokerId()));
        ((KafkaBroker) brokers().apply(followerBrokerId())).shutdown();
        TopicPartition topicPartition = new TopicPartition(topic(), 0);
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable$1(this, topicPartition)) {
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("follower is still reachable.");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        Assertions.assertEquals(-1, getPreferredReplica(topic(), leaderBrokerId(), followerBrokerId()));
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.{argumentsWithNames}")
    public void testFetchFromFollowerWithRoll(String str) {
        Admin createAdminClient = createAdminClient(createAdminClient$default$1(), createAdminClient$default$2());
        String str2 = topic();
        Buffer<KafkaBroker> brokers = brokers();
        Seq<ControllerServer> controllerServers = controllerServers();
        Map<Object, Seq<Object>> map = (scala.collection.immutable.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(0)), package$.MODULE$.Seq().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{leaderBrokerId(), followerBrokerId()})))}));
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        TestUtils$.MODULE$.createTopicWithAdmin(createAdminClient, str2, brokers, controllerServers, 1, 1, map, new Properties());
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers(bootstrapServers$default$1()));
        properties.put("group.id", "test-group");
        properties.put("auto.offset.reset", "earliest");
        properties.put("client.rack", Integer.toString(followerBrokerId()));
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        try {
            kafkaConsumer.subscribe(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(topic(), Nil$.MODULE$)).asJava());
            TestUtils$ testUtils$4 = TestUtils$.MODULE$;
            TestUtils$ testUtils$5 = TestUtils$.MODULE$;
            TestUtils$ testUtils$6 = TestUtils$.MODULE$;
            long currentTimeMillis = System.currentTimeMillis();
            while (!$anonfun$testFetchFromFollowerWithRoll$1(this)) {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    Assertions.fail("Preferred replica is not set");
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
            TestUtils$ testUtils$7 = TestUtils$.MODULE$;
            Buffer<KafkaBroker> brokers2 = brokers();
            String str3 = topic();
            TestUtils$ testUtils$8 = TestUtils$.MODULE$;
            testUtils$7.generateAndProduceMessages(brokers2, str3, 1, -1);
            TestUtils$ testUtils$9 = TestUtils$.MODULE$;
            TestUtils$ testUtils$10 = TestUtils$.MODULE$;
            testUtils$9.pollUntilAtLeastNumRecords(kafkaConsumer, 1, 15000L);
            ((KafkaBroker) brokers().apply(followerBrokerId())).shutdown();
            TestUtils$ testUtils$11 = TestUtils$.MODULE$;
            Buffer<KafkaBroker> brokers3 = brokers();
            String str4 = topic();
            TestUtils$ testUtils$12 = TestUtils$.MODULE$;
            testUtils$11.generateAndProduceMessages(brokers3, str4, 1, -1);
            TestUtils$ testUtils$13 = TestUtils$.MODULE$;
            TestUtils$ testUtils$14 = TestUtils$.MODULE$;
            testUtils$13.pollUntilAtLeastNumRecords(kafkaConsumer, 1, 15000L);
            ((KafkaBroker) brokers().apply(followerBrokerId())).startup();
            TestUtils$ testUtils$15 = TestUtils$.MODULE$;
            TestUtils$ testUtils$16 = TestUtils$.MODULE$;
            TestUtils$ testUtils$17 = TestUtils$.MODULE$;
            long currentTimeMillis2 = System.currentTimeMillis();
            while (!$anonfun$testFetchFromFollowerWithRoll$3(this)) {
                if (System.currentTimeMillis() > currentTimeMillis2 + 15000) {
                    Assertions.fail("Preferred replica is not set");
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
            TestUtils$ testUtils$18 = TestUtils$.MODULE$;
            Buffer<KafkaBroker> brokers4 = brokers();
            String str5 = topic();
            TestUtils$ testUtils$19 = TestUtils$.MODULE$;
            testUtils$18.generateAndProduceMessages(brokers4, str5, 1, -1);
            TestUtils$ testUtils$20 = TestUtils$.MODULE$;
            TestUtils$ testUtils$21 = TestUtils$.MODULE$;
            testUtils$20.pollUntilAtLeastNumRecords(kafkaConsumer, 1, 15000L);
        } finally {
            kafkaConsumer.close();
        }
    }

    @Disabled
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.{argumentsWithNames}")
    public void testRackAwareRangeAssignor(String str) {
        List list = brokers().indices().toList();
        String str2 = "topicWithAllPartitionsOnAllRacks";
        createTopic("topicWithAllPartitionsOnAllRacks", brokers().size(), brokers().size(), createTopic$default$4(), createTopic$default$5(), createTopic$default$6());
        String str3 = "topicWithSingleRackPartitions";
        createTopicWithAssignment("topicWithSingleRackPartitions", list.map(obj -> {
            return $anonfun$testRackAwareRangeAssignor$1(this, BoxesRunTime.unboxToInt(obj));
        }).toMap($less$colon$less$.MODULE$.refl()), createTopicWithAssignment$default$3());
        consumerConfig().setProperty("partition.assignment.strategy", RangeAssignor.class.getName());
        Buffer buffer = (Buffer) brokers().map(kafkaBroker -> {
            this.consumerConfig().setProperty("auto.offset.reset", "earliest");
            this.consumerConfig().setProperty("client.rack", (String) kafkaBroker.config().rack().orNull($less$colon$less$.MODULE$.refl()));
            this.consumerConfig().setProperty("group.instance.id", new StringBuilder(9).append("instance-").append(kafkaBroker.config().brokerId()).toString());
            this.consumerConfig().setProperty("metadata.max.age.ms", "1000");
            return this.createConsumer(this.createConsumer$default$1(), this.createConsumer$default$2(), this.createConsumer$default$3(), this.createConsumer$default$4());
        });
        KafkaProducer createProducer = createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3());
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(buffer.size());
        try {
            buffer.foreach(consumer -> {
                $anonfun$testRackAwareRangeAssignor$15(str3, consumer);
                return BoxedUnit.UNIT;
            });
            verifyAssignments$1(list.reverse(), ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"topicWithSingleRackPartitions"}), buffer, newFixedThreadPool, createProducer);
            buffer.foreach(consumer2 -> {
                $anonfun$testRackAwareRangeAssignor$16(str2, consumer2);
                return BoxedUnit.UNIT;
            });
            verifyAssignments$1(list, ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"topicWithAllPartitionsOnAllRacks"}), buffer, newFixedThreadPool, createProducer);
            buffer.foreach(consumer3 -> {
                $anonfun$testRackAwareRangeAssignor$17(str3, str2, consumer3);
                return BoxedUnit.UNIT;
            });
            verifyAssignments$1(list.reverse(), ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"topicWithAllPartitionsOnAllRacks", "topicWithSingleRackPartitions"}), buffer, newFixedThreadPool, createProducer);
            Admin createAdminClient = createAdminClient(createAdminClient$default$1(), createAdminClient$default$2());
            HashMap hashMap = new HashMap();
            list.foreach(obj2 -> {
                return $anonfun$testRackAwareRangeAssignor$18(hashMap, str3, BoxesRunTime.unboxToInt(obj2));
            });
            createAdminClient.alterPartitionReassignments(hashMap).all().get(30L, TimeUnit.SECONDS);
            verifyAssignments$1(list, ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"topicWithAllPartitionsOnAllRacks", "topicWithSingleRackPartitions"}), buffer, newFixedThreadPool, createProducer);
        } finally {
            newFixedThreadPool.shutdownNow();
        }
    }

    public static final /* synthetic */ boolean $anonfun$testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable$1(FetchFromFollowerIntegrationTest fetchFromFollowerIntegrationTest, TopicPartition topicPartition) {
        return !((KafkaBroker) fetchFromFollowerIntegrationTest.brokers().apply(fetchFromFollowerIntegrationTest.leaderBrokerId())).metadataCache().getPartitionReplicaEndpoints(topicPartition, fetchFromFollowerIntegrationTest.listenerName()).contains(BoxesRunTime.boxToInteger(fetchFromFollowerIntegrationTest.followerBrokerId()));
    }

    public static final /* synthetic */ String $anonfun$testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable$2() {
        return "follower is still reachable.";
    }

    public static final /* synthetic */ boolean $anonfun$testFetchFromFollowerWithRoll$1(FetchFromFollowerIntegrationTest fetchFromFollowerIntegrationTest) {
        return fetchFromFollowerIntegrationTest.getPreferredReplica(fetchFromFollowerIntegrationTest.topic(), fetchFromFollowerIntegrationTest.leaderBrokerId(), fetchFromFollowerIntegrationTest.followerBrokerId()) == 1;
    }

    public static final /* synthetic */ String $anonfun$testFetchFromFollowerWithRoll$2() {
        return "Preferred replica is not set";
    }

    public static final /* synthetic */ boolean $anonfun$testFetchFromFollowerWithRoll$3(FetchFromFollowerIntegrationTest fetchFromFollowerIntegrationTest) {
        return fetchFromFollowerIntegrationTest.getPreferredReplica(fetchFromFollowerIntegrationTest.topic(), fetchFromFollowerIntegrationTest.leaderBrokerId(), fetchFromFollowerIntegrationTest.followerBrokerId()) == 1;
    }

    public static final /* synthetic */ String $anonfun$testFetchFromFollowerWithRoll$4() {
        return "Preferred replica is not set";
    }

    public static final /* synthetic */ Tuple2 $anonfun$testRackAwareRangeAssignor$1(FetchFromFollowerIntegrationTest fetchFromFollowerIntegrationTest, int i) {
        return new Tuple2(BoxesRunTime.boxToInteger(i), package$.MODULE$.Seq().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{(fetchFromFollowerIntegrationTest.brokers().size() - i) - 1})));
    }

    public static final /* synthetic */ Set $anonfun$testRackAwareRangeAssignor$3(scala.collection.immutable.Seq seq, int i) {
        return ((IterableOnceOps) seq.map(str -> {
            return new TopicPartition(str, i);
        })).toSet();
    }

    public static final /* synthetic */ boolean $anonfun$testRackAwareRangeAssignor$7(Consumer consumer, Set set) {
        java.util.Set assignment = consumer.assignment();
        java.util.Set asJava = CollectionConverters$.MODULE$.SetHasAsJava(set).asJava();
        return assignment == null ? asJava == null : assignment.equals(asJava);
    }

    public static final /* synthetic */ String $anonfun$testRackAwareRangeAssignor$8(Set set, Consumer consumer) {
        return new StringBuilder(73).append("Timed out while awaiting expected assignment ").append(set).append(". The current assignment is ").append(consumer.assignment()).toString();
    }

    public static final /* synthetic */ void $anonfun$testRackAwareRangeAssignor$9(Future future) {
        Assertions.assertEquals(0, BoxesRunTime.unboxToInt(future.get(30L, TimeUnit.SECONDS)));
    }

    public static final /* synthetic */ void $anonfun$testRackAwareRangeAssignor$13(List list, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        Future future = (Future) tuple2._1();
        Assertions.assertEquals(list.apply(tuple2._2$mcI$sp()), ((IterableOnceOps) ((Seq) future.get(30L, TimeUnit.SECONDS)).map(consumerRecord -> {
            return new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
        })).toSet());
    }

    private static final void verifyAssignments$1(List list, scala.collection.immutable.Seq seq, Buffer buffer, ExecutorService executorService, KafkaProducer kafkaProducer) {
        List map = list.map(obj -> {
            return $anonfun$testRackAwareRangeAssignor$3(seq, BoxesRunTime.unboxToInt(obj));
        });
        ((Buffer) ((IterableOps) buffer.zipWithIndex()).map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError((Object) null);
            }
            Consumer consumer = (Consumer) tuple2._1();
            int _2$mcI$sp = tuple2._2$mcI$sp();
            return executorService.submit(() -> {
                Set set = (Set) map.apply(_2$mcI$sp);
                TestUtils$ testUtils$ = TestUtils$.MODULE$;
                long currentTimeMillis = System.currentTimeMillis();
                while (true) {
                    consumer.poll(Duration.ofMillis(100L));
                    if ($anonfun$testRackAwareRangeAssignor$7(consumer, set)) {
                        return;
                    }
                    if (System.currentTimeMillis() > currentTimeMillis + 30000) {
                        Assertions.fail($anonfun$testRackAwareRangeAssignor$8(set, consumer));
                    }
                    Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(30000L), 0L));
                }
            }, BoxesRunTime.boxToInteger(0));
        })).foreach(future -> {
            $anonfun$testRackAwareRangeAssignor$9(future);
            return BoxedUnit.UNIT;
        });
        ((List) map.flatten(Predef$.MODULE$.$conforms())).foreach(topicPartition -> {
            return kafkaProducer.send(new ProducerRecord(topicPartition.topic(), Predef$.MODULE$.int2Integer(topicPartition.partition()), new StringBuilder(4).append("key-").append(topicPartition).toString().getBytes(), new StringBuilder(6).append("value-").append(topicPartition).toString().getBytes()));
        });
        ((IterableOnceOps) ((Buffer) ((IterableOps) buffer.zipWithIndex()).map(tuple22 -> {
            if (tuple22 == null) {
                throw new MatchError((Object) null);
            }
            Consumer consumer = (Consumer) tuple22._1();
            int _2$mcI$sp = tuple22._2$mcI$sp();
            return executorService.submit(() -> {
                return TestUtils$.MODULE$.pollUntilAtLeastNumRecords(consumer, ((IterableOnceOps) map.apply(_2$mcI$sp)).size(), 30000L);
            });
        })).zipWithIndex()).foreach(tuple23 -> {
            $anonfun$testRackAwareRangeAssignor$13(map, tuple23);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ void $anonfun$testRackAwareRangeAssignor$15(String str, Consumer consumer) {
        consumer.subscribe(Collections.singleton(str));
    }

    public static final /* synthetic */ void $anonfun$testRackAwareRangeAssignor$16(String str, Consumer consumer) {
        consumer.subscribe(Collections.singleton(str));
    }

    public static final /* synthetic */ void $anonfun$testRackAwareRangeAssignor$17(String str, String str2, Consumer consumer) {
        consumer.subscribe(CollectionConverters$.MODULE$.SetHasAsJava((scala.collection.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{str, str2}))).asJava());
    }

    public static final /* synthetic */ Optional $anonfun$testRackAwareRangeAssignor$18(HashMap hashMap, String str, int i) {
        return (Optional) hashMap.put(new TopicPartition(str, i), Optional.of(new NewPartitionReassignment(Collections.singletonList(Predef$.MODULE$.int2Integer(i)))));
    }
}
