/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.util.Collection;
import java.util.List;
import java.util.Properties;
import kafka.api.LeaderAndIsr$;
import kafka.cluster.Broker;
import kafka.controller.ControllerChannelManager;
import kafka.controller.ControllerChannelManager$;
import kafka.controller.ControllerContext;
import kafka.controller.StateChangeLogger;
import kafka.server.HostedPartition;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils$;
import kafka.zk.ZooKeeperTestHarness;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import org.apache.kafka.common.message.UpdateMetadataRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractControlRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.LeaderAndIsrRequest;
import org.apache.kafka.common.requests.StopReplicaRequest;
import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.Set;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Iterable$;
import scala.collection.mutable.Buffer;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.VolatileBooleanRef;

@ScalaSignature(bytes="\u0006\u0001\u0005%c\u0001B\u0001\u0003\u0001\u001d\u0011!D\u0011:pW\u0016\u0014X\t]8dQ&sG/Z4sCRLwN\u001c+fgRT!a\u0001\u0003\u0002\rM,'O^3s\u0015\u0005)\u0011!B6bM.\f7\u0001A\n\u0003\u0001!\u0001\"!\u0003\u0007\u000e\u0003)Q!a\u0003\u0003\u0002\u0005i\\\u0017BA\u0007\u000b\u0005QQvn\\&fKB,'\u000fV3ti\"\u000b'O\\3tg\")q\u0002\u0001C\u0001!\u00051A(\u001b8jiz\"\u0012!\u0005\t\u0003%\u0001i\u0011A\u0001\u0005\b)\u0001\u0011\r\u0011\"\u0001\u0016\u0003%\u0011'o\\6fe&#\u0017'F\u0001\u0017!\t9\"$D\u0001\u0019\u0015\u0005I\u0012!B:dC2\f\u0017BA\u000e\u0019\u0005\rIe\u000e\u001e\u0005\u0007;\u0001\u0001\u000b\u0011\u0002\f\u0002\u0015\t\u0014xn[3s\u0013\u0012\f\u0004\u0005C\u0004 \u0001\t\u0007I\u0011A\u000b\u0002\u0013\t\u0014xn[3s\u0013\u0012\u0014\u0004BB\u0011\u0001A\u0003%a#\u0001\u0006ce>\\WM]%ee\u0001Bqa\t\u0001A\u0002\u0013\u0005A%A\u0004tKJ4XM]:\u0016\u0003\u0015\u00022A\n\u00182\u001d\t9CF\u0004\u0002)W5\t\u0011F\u0003\u0002+\r\u00051AH]8pizJ\u0011!G\u0005\u0003[a\tq\u0001]1dW\u0006<W-\u0003\u00020a\t\u00191+Z9\u000b\u00055B\u0002C\u0001\n3\u0013\t\u0019$AA\u0006LC\u001a\\\u0017mU3sm\u0016\u0014\bbB\u001b\u0001\u0001\u0004%\tAN\u0001\fg\u0016\u0014h/\u001a:t?\u0012*\u0017\u000f\u0006\u00028uA\u0011q\u0003O\u0005\u0003sa\u0011A!\u00168ji\"91\bNA\u0001\u0002\u0004)\u0013a\u0001=%c!1Q\b\u0001Q!\n\u0015\n\u0001b]3sm\u0016\u00148\u000f\t\u0005\u0006\u007f\u0001!\t\u0005Q\u0001\u0006g\u0016$X\u000b\u001d\u000b\u0002o!\u0012aH\u0011\t\u0003\u0007\"k\u0011\u0001\u0012\u0006\u0003\u000b\u001a\u000bQA[;oSRT\u0011aR\u0001\u0004_J<\u0017BA%E\u0005\u0019\u0011UMZ8sK\")1\n\u0001C!\u0001\u0006AA/Z1s\t><h\u000e\u000b\u0002K\u001bB\u00111IT\u0005\u0003\u001f\u0012\u0013Q!\u00114uKJDQ!\u0015\u0001\u0005\u0002\u0001\u000b!\u0006^3tiJ+\u0007\u000f\\5dC6\u000bg.Y4fe\n\u0013xn[3s\u000bB|7\r['bi\u000eDWm],ji\"T6\u000e\u000b\u0002Q'B\u00111\tV\u0005\u0003+\u0012\u0013A\u0001V3ti\")q\u000b\u0001C\u0001\u0001\u0006YC/Z:u\u0007>tGO]8mY\u0016\u0014(I]8lKJ,\u0005o\\2i\u
0007\u0006\u001c\u0007.Z'bi\u000eDWm],ji\"T6\u000e\u000b\u0002W'\")!\f\u0001C\u0001\u0001\u0006AC/Z:u\u0007>tGO]8m%\u0016\fX/Z:u/&$\bnQ8se\u0016\u001cGO\u0011:pW\u0016\u0014X\t]8dQ\"\u0012\u0011l\u0015\u0005\u0006;\u0002!\t\u0001Q\u0001'i\u0016\u001cHoQ8oiJ|GNU3rk\u0016\u001cHoV5uQN#\u0018\r\\3Ce>\\WM]#q_\u000eD\u0007F\u0001/T\u0011\u0015\u0001\u0007\u0001\"\u0003b\u0003\u0005\"Xm\u001d;D_:$(o\u001c7SKF,Xm\u001d;XSRD'I]8lKJ,\u0005o\\2i)\t9$\rC\u0003d?\u0002\u0007A-A\u000bjg\u0016\u0003xn\u00195J]J+\u0017/^3tiN#\u0018\r\\3\u0011\u0005])\u0017B\u00014\u0019\u0005\u001d\u0011un\u001c7fC:DQ\u0001\u001b\u0001\u0005\n%\fQbZ3u\u0007>tGO]8mY\u0016\u0014X#A\u0019\t\u000b-\u0004A\u0011\u00027\u0002[\rDWmY6D_:$(o\u001c7mKJ\u0014%o\\6fe\u0016\u0003xn\u00195t\u0007\u0006\u001c\u0007.Z'bi\u000eDWm],ji\"T6\u000e\u0006\u00028[\")aN\u001ba\u0001_\u0006\t2m\u001c8ue>dG.\u001a:D_:$X\r\u001f;\u0011\u0005A\u001cX\"A9\u000b\u0005I$\u0011AC2p]R\u0014x\u000e\u001c7fe&\u0011A/\u001d\u0002\u0012\u0007>tGO]8mY\u0016\u00148i\u001c8uKb$\b\"\u0002<\u0001\t\u00139\u0018aJ:f]\u0012\fe\u000e\u001a,fe&4\u0017p\u0015;bY\u0016\u0014%o\\6fe\u0016\u0003xn\u00195J]J+7\u000f]8og\u0016$2a\u000e=~\u0011\u0015IX\u000f1\u0001{\u0003a\u0019wN\u001c;s_2dWM]\"iC:tW\r\\'b]\u0006<WM\u001d\t\u0003anL!\u0001`9\u00031\r{g\u000e\u001e:pY2,'o\u00115b]:,G.T1oC\u001e,'\u000fC\u0003\u007fk\u0002\u0007q0A\u0004ck&dG-\u001a:1\t\u0005\u0005\u00111\u0005\t\u0007\u0003\u0007\tI\"a\b\u000f\t\u0005\u0015\u0011QC\u0007\u0003\u0003\u000fQA!!\u0003\u0002\f\u0005A!/Z9vKN$8O\u0003\u0003\u0002\u000e\u0005=\u0011AB2p[6|gNC\u0002\u0006\u0003#Q1!a\u0005G\u0003\u0019\t\u0007/Y2iK&!\u0011qCA\u0004\u0003Y\t%m\u001d;sC\u000e$8i\u001c8ue>d'+Z9vKN$\u0018\u0002BA\u000e\u0003;\u0011qAQ;jY\u0012,'O\u0003\u0003\u0002\u0018\u0005\u001d\u0001\u0003BA\u0011\u0003Ga\u0001\u0001B\u0006\u0002&u\f\t\u0011!A\u0003\u0002\u0005\u001d\"aA0%cE!\u0011\u0011FA\u0018!\r9\u00121F\u0005\u0004\u0003[A\"a\u0002(pi\"Lgn\u001a\t\u0005\u0003\u000b\t\t$\u0003\u0003\u000
24\u0005\u001d!AF!cgR\u0014\u0018m\u0019;D_:$(o\u001c7SKF,Xm\u001d;\t\u000f\u0005]\u0002\u0001\"\u0003\u0002:\u0005y2/\u001a8e\u0003:$g+\u001a:jMf\u001cVoY2fgN4W\u000f\u001c*fgB|gn]3\u0015\u000b]\nY$!\u0010\t\re\f)\u00041\u0001{\u0011\u001dq\u0018Q\u0007a\u0001\u0003\u007f\u0001D!!\u0011\u0002FA1\u00111AA\r\u0003\u0007\u0002B!!\t\u0002F\u0011a\u0011qIA\u001f\u0003\u0003\u0005\tQ!\u0001\u0002(\t\u0019q\f\n\u001a")
public class BrokerEpochIntegrationTest
extends ZooKeeperTestHarness {
    // Id of the first test broker; the constructor assigns 0.
    private final int brokerId1;
    // Id of the second test broker; the constructor assigns 1.
    private final int brokerId2;
    // Servers under test; starts as an empty Seq and is replaced in setUp().
    private Seq<KafkaServer> servers = (Seq)Seq$.MODULE$.empty();

    /**
     * Accessor for the {@code brokerId1} field.
     *
     * @return the value currently held in {@code brokerId1}
     */
    public int brokerId1() {
        return brokerId1;
    }

    /**
     * Accessor for the {@code brokerId2} field.
     *
     * @return the value currently held in {@code brokerId2}
     */
    public int brokerId2() {
        return brokerId2;
    }

    /**
     * Accessor for the {@code servers} field.
     *
     * @return the sequence of servers currently stored in this test instance
     */
    public Seq<KafkaServer> servers() {
        return servers;
    }

    /**
     * Mutator for the {@code servers} field.
     *
     * @param x$1 the sequence of servers to store
     */
    public void servers_$eq(Seq<KafkaServer> x$1) {
        servers = x$1;
    }

    /**
     * Prepares the fixture: runs the parent set-up, creates one broker config for each of the
     * two broker ids (all optional arguments left at their defaults), sets the property named by
     * {@code KafkaConfig.AutoLeaderRebalanceEnableProp} to {@code "false"} in every config, and
     * creates a server from each config. The resulting servers are stored via
     * {@code servers_$eq}.
     */
    @Override
    @Before
    public void setUp() {
        super.setUp();
        Properties firstBrokerProps = TestUtils$.MODULE$.createBrokerConfig(
            this.brokerId1(),
            this.zkConnect(),
            TestUtils$.MODULE$.createBrokerConfig$default$3(),
            TestUtils$.MODULE$.createBrokerConfig$default$4(),
            TestUtils$.MODULE$.createBrokerConfig$default$5(),
            TestUtils$.MODULE$.createBrokerConfig$default$6(),
            TestUtils$.MODULE$.createBrokerConfig$default$7(),
            TestUtils$.MODULE$.createBrokerConfig$default$8(),
            TestUtils$.MODULE$.createBrokerConfig$default$9(),
            TestUtils$.MODULE$.createBrokerConfig$default$10(),
            TestUtils$.MODULE$.createBrokerConfig$default$11(),
            TestUtils$.MODULE$.createBrokerConfig$default$12(),
            TestUtils$.MODULE$.createBrokerConfig$default$13(),
            TestUtils$.MODULE$.createBrokerConfig$default$14(),
            TestUtils$.MODULE$.createBrokerConfig$default$15(),
            TestUtils$.MODULE$.createBrokerConfig$default$16(),
            TestUtils$.MODULE$.createBrokerConfig$default$17(),
            TestUtils$.MODULE$.createBrokerConfig$default$18(),
            TestUtils$.MODULE$.createBrokerConfig$default$19(),
            TestUtils$.MODULE$.createBrokerConfig$default$20());
        Properties secondBrokerProps = TestUtils$.MODULE$.createBrokerConfig(
            this.brokerId2(),
            this.zkConnect(),
            TestUtils$.MODULE$.createBrokerConfig$default$3(),
            TestUtils$.MODULE$.createBrokerConfig$default$4(),
            TestUtils$.MODULE$.createBrokerConfig$default$5(),
            TestUtils$.MODULE$.createBrokerConfig$default$6(),
            TestUtils$.MODULE$.createBrokerConfig$default$7(),
            TestUtils$.MODULE$.createBrokerConfig$default$8(),
            TestUtils$.MODULE$.createBrokerConfig$default$9(),
            TestUtils$.MODULE$.createBrokerConfig$default$10(),
            TestUtils$.MODULE$.createBrokerConfig$default$11(),
            TestUtils$.MODULE$.createBrokerConfig$default$12(),
            TestUtils$.MODULE$.createBrokerConfig$default$13(),
            TestUtils$.MODULE$.createBrokerConfig$default$14(),
            TestUtils$.MODULE$.createBrokerConfig$default$15(),
            TestUtils$.MODULE$.createBrokerConfig$default$16(),
            TestUtils$.MODULE$.createBrokerConfig$default$17(),
            TestUtils$.MODULE$.createBrokerConfig$default$18(),
            TestUtils$.MODULE$.createBrokerConfig$default$19(),
            TestUtils$.MODULE$.createBrokerConfig$default$20());
        Seq configs = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Properties[]{firstBrokerProps, secondBrokerProps}));

        // Applied to each config: writes "false" under the auto-leader-rebalance property key.
        Function1 disableAutoRebalance = (Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Object apply(Properties props) {
                String key = KafkaConfig$.MODULE$.AutoLeaderRebalanceEnableProp();
                String value = ((Object)BoxesRunTime.boxToBoolean((boolean)false)).toString();
                return props.setProperty(key, value);
            }
        };
        configs.foreach(disableAutoRebalance);

        // Applied to each config: creates a server from it, using the default second argument.
        Function1 createServer = (Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final KafkaServer apply(Properties props) {
                KafkaConfig kafkaConfig = KafkaConfig$.MODULE$.fromProps(props);
                return TestUtils$.MODULE$.createServer(kafkaConfig, TestUtils$.MODULE$.createServer$default$2());
            }
        };
        Seq createdServers = (Seq)configs.map(createServer, Seq$.MODULE$.canBuildFrom());
        this.servers_$eq((Seq<KafkaServer>)createdServers);
    }

    /**
     * Shuts down the servers held by this test and then runs the parent tear-down.
     *
     * <p>The parent tear-down is placed in a {@code finally} block so that it still runs when
     * shutting the servers down throws; otherwise the parent harness would never be torn down
     * for that test.
     */
    @Override
    @After
    public void tearDown() {
        try {
            TestUtils$.MODULE$.shutdownServers(this.servers());
        } finally {
            super.tearDown();
        }
    }

    /**
     * Reads every (broker, epoch) pair through the ZooKeeper client, asserts that the number of
     * pairs equals the number of servers, and then, for each pair, looks up the server whose
     * configured broker id equals the broker's id and asserts that the epoch reported by that
     * server's controller equals the epoch read through the client.
     */
    @Test
    public void testReplicaManagerBrokerEpochMatchesWithZk() {
        Map epochsFromZk = this.zkClient().getAllBrokerAndEpochsInCluster();
        long zkBrokerCount = (long)epochsFromZk.size();
        long serverCount = (long)this.servers().size();
        Assert.assertEquals(zkBrokerCount, serverCount);
        epochsFromZk.foreach((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ BrokerEpochIntegrationTest $outer;

            public final void apply(Tuple2<Broker, Object> x0$1) {
                // A null pair cannot be destructured; mirror the pattern-match failure.
                if (x0$1 == null) {
                    throw new MatchError(x0$1);
                }
                Broker broker = (Broker)x0$1._1();
                long zkEpoch = x0$1._2$mcJ$sp();
                Option matchingServer = this.$outer.servers().find((Function1)new Serializable(this, broker){
                    public static final long serialVersionUID = 0L;
                    private final Broker broker$1;

                    public final boolean apply(KafkaServer candidate) {
                        return candidate.config().brokerId() == this.broker$1.id();
                    }
                    {
                        this.broker$1 = broker$1;
                    }
                });
                Assert.assertTrue((boolean)matchingServer.isDefined());
                KafkaServer server = (KafkaServer)matchingServer.get();
                Assert.assertEquals((long)zkEpoch, (long)server.kafkaController().brokerEpoch());
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
    }

    /**
     * Checks the controller's broker-epoch cache against ZooKeeper three times: initially,
     * after shutting down the server that is not the controller, and after starting that
     * server up again.
     */
    @Test
    public void testControllerBrokerEpochCacheMatchesWithZk() {
        KafkaServer controller = this.getController();
        // First server whose broker id differs from the controller's.
        Option nonControllerServer = this.servers().find((Function1)new Serializable(this, controller){
            public static final long serialVersionUID = 0L;
            private final KafkaServer controller$1;

            public final boolean apply(KafkaServer candidate) {
                int controllerBrokerId = this.controller$1.config().brokerId();
                return candidate.config().brokerId() != controllerBrokerId;
            }
            {
                this.controller$1 = controller$1;
            }
        });
        KafkaServer otherBroker = (KafkaServer)nonControllerServer.get();

        this.checkControllerBrokerEpochsCacheMatchesWithZk(controller.kafkaController().controllerContext());

        otherBroker.shutdown();
        this.checkControllerBrokerEpochsCacheMatchesWithZk(controller.kafkaController().controllerContext());

        otherBroker.startup();
        this.checkControllerBrokerEpochsCacheMatchesWithZk(controller.kafkaController().controllerContext());
    }

    /**
     * Runs the shared control-request scenario with the stale-epoch flag set to {@code false}.
     */
    @Test
    public void testControlRequestWithCorrectBrokerEpoch() {
        final boolean staleEpoch = false;
        testControlRequestWithBrokerEpoch(staleEpoch);
    }

    /**
     * Runs the shared control-request scenario with the stale-epoch flag set to {@code true}.
     */
    @Test
    public void testControlRequestWithStaleBrokerEpoch() {
        final boolean staleEpoch = true;
        testControlRequestWithBrokerEpoch(staleEpoch);
    }

    /*
     * WARNING - void declaration
     */
    private void testControlRequestWithBrokerEpoch(boolean isEpochInRequestStale) {
        ControllerChannelManager controllerChannelManager;
        Metrics metrics;
        block7: {
            TopicPartition tp = new TopicPartition("new-topic", 0);
            TestUtils$.MODULE$.createTopic(this.zkClient(), tp.topic(), (Map<Object, Seq<Object>>)((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{this.brokerId1(), this.brokerId2()})))}))), this.servers());
            int controllerId = 2;
            int controllerEpoch = ((Tuple2)this.zkClient().getControllerEpoch().get())._1$mcI$sp();
            KafkaConfig controllerConfig = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(controllerId, this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
            SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
            ListenerName listenerName = ListenerName.forSecurityProtocol((SecurityProtocol)securityProtocol);
            scala.collection.immutable.Map brokerAndEpochs = ((TraversableOnce)this.servers().map((Function1)new Serializable(this, securityProtocol, listenerName){
                public static final long serialVersionUID = 0L;
                private final SecurityProtocol securityProtocol$1;
                private final ListenerName listenerName$1;

                public final Tuple2<Broker, Object> apply(KafkaServer s) {
                    return new Tuple2((Object)new Broker(s.config().brokerId(), "localhost", TestUtils$.MODULE$.boundPort(s, TestUtils$.MODULE$.boundPort$default$2()), this.listenerName$1, this.securityProtocol$1), (Object)BoxesRunTime.boxToLong((long)s.kafkaController().brokerEpoch()));
                }
                {
                    this.securityProtocol$1 = securityProtocol$1;
                    this.listenerName$1 = listenerName$1;
                }
            }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
            Iterable nodes = (Iterable)brokerAndEpochs.keys().map((Function1)new Serializable(this, listenerName){
                public static final long serialVersionUID = 0L;
                private final ListenerName listenerName$1;

                public final Node apply(Broker x$1) {
                    return x$1.node(this.listenerName$1);
                }
                {
                    this.listenerName$1 = listenerName$1;
                }
            }, scala.collection.Iterable$.MODULE$.canBuildFrom());
            ControllerContext controllerContext = new ControllerContext();
            controllerContext.setLiveBrokerAndEpochs((Map)brokerAndEpochs);
            metrics = new Metrics();
            controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, Time.SYSTEM, metrics, new StateChangeLogger(controllerId, true, (Option)None$.MODULE$), ControllerChannelManager$.MODULE$.$lessinit$greater$default$6());
            controllerChannelManager.startup();
            KafkaServer broker2 = (KafkaServer)this.servers().apply(this.brokerId2());
            long epochInRequest = isEpochInRequestStale ? broker2.kafkaController().brokerEpoch() - 1L : broker2.kafkaController().brokerEpoch();
            try {
                Object object;
                Seq partitionStates = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new LeaderAndIsrRequestData.LeaderAndIsrPartitionState[]{new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setTopicName(tp.topic()).setPartitionIndex(tp.partition()).setControllerEpoch(controllerEpoch).setLeader(this.brokerId2()).setLeaderEpoch(LeaderAndIsr$.MODULE$.initialLeaderEpoch() + 1).setIsr((List)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)((TraversableLike)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{this.brokerId1(), this.brokerId2()}))).map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final Integer apply(int x$1) {
                        return x$1;
                    }
                }, Seq$.MODULE$.canBuildFrom())).asJava()).setZkVersion(LeaderAndIsr$.MODULE$.initialZKVersion()).setReplicas((List)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)((TraversableLike)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{0, 1}))).map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final Integer apply(int x$1) {
                        return x$1;
                    }
                }, Seq$.MODULE$.canBuildFrom())).asJava()).setIsNew(false)}));
                LeaderAndIsrRequest.Builder requestBuilder = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion(), controllerId, controllerEpoch, epochInRequest, (List)JavaConverters$.MODULE$.seqAsJavaListConverter(partitionStates).asJava(), (Collection)JavaConverters$.MODULE$.setAsJavaSetConverter((Set)nodes.toSet()).asJava());
                if (isEpochInRequestStale) {
                    this.sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, (AbstractControlRequest.Builder<? extends AbstractControlRequest>)requestBuilder);
                    object = BoxedUnit.UNIT;
                } else {
                    this.sendAndVerifySuccessfulResponse(controllerChannelManager, (AbstractControlRequest.Builder<? extends AbstractControlRequest>)requestBuilder);
                    object = BoxesRunTime.boxToInteger((int)TestUtils$.MODULE$.waitUntilLeaderIsKnown((Seq<KafkaServer>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{broker2}))), tp, 10000L));
                }
                Seq partitionStates2 = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new UpdateMetadataRequestData.UpdateMetadataPartitionState[]{new UpdateMetadataRequestData.UpdateMetadataPartitionState().setTopicName(tp.topic()).setPartitionIndex(tp.partition()).setControllerEpoch(controllerEpoch).setLeader(this.brokerId2()).setLeaderEpoch(LeaderAndIsr$.MODULE$.initialLeaderEpoch() + 1).setIsr((List)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)((TraversableLike)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{this.brokerId1(), this.brokerId2()}))).map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final Integer apply(int x$1) {
                        return x$1;
                    }
                }, Seq$.MODULE$.canBuildFrom())).asJava()).setZkVersion(LeaderAndIsr$.MODULE$.initialZKVersion()).setReplicas((List)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)((TraversableLike)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{0, 1}))).map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final Integer apply(int x$1) {
                        return x$1;
                    }
                }, Seq$.MODULE$.canBuildFrom())).asJava())}));
                Buffer liveBrokers = ((TraversableOnce)brokerAndEpochs.map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final UpdateMetadataRequestData.UpdateMetadataBroker apply(Tuple2<Broker, Object> x0$2) {
                        Tuple2<Broker, Object> tuple2 = x0$2;
                        if (tuple2 != null) {
                            Broker broker = (Broker)tuple2._1();
                            SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
                            ListenerName listenerName = ListenerName.forSecurityProtocol((SecurityProtocol)securityProtocol);
                            Node node = broker.node(listenerName);
                            Seq endpoints = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new UpdateMetadataRequestData.UpdateMetadataEndpoint[]{new UpdateMetadataRequestData.UpdateMetadataEndpoint().setHost(node.host()).setPort(node.port()).setSecurityProtocol(securityProtocol.id).setListener(listenerName.value())}));
                            UpdateMetadataRequestData.UpdateMetadataBroker updateMetadataBroker = new UpdateMetadataRequestData.UpdateMetadataBroker().setId(broker.id()).setEndpoints((List)JavaConverters$.MODULE$.seqAsJavaListConverter(endpoints).asJava()).setRack((String)broker.rack().orNull(Predef$.MODULE$.$conforms()));
                            return updateMetadataBroker;
                        }
                        throw new MatchError(tuple2);
                    }
                }, Iterable$.MODULE$.canBuildFrom())).toBuffer();
                UpdateMetadataRequest.Builder requestBuilder2 = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion(), controllerId, controllerEpoch, epochInRequest, (List)JavaConverters$.MODULE$.seqAsJavaListConverter(partitionStates2).asJava(), (List)JavaConverters$.MODULE$.bufferAsJavaListConverter(liveBrokers).asJava());
                if (isEpochInRequestStale) {
                    this.sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, (AbstractControlRequest.Builder<? extends AbstractControlRequest>)requestBuilder2);
                } else {
                    this.sendAndVerifySuccessfulResponse(controllerChannelManager, (AbstractControlRequest.Builder<? extends AbstractControlRequest>)requestBuilder2);
                    TestUtils$.MODULE$.waitUntilMetadataIsPropagated((Seq<KafkaServer>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{broker2}))), tp.topic(), tp.partition(), 10000L);
                    Assert.assertEquals((long)this.brokerId2(), (long)((UpdateMetadataRequestData.UpdateMetadataPartitionState)broker2.metadataCache().getPartitionInfo(tp.topic(), tp.partition()).get()).leader());
                }
                StopReplicaRequest.Builder requestBuilder3 = new StopReplicaRequest.Builder(ApiKeys.STOP_REPLICA.latestVersion(), controllerId, controllerEpoch, epochInRequest, true, (Collection)JavaConverters$.MODULE$.setAsJavaSetConverter((Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{tp}))).asJava());
                if (isEpochInRequestStale) {
                    this.sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, (AbstractControlRequest.Builder<? extends AbstractControlRequest>)requestBuilder3);
                    break block7;
                }
                this.sendAndVerifySuccessfulResponse(controllerChannelManager, (AbstractControlRequest.Builder<? extends AbstractControlRequest>)requestBuilder3);
                Assert.assertEquals((Object)HostedPartition.None$.MODULE$, (Object)broker2.replicaManager().getPartition(tp));
            }
            catch (Throwable throwable) {
                void var11_11;
                void var12_12;
                var12_12.shutdown();
                var11_11.close();
                throw throwable;
            }
        }
        controllerChannelManager.shutdown();
        metrics.close();
    }

    /**
     * Waits (via {@code TestUtils.waitUntilControllerElected}, default second argument) for a
     * controller id and returns the first server whose configured broker id equals it.
     *
     * @return the server whose broker id matches the elected controller id
     */
    private KafkaServer getController() {
        int controllerId = TestUtils$.MODULE$.waitUntilControllerElected(this.zkClient(), TestUtils$.MODULE$.waitUntilControllerElected$default$2());
        Object serversWithControllerId = this.servers().filter((Function1)new Serializable(this, controllerId){
            public static final long serialVersionUID = 0L;
            private final int controllerId$1;

            public final boolean apply(KafkaServer candidate) {
                int candidateId = candidate.config().brokerId();
                return candidateId == this.controllerId$1;
            }
            {
                this.controllerId$1 = controllerId$1;
            }
        });
        // head() is evaluated on the filtered sequence, exactly as in a chained call.
        return (KafkaServer)((IterableLike)serversWithControllerId).head();
    }

    /**
     * Reads the (broker, epoch) pairs through the ZooKeeper client once, then polls with
     * {@code TestUtils.waitUntilTrue} (default third and fourth arguments) until the
     * controller context's live broker-id-to-epoch map has the same size and holds, for every
     * broker id, the epoch that was read. The failure message is "Broker epoch mismatches".
     *
     * @param controllerContext the controller context whose cached epochs are compared
     */
    private void checkControllerBrokerEpochsCacheMatchesWithZk(ControllerContext controllerContext) {
        Map brokerAndEpochs = this.zkClient().getAllBrokerAndEpochsInCluster();
        TestUtils$.MODULE$.waitUntilTrue((Function0<Object>)new Serializable(this, controllerContext, brokerAndEpochs){
            public static final long serialVersionUID = 0L;
            private final ControllerContext controllerContext$1;
            private final Map brokerAndEpochs$1;

            public final boolean apply() {
                return this.apply$mcZ$sp();
            }

            public boolean apply$mcZ$sp() {
                // The cache is re-read on every poll; the ZooKeeper snapshot is not.
                Map brokerEpochsInControllerContext = this.controllerContext$1.liveBrokerIdAndEpochs();
                if (this.brokerAndEpochs$1.size() != brokerEpochsInControllerContext.size()) {
                    return false;
                }
                return this.brokerAndEpochs$1.forall((Function1)new Serializable(this, brokerEpochsInControllerContext){
                    public static final long serialVersionUID = 0L;
                    private final Map brokerEpochsInControllerContext$1;

                    public final boolean apply(Tuple2<Broker, Object> x0$3) {
                        // A null pair cannot be destructured; mirror the pattern-match failure.
                        if (x0$3 == null) {
                            throw new MatchError(x0$3);
                        }
                        Broker zkBroker = (Broker)x0$3._1();
                        long zkEpoch = x0$3._2$mcJ$sp();
                        Option cachedEpoch = this.brokerEpochsInControllerContext$1.get((Object)BoxesRunTime.boxToInteger((int)zkBroker.id()));
                        return cachedEpoch.contains((Object)BoxesRunTime.boxToLong((long)zkEpoch));
                    }
                    {
                        this.brokerEpochsInControllerContext$1 = brokerEpochsInControllerContext$1;
                    }
                });
            }
            {
                this.controllerContext$1 = controllerContext$1;
                this.brokerAndEpochs$1 = brokerAndEpochs$1;
            }
        }, (Function0<String>)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Broker epoch mismatches";
            }
        }, TestUtils$.MODULE$.waitUntilTrue$default$3(), TestUtils$.MODULE$.waitUntilTrue$default$4());
    }

    /**
     * Sends the request built by {@code builder} to the broker identified by
     * {@code brokerId2()} and waits until the response callback has recorded a
     * {@code STALE_BROKER_EPOCH} entry in the response's error counts.
     *
     * <p>The flag is written inside the response callback and polled from this method, so it is
     * held in a {@link VolatileBooleanRef}, matching {@code sendAndVerifySuccessfulResponse}.
     * NOTE(review): the callback presumably runs on a thread owned by the channel manager;
     * a plain {@code BooleanRef} gives no visibility guarantee in that case.
     *
     * @param controllerChannelManager the channel manager used to send the request
     * @param builder                  builder of the control request to send
     */
    private void sendAndVerifyStaleBrokerEpochInResponse(ControllerChannelManager controllerChannelManager, AbstractControlRequest.Builder<? extends AbstractControlRequest> builder) {
        VolatileBooleanRef staleBrokerEpochDetected = VolatileBooleanRef.create((boolean)false);
        controllerChannelManager.sendRequest(this.brokerId2(), builder, (Function1)new Serializable(this, staleBrokerEpochDetected){
            public static final long serialVersionUID = 0L;
            private final VolatileBooleanRef staleBrokerEpochDetected$1;

            public final void apply(AbstractResponse response) {
                this.staleBrokerEpochDetected$1.elem = response.errorCounts().containsKey(Errors.STALE_BROKER_EPOCH);
            }
            {
                this.staleBrokerEpochDetected$1 = staleBrokerEpochDetected$1;
            }
        });
        TestUtils$.MODULE$.waitUntilTrue((Function0<Object>)new Serializable(this, staleBrokerEpochDetected){
            public static final long serialVersionUID = 0L;
            private final VolatileBooleanRef staleBrokerEpochDetected$1;

            public final boolean apply() {
                return this.apply$mcZ$sp();
            }

            public boolean apply$mcZ$sp() {
                return this.staleBrokerEpochDetected$1.elem;
            }
            {
                this.staleBrokerEpochDetected$1 = staleBrokerEpochDetected$1;
            }
        }, (Function0<String>)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Broker epoch should be stale";
            }
        }, TestUtils$.MODULE$.waitUntilTrue$default$3(), TestUtils$.MODULE$.waitUntilTrue$default$4());
        Assert.assertTrue((String)"Stale broker epoch not detected by the broker", (boolean)staleBrokerEpochDetected.elem);
    }

    /**
     * Sends the request built by {@code builder} to the broker identified by
     * {@code brokerId2()} and waits until the response callback has recorded success.
     * A response counts as successful when its error counts are empty, or when they contain
     * {@code Errors.NONE} and have exactly one entry.
     *
     * @param controllerChannelManager the channel manager used to send the request
     * @param builder                  builder of the control request to send
     */
    private void sendAndVerifySuccessfulResponse(ControllerChannelManager controllerChannelManager, AbstractControlRequest.Builder<? extends AbstractControlRequest> builder) {
        VolatileBooleanRef succeed = VolatileBooleanRef.create((boolean)false);
        Function1 recordOutcome = (Function1)new Serializable(this, succeed){
            public static final long serialVersionUID = 0L;
            private final VolatileBooleanRef succeed$1;

            public final void apply(AbstractResponse response) {
                if (response.errorCounts().isEmpty()) {
                    this.succeed$1.elem = true;
                } else {
                    this.succeed$1.elem = response.errorCounts().containsKey(Errors.NONE) && response.errorCounts().size() == 1;
                }
            }
            {
                this.succeed$1 = succeed$1;
            }
        };
        controllerChannelManager.sendRequest(this.brokerId2(), builder, recordOutcome);

        Function0<Object> hasSucceeded = (Function0<Object>)new Serializable(this, succeed){
            public static final long serialVersionUID = 0L;
            private final VolatileBooleanRef succeed$1;

            public final boolean apply() {
                return this.apply$mcZ$sp();
            }

            public boolean apply$mcZ$sp() {
                return this.succeed$1.elem;
            }
            {
                this.succeed$1 = succeed$1;
            }
        };
        Function0<String> failureMessage = (Function0<String>)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Should receive response with no errors";
            }
        };
        TestUtils$.MODULE$.waitUntilTrue(hasSucceeded, failureMessage, TestUtils$.MODULE$.waitUntilTrue$default$3(), TestUtils$.MODULE$.waitUntilTrue$default$4());
    }

    /**
     * Creates the test instance and fixes the two broker ids at 0 and 1.
     */
    public BrokerEpochIntegrationTest() {
        brokerId1 = 0;
        brokerId2 = 1;
    }
}

