/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import kafka.server.BrokerToControllerQueueItem;
import kafka.server.BrokerToControllerRequestThread;
import kafka.server.ControllerInformation;
import kafka.server.ControllerNodeProvider;
import kafka.server.ControllerRequestCompletionHandler;
import kafka.server.KafkaConfig;
import kafka.utils.TestUtils;
import kafka.utils.TestUtils$;
import kafka.utils.TestUtils$TestControllerRequestCompletionHandler$;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.message.EnvelopeResponseData;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.EnvelopeRequest;
import org.apache.kafka.common.requests.EnvelopeResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001]4Aa\u0004\t\u0001+!)A\u0004\u0001C\u0001;!)\u0001\u0005\u0001C\u0005C!)Q\u0007\u0001C\u0005m!)q\u0007\u0001C\u0001q!)q\t\u0001C\u0001q!)\u0011\n\u0001C\u0001q!)1\n\u0001C\u0001q!)Q\n\u0001C\u0001q!)q\n\u0001C\u0001q!)\u0011\u000b\u0001C\u0001q!)1\u000b\u0001C\u0001q!)Q\u000b\u0001C\u0001q!)q\u000b\u0001C\u00051\"91\u000eAI\u0001\n\u0013a'a\t\"s_.,'\u000fV8D_:$(o\u001c7mKJ\u0014V-];fgR$\u0006N]3bIR+7\u000f\u001e\u0006\u0003#I\taa]3sm\u0016\u0014(\"A\n\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001A\u0006\t\u0003/ii\u0011\u0001\u0007\u0006\u00023\u0005)1oY1mC&\u00111\u0004\u0007\u0002\u0007\u0003:L(+\u001a4\u0002\rqJg.\u001b;?)\u0005q\u0002CA\u0010\u0001\u001b\u0005\u0001\u0012AD2p]R\u0014x\u000e\u001c7fe&sgm\u001c\u000b\u0003E\u0015\u0002\"aH\u0012\n\u0005\u0011\u0002\"!F\"p]R\u0014x\u000e\u001c7fe&sgm\u001c:nCRLwN\u001c\u0005\u0006M\t\u0001\raJ\u0001\u0005]>$W\rE\u0002\u0018Q)J!!\u000b\r\u0003\r=\u0003H/[8o!\tY3'D\u0001-\u0015\tic&\u0001\u0004d_6lwN\u001c\u0006\u0003'=R!\u0001M\u0019\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005\u0011\u0014aA8sO&\u0011A\u0007\f\u0002\u0005\u001d>$W-A\nf[B$\u0018pQ8oiJ|G\u000e\\3s\u0013:4w.F\u0001#\u0003-\"Xm\u001d;SKR\u0014\u0018\u0010V5nK>,Ho\u00165jY\u0016\u001cuN\u001c;s_2dWM\u001d(pi\u00063\u0018-\u001b7bE2,G#A\u001d\u0011\u0005]Q\u0014BA\u001e\u0019\u0005\u0011)f.\u001b;)\u0005\u0011i\u0004C\u0001 
F\u001b\u0005y$B\u0001!B\u0003\r\t\u0007/\u001b\u0006\u0003\u0005\u000e\u000bqA[;qSR,'O\u0003\u0002Ec\u0005)!.\u001e8ji&\u0011ai\u0010\u0002\u0005)\u0016\u001cH/\u0001\tuKN$(+Z9vKN$8oU3oi\"\u0012Q!P\u0001\u0016i\u0016\u001cHoQ8oiJ|G\u000e\\3s\u0007\"\fgnZ3eQ\t1Q(A\tuKN$hj\u001c;D_:$(o\u001c7mKJD#aB\u001f\u0002UQ,7\u000f^#om\u0016dw\u000e]3SKN\u0004xN\\:f/&$\bNT8u\u0007>tGO]8mY\u0016\u0014XI\u001d:pe\"\u0012\u0001\"P\u0001\u0011i\u0016\u001cHOU3uef$\u0016.\\3pkRD#!C\u001f\u0002=Q,7\u000f^+ogV\u0004\bo\u001c:uK\u00124VM]:j_:D\u0015M\u001c3mS:<\u0007F\u0001\u0006>\u0003\r\"Xm\u001d;BkRDWM\u001c;jG\u0006$\u0018n\u001c8Fq\u000e,\u0007\u000f^5p]\"\u000bg\u000e\u001a7j]\u001eD#aC\u001f\u0002)Q,7\u000f\u001e+ie\u0016\fGMT8u'R\f'\u000f^3eQ\taQ(A\u0005q_2dWK\u001c;jYR!\u0011(\u00170g\u0011\u0015QV\u00021\u0001\\\u00035\u0011X-];fgR$\u0006N]3bIB\u0011q\u0004X\u0005\u0003;B\u0011qD\u0011:pW\u0016\u0014Hk\\\"p]R\u0014x\u000e\u001c7feJ+\u0017/^3tiRC'/Z1e\u0011\u0015yV\u00021\u0001a\u0003%\u0019wN\u001c3ji&|g\u000eE\u0002\u0018C\u000eL!A\u0019\r\u0003\u0013\u0019+hn\u0019;j_:\u0004\u0004CA\fe\u0013\t)\u0007DA\u0004C_>dW-\u00198\t\u000f\u001dl\u0001\u0013!a\u0001Q\u0006QQ.\u0019=SKR\u0014\u0018.Z:\u0011\u0005]I\u0017B\u00016\u0019\u0005\rIe\u000e^\u0001\u0014a>dG.\u00168uS2$C-\u001a4bk2$HeM\u000b\u0002[*\u0012\u0001N\\\u0016\u0002_B\u0011\u0001/^\u0007\u0002c*\u0011!o]\u0001\nk:\u001c\u0007.Z2lK\u0012T!\u0001\u001e\r\u0002\u0015\u0005tgn\u001c;bi&|g.\u0003\u0002wc\n\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3")
public class BrokerToControllerRequestThreadTest {
    /**
     * Builds a {@link ControllerInformation} around the given optional node.
     *
     * <p>The remaining constructor arguments are fixed: a listener name built from the empty
     * string, the {@code PLAINTEXT} security protocol, an empty string and {@code true}.
     *
     * @param node the controller node to wrap; may be an empty option
     * @return a newly constructed controller information instance
     */
    private ControllerInformation controllerInfo(Option<Node> node) {
        ListenerName blankListener = new ListenerName("");
        SecurityProtocol protocol = SecurityProtocol.PLAINTEXT;
        return new ControllerInformation(node, blankListener, protocol, "", true);
    }

    /**
     * Builds controller information that carries no controller node.
     *
     * <p>The empty option is obtained through {@code Option.empty()} rather than by casting the
     * {@code None} singleton: {@code None$} is declared against {@code Option<Nothing$>}, so a
     * direct cast to {@code Option<Node>} is between provably distinct parameterizations and is
     * rejected by the Java compiler.
     *
     * @return controller information wrapping an empty node option
     */
    private ControllerInformation emptyControllerInfo() {
        return this.controllerInfo(Option.<Node>empty());
    }

    /**
     * Checks that a queued request is reported as timed out when the controller provider never
     * supplies a controller node and the retry timeout elapses on the mock clock.
     *
     * <p>The handler's expected response is an empty option, created with {@code Option.empty()}.
     * Casting the {@code None} singleton to {@code Option<AbstractResponse>} is not accepted by
     * the Java compiler because the two parameterizations are provably distinct.
     */
    @Test
    public void testRetryTimeoutWhileControllerNotAvailable() {
        MockTime time = new MockTime();
        TestUtils$ utils = TestUtils$.MODULE$;
        KafkaConfig config = new KafkaConfig((Map)utils.createBrokerConfig(
            1,
            "localhost:2181",
            utils.createBrokerConfig$default$3(), utils.createBrokerConfig$default$4(),
            utils.createBrokerConfig$default$5(), utils.createBrokerConfig$default$6(),
            utils.createBrokerConfig$default$7(), utils.createBrokerConfig$default$8(),
            utils.createBrokerConfig$default$9(), utils.createBrokerConfig$default$10(),
            utils.createBrokerConfig$default$11(), utils.createBrokerConfig$default$12(),
            utils.createBrokerConfig$default$13(), utils.createBrokerConfig$default$14(),
            utils.createBrokerConfig$default$15(), utils.createBrokerConfig$default$16(),
            utils.createBrokerConfig$default$17(), utils.createBrokerConfig$default$18(),
            utils.createBrokerConfig$default$19(), utils.createBrokerConfig$default$20(),
            utils.createBrokerConfig$default$21()));
        Metadata metadata = (Metadata)Mockito.mock(Metadata.class);
        MockClient mockClient = new MockClient((Time)time, metadata);
        ControllerNodeProvider controllerNodeProvider = (ControllerNodeProvider)Mockito.mock(ControllerNodeProvider.class);
        // The provider always answers with "no controller known".
        Mockito.when((Object)controllerNodeProvider.getControllerInfo()).thenReturn((Object)this.emptyControllerInfo());
        int retryTimeoutMs = 30000;
        BrokerToControllerRequestThread testRequestThread = new BrokerToControllerRequestThread((KafkaClient)mockClient, true, (Function1 & Serializable & scala.Serializable)x$1 -> mockClient, new ManualMetadataUpdater(), controllerNodeProvider, config, (Time)time, "", (long)retryTimeoutMs);
        testRequestThread.started_$eq(true);
        TestUtils.TestControllerRequestCompletionHandler completionHandler = new TestUtils.TestControllerRequestCompletionHandler(Option.<AbstractResponse>empty());
        BrokerToControllerQueueItem queueItem = new BrokerToControllerQueueItem(time.milliseconds(), (AbstractRequest.Builder)new MetadataRequest.Builder(new MetadataRequestData()), (ControllerRequestCompletionHandler)completionHandler);
        testRequestThread.enqueue(queueItem);
        testRequestThread.doWork();
        // Before the timeout elapses the request is expected to stay queued.
        Assertions.assertEquals((int)1, (int)testRequestThread.queueSize());
        time.sleep((long)retryTimeoutMs);
        testRequestThread.doWork();
        // After advancing the clock by the full retry timeout the item must be dropped and flagged.
        Assertions.assertEquals((int)0, (int)testRequestThread.queueSize());
        Assertions.assertTrue((boolean)completionHandler.timedOut().get());
    }

    /**
     * Enqueues one metadata request while the provider reports a single active controller and
     * expects the prepared response to be delivered to the completion handler after two calls
     * to {@code doWork()}, leaving the queue empty.
     */
    @Test
    public void testRequestsSent() {
        MockTime clock = new MockTime();
        TestUtils$ utils = TestUtils$.MODULE$;
        KafkaConfig brokerConfig = new KafkaConfig((Map)utils.createBrokerConfig(
            1,
            "localhost:2181",
            utils.createBrokerConfig$default$3(), utils.createBrokerConfig$default$4(),
            utils.createBrokerConfig$default$5(), utils.createBrokerConfig$default$6(),
            utils.createBrokerConfig$default$7(), utils.createBrokerConfig$default$8(),
            utils.createBrokerConfig$default$9(), utils.createBrokerConfig$default$10(),
            utils.createBrokerConfig$default$11(), utils.createBrokerConfig$default$12(),
            utils.createBrokerConfig$default$13(), utils.createBrokerConfig$default$14(),
            utils.createBrokerConfig$default$15(), utils.createBrokerConfig$default$16(),
            utils.createBrokerConfig$default$17(), utils.createBrokerConfig$default$18(),
            utils.createBrokerConfig$default$19(), utils.createBrokerConfig$default$20(),
            utils.createBrokerConfig$default$21()));
        int activeControllerId = 2;
        Metadata clientMetadata = (Metadata)Mockito.mock(Metadata.class);
        MockClient client = new MockClient((Time)clock, clientMetadata);
        ControllerNodeProvider nodeProvider = (ControllerNodeProvider)Mockito.mock(ControllerNodeProvider.class);
        Node controllerNode = new Node(activeControllerId, "host", 1234);
        ControllerInformation activeInfo = this.controllerInfo((Option<Node>)new Some((Object)controllerNode));
        Mockito.when((Object)nodeProvider.getControllerInfo()).thenReturn((Object)activeInfo);
        MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith((int)2, Collections.singletonMap("a", Predef$.MODULE$.int2Integer(2)));
        BrokerToControllerRequestThread requestThread = new BrokerToControllerRequestThread((KafkaClient)client, true, (Function1 & Serializable & scala.Serializable)ignored -> client, new ManualMetadataUpdater(), nodeProvider, brokerConfig, (Time)clock, "", Long.MAX_VALUE);
        requestThread.started_$eq(true);
        client.prepareResponse((AbstractResponse)metadataResponse);
        Option<AbstractResponse> expected = (Option<AbstractResponse>)new Some((Object)metadataResponse);
        TestUtils.TestControllerRequestCompletionHandler handler = new TestUtils.TestControllerRequestCompletionHandler(expected);
        long createdMs = clock.milliseconds();
        MetadataRequest.Builder requestBuilder = new MetadataRequest.Builder(new MetadataRequestData());
        BrokerToControllerQueueItem item = new BrokerToControllerQueueItem(createdMs, (AbstractRequest.Builder)requestBuilder, (ControllerRequestCompletionHandler)handler);
        requestThread.enqueue(item);
        Assertions.assertEquals((int)1, (int)requestThread.queueSize());
        // Two passes are driven before the queue and the handler are inspected.
        requestThread.doWork();
        requestThread.doWork();
        Assertions.assertEquals((int)0, (int)requestThread.queueSize());
        Assertions.assertTrue((boolean)handler.completed().get());
    }

    /**
     * Stubs the provider to report an old controller first and a new controller afterwards,
     * marks the old controller unreachable on the mock client, and expects the request to
     * complete only after further calls to {@code doWork()}.
     */
    @Test
    public void testControllerChanged() {
        MockTime clock = new MockTime();
        TestUtils$ utils = TestUtils$.MODULE$;
        KafkaConfig brokerConfig = new KafkaConfig((Map)utils.createBrokerConfig(
            1,
            "localhost:2181",
            utils.createBrokerConfig$default$3(), utils.createBrokerConfig$default$4(),
            utils.createBrokerConfig$default$5(), utils.createBrokerConfig$default$6(),
            utils.createBrokerConfig$default$7(), utils.createBrokerConfig$default$8(),
            utils.createBrokerConfig$default$9(), utils.createBrokerConfig$default$10(),
            utils.createBrokerConfig$default$11(), utils.createBrokerConfig$default$12(),
            utils.createBrokerConfig$default$13(), utils.createBrokerConfig$default$14(),
            utils.createBrokerConfig$default$15(), utils.createBrokerConfig$default$16(),
            utils.createBrokerConfig$default$17(), utils.createBrokerConfig$default$18(),
            utils.createBrokerConfig$default$19(), utils.createBrokerConfig$default$20(),
            utils.createBrokerConfig$default$21()));
        int previousId = 1;
        int currentId = 2;
        Metadata clientMetadata = (Metadata)Mockito.mock(Metadata.class);
        MockClient client = new MockClient((Time)clock, clientMetadata);
        ControllerNodeProvider nodeProvider = (ControllerNodeProvider)Mockito.mock(ControllerNodeProvider.class);
        Node previousController = new Node(previousId, "host1", 1234);
        Node currentController = new Node(currentId, "host2", 1234);
        // Consecutive provider lookups yield the old controller, then the new one.
        ControllerInformation previousInfo = this.controllerInfo((Option<Node>)new Some((Object)previousController));
        ControllerInformation currentInfo = this.controllerInfo((Option<Node>)new Some((Object)currentController));
        Mockito.when((Object)nodeProvider.getControllerInfo()).thenReturn((Object)previousInfo, (Object[])new ControllerInformation[]{currentInfo});
        MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith((int)3, Collections.singletonMap("a", Predef$.MODULE$.int2Integer(2)));
        BrokerToControllerRequestThread requestThread = new BrokerToControllerRequestThread((KafkaClient)client, true, (Function1 & Serializable & scala.Serializable)ignored -> client, new ManualMetadataUpdater(), nodeProvider, brokerConfig, (Time)clock, "", Long.MAX_VALUE);
        requestThread.started_$eq(true);
        Option<AbstractResponse> expected = (Option<AbstractResponse>)new Some((Object)metadataResponse);
        TestUtils.TestControllerRequestCompletionHandler handler = new TestUtils.TestControllerRequestCompletionHandler(expected);
        long createdMs = clock.milliseconds();
        MetadataRequest.Builder requestBuilder = new MetadataRequest.Builder(new MetadataRequestData());
        BrokerToControllerQueueItem item = new BrokerToControllerQueueItem(createdMs, (AbstractRequest.Builder)requestBuilder, (ControllerRequestCompletionHandler)handler);
        requestThread.enqueue(item);
        client.prepareResponse((AbstractResponse)metadataResponse);
        requestThread.doWork();
        // A single pass must not have completed the request yet.
        Assertions.assertFalse((boolean)handler.completed().get());
        long unreachableUntilMs = clock.milliseconds() + 5000L;
        client.setUnreachable(previousController, unreachableUntilMs);
        for (int pass = 0; pass < 3; pass++) {
            requestThread.doWork();
        }
        Assertions.assertTrue((boolean)handler.completed().get());
    }

    /**
     * Drives a request through a {@code NOT_CONTROLLER} metadata response from the first
     * reported controller and asserts the thread's active controller address at each stage:
     * the old node, then none, then the new node once the expected response is delivered.
     */
    @Test
    public void testNotController() {
        MockTime clock = new MockTime();
        TestUtils$ utils = TestUtils$.MODULE$;
        KafkaConfig brokerConfig = new KafkaConfig((Map)utils.createBrokerConfig(
            1,
            "localhost:2181",
            utils.createBrokerConfig$default$3(), utils.createBrokerConfig$default$4(),
            utils.createBrokerConfig$default$5(), utils.createBrokerConfig$default$6(),
            utils.createBrokerConfig$default$7(), utils.createBrokerConfig$default$8(),
            utils.createBrokerConfig$default$9(), utils.createBrokerConfig$default$10(),
            utils.createBrokerConfig$default$11(), utils.createBrokerConfig$default$12(),
            utils.createBrokerConfig$default$13(), utils.createBrokerConfig$default$14(),
            utils.createBrokerConfig$default$15(), utils.createBrokerConfig$default$16(),
            utils.createBrokerConfig$default$17(), utils.createBrokerConfig$default$18(),
            utils.createBrokerConfig$default$19(), utils.createBrokerConfig$default$20(),
            utils.createBrokerConfig$default$21()));
        int previousId = 1;
        int currentId = 2;
        Metadata clientMetadata = (Metadata)Mockito.mock(Metadata.class);
        MockClient client = new MockClient((Time)clock, clientMetadata);
        ControllerNodeProvider nodeProvider = (ControllerNodeProvider)Mockito.mock(ControllerNodeProvider.class);
        int controllerPort = 1234;
        Node previousController = new Node(previousId, "host1", controllerPort);
        Node currentController = new Node(currentId, "host2", controllerPort);
        // Consecutive provider lookups yield the old controller, then the new one.
        ControllerInformation previousInfo = this.controllerInfo((Option<Node>)new Some((Object)previousController));
        ControllerInformation currentInfo = this.controllerInfo((Option<Node>)new Some((Object)currentController));
        Mockito.when((Object)nodeProvider.getControllerInfo()).thenReturn((Object)previousInfo, (Object[])new ControllerInformation[]{currentInfo});
        MetadataResponse notControllerResponse = RequestTestUtils.metadataUpdateWith((String)"cluster1", (int)2, Collections.singletonMap("a", Errors.NOT_CONTROLLER), Collections.singletonMap("a", Predef$.MODULE$.int2Integer(2)));
        MetadataResponse successResponse = RequestTestUtils.metadataUpdateWith((int)3, Collections.singletonMap("a", Predef$.MODULE$.int2Integer(2)));
        BrokerToControllerRequestThread requestThread = new BrokerToControllerRequestThread((KafkaClient)client, true, (Function1 & Serializable & scala.Serializable)ignored -> client, new ManualMetadataUpdater(), nodeProvider, brokerConfig, (Time)clock, "", Long.MAX_VALUE);
        requestThread.started_$eq(true);
        Option<AbstractResponse> expected = (Option<AbstractResponse>)new Some((Object)successResponse);
        TestUtils.TestControllerRequestCompletionHandler handler = new TestUtils.TestControllerRequestCompletionHandler(expected);
        long createdMs = clock.milliseconds();
        MetadataRequest.Builder requestBuilder = new MetadataRequest.Builder(new MetadataRequestData().setAllowAutoTopicCreation(true));
        BrokerToControllerQueueItem item = new BrokerToControllerQueueItem(createdMs, (AbstractRequest.Builder)requestBuilder, (ControllerRequestCompletionHandler)handler);
        requestThread.enqueue(item);

        // Stage 1: after the first pass the old controller must be the active address.
        requestThread.doWork();
        Node expectedOldNode = new Node(previousId, "host1", controllerPort);
        Assertions.assertEquals((Object)new Some((Object)expectedOldNode), (Object)requestThread.activeControllerAddress());

        // Stage 2: a NOT_CONTROLLER answer to the auto-topic-creation request clears the address.
        client.prepareResponse(sent -> sent instanceof MetadataRequest && ((MetadataRequest)sent).allowAutoTopicCreation(), (AbstractResponse)notControllerResponse);
        requestThread.doWork();
        Assertions.assertEquals((Object)None$.MODULE$, (Object)requestThread.activeControllerAddress());

        // Stage 3: one more pass, then the successful response is queued and consumed.
        requestThread.doWork();
        client.prepareResponse((AbstractResponse)successResponse);
        requestThread.doWork();
        Node expectedNewNode = new Node(currentId, "host2", controllerPort);
        Assertions.assertEquals((Object)new Some((Object)expectedNewNode), (Object)requestThread.activeControllerAddress());
        Assertions.assertTrue((boolean)handler.completed().get());
    }

    /**
     * Sends an envelope request, answers it with an envelope-level {@code NOT_CONTROLLER} error
     * from the first reported controller, and asserts the thread's active controller address at
     * each stage: the old node, then none, then the new node once the expected response arrives.
     *
     * <p>The client-address bytes are encoded with an explicit UTF-8 charset so the request
     * payload does not depend on the JVM's platform default encoding.
     */
    @Test
    public void testEnvelopeResponseWithNotControllerError() {
        MockTime time = new MockTime();
        TestUtils$ utils = TestUtils$.MODULE$;
        KafkaConfig config = new KafkaConfig((Map)utils.createBrokerConfig(
            1,
            "localhost:2181",
            utils.createBrokerConfig$default$3(), utils.createBrokerConfig$default$4(),
            utils.createBrokerConfig$default$5(), utils.createBrokerConfig$default$6(),
            utils.createBrokerConfig$default$7(), utils.createBrokerConfig$default$8(),
            utils.createBrokerConfig$default$9(), utils.createBrokerConfig$default$10(),
            utils.createBrokerConfig$default$11(), utils.createBrokerConfig$default$12(),
            utils.createBrokerConfig$default$13(), utils.createBrokerConfig$default$14(),
            utils.createBrokerConfig$default$15(), utils.createBrokerConfig$default$16(),
            utils.createBrokerConfig$default$17(), utils.createBrokerConfig$default$18(),
            utils.createBrokerConfig$default$19(), utils.createBrokerConfig$default$20(),
            utils.createBrokerConfig$default$21()));
        int oldControllerId = 1;
        int newControllerId = 2;
        Metadata metadata = (Metadata)Mockito.mock(Metadata.class);
        MockClient mockClient = new MockClient((Time)time, metadata);
        // Advertise ENVELOPE with min and max version 0 on the mock client.
        mockClient.setNodeApiVersions(NodeApiVersions.create((short)ApiKeys.ENVELOPE.id, (short)0, (short)0));
        ControllerNodeProvider controllerNodeProvider = (ControllerNodeProvider)Mockito.mock(ControllerNodeProvider.class);
        int port = 1234;
        Node oldController = new Node(oldControllerId, "host1", port);
        Node newController = new Node(newControllerId, "host2", port);
        // Consecutive provider lookups yield the old controller, then the new one.
        Mockito.when((Object)controllerNodeProvider.getControllerInfo()).thenReturn((Object)this.controllerInfo((Option<Node>)new Some((Object)oldController)), (Object[])new ControllerInformation[]{this.controllerInfo((Option<Node>)new Some((Object)newController))});
        EnvelopeResponse envelopeResponseWithNotControllerError = new EnvelopeResponse(new EnvelopeResponseData().setErrorCode(Errors.NOT_CONTROLLER.code()));
        MetadataResponse expectedResponse = RequestTestUtils.metadataUpdateWith((int)3, Collections.singletonMap("a", Predef$.MODULE$.int2Integer(2)));
        BrokerToControllerRequestThread testRequestThread = new BrokerToControllerRequestThread((KafkaClient)mockClient, true, (Function1 & Serializable & scala.Serializable)x$5 -> mockClient, new ManualMetadataUpdater(), controllerNodeProvider, config, (Time)time, "", Long.MAX_VALUE);
        testRequestThread.started_$eq(true);
        TestUtils.TestControllerRequestCompletionHandler completionHandler = new TestUtils.TestControllerRequestCompletionHandler((Option<AbstractResponse>)new Some((Object)expectedResponse));
        KafkaPrincipal kafkaPrincipal = new KafkaPrincipal("User", "principal", true);
        DefaultKafkaPrincipalBuilder kafkaPrincipalBuilder = new DefaultKafkaPrincipalBuilder(null, null);
        // Explicit charset: String.getBytes() without one uses the platform default encoding.
        byte[] clientAddress = "client-address".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        EnvelopeRequest.Builder envelopeRequestBuilder = new EnvelopeRequest.Builder(ByteBuffer.allocate(0), kafkaPrincipalBuilder.serialize(kafkaPrincipal), clientAddress);
        BrokerToControllerQueueItem queueItem = new BrokerToControllerQueueItem(time.milliseconds(), (AbstractRequest.Builder)envelopeRequestBuilder, (ControllerRequestCompletionHandler)completionHandler);
        testRequestThread.enqueue(queueItem);

        // Stage 1: after the first pass the old controller must be the active address.
        testRequestThread.doWork();
        Node oldBrokerNode = new Node(oldControllerId, "host1", port);
        Assertions.assertEquals((Object)new Some((Object)oldBrokerNode), (Object)testRequestThread.activeControllerAddress());

        // Stage 2: the envelope-level NOT_CONTROLLER error clears the active address.
        mockClient.prepareResponse(body -> body instanceof EnvelopeRequest, (AbstractResponse)envelopeResponseWithNotControllerError);
        testRequestThread.doWork();
        Assertions.assertEquals((Object)None$.MODULE$, (Object)testRequestThread.activeControllerAddress());

        // Stage 3: one more pass, then the successful response is queued and consumed.
        testRequestThread.doWork();
        mockClient.prepareResponse((AbstractResponse)expectedResponse);
        testRequestThread.doWork();
        Node newControllerNode = new Node(newControllerId, "host2", port);
        Assertions.assertEquals((Object)new Some((Object)newControllerNode), (Object)testRequestThread.activeControllerAddress());
        Assertions.assertTrue((boolean)completionHandler.completed().get());
    }

    /**
     * Advances the mock clock by the full retry timeout after the first pass, then answers the
     * request with a {@code NOT_CONTROLLER} metadata response and expects the completion
     * handler to be flagged as timed out.
     */
    @Test
    public void testRetryTimeout() {
        MockTime clock = new MockTime();
        TestUtils$ utils = TestUtils$.MODULE$;
        KafkaConfig brokerConfig = new KafkaConfig((Map)utils.createBrokerConfig(
            1,
            "localhost:2181",
            utils.createBrokerConfig$default$3(), utils.createBrokerConfig$default$4(),
            utils.createBrokerConfig$default$5(), utils.createBrokerConfig$default$6(),
            utils.createBrokerConfig$default$7(), utils.createBrokerConfig$default$8(),
            utils.createBrokerConfig$default$9(), utils.createBrokerConfig$default$10(),
            utils.createBrokerConfig$default$11(), utils.createBrokerConfig$default$12(),
            utils.createBrokerConfig$default$13(), utils.createBrokerConfig$default$14(),
            utils.createBrokerConfig$default$15(), utils.createBrokerConfig$default$16(),
            utils.createBrokerConfig$default$17(), utils.createBrokerConfig$default$18(),
            utils.createBrokerConfig$default$19(), utils.createBrokerConfig$default$20(),
            utils.createBrokerConfig$default$21()));
        int onlyControllerId = 1;
        Metadata clientMetadata = (Metadata)Mockito.mock(Metadata.class);
        MockClient client = new MockClient((Time)clock, clientMetadata);
        ControllerNodeProvider nodeProvider = (ControllerNodeProvider)Mockito.mock(ControllerNodeProvider.class);
        Node controllerNode = new Node(onlyControllerId, "host1", 1234);
        ControllerInformation controllerDetails = this.controllerInfo((Option<Node>)new Some((Object)controllerNode));
        Mockito.when((Object)nodeProvider.getControllerInfo()).thenReturn((Object)controllerDetails);
        int timeoutMs = 30000;
        MetadataResponse notControllerResponse = RequestTestUtils.metadataUpdateWith((String)"cluster1", (int)2, Collections.singletonMap("a", Errors.NOT_CONTROLLER), Collections.singletonMap("a", Predef$.MODULE$.int2Integer(2)));
        BrokerToControllerRequestThread requestThread = new BrokerToControllerRequestThread((KafkaClient)client, true, (Function1 & Serializable & scala.Serializable)ignored -> client, new ManualMetadataUpdater(), nodeProvider, brokerConfig, (Time)clock, "", (long)timeoutMs);
        requestThread.started_$eq(true);
        // The handler is built with its constructor's default first argument.
        TestUtils.TestControllerRequestCompletionHandler handler = new TestUtils.TestControllerRequestCompletionHandler(TestUtils$TestControllerRequestCompletionHandler$.MODULE$.$lessinit$greater$default$1());
        long createdMs = clock.milliseconds();
        MetadataRequest.Builder requestBuilder = new MetadataRequest.Builder(new MetadataRequestData().setAllowAutoTopicCreation(true));
        BrokerToControllerQueueItem item = new BrokerToControllerQueueItem(createdMs, (AbstractRequest.Builder)requestBuilder, (ControllerRequestCompletionHandler)handler);
        requestThread.enqueue(item);
        requestThread.doWork();
        // Let the whole retry budget elapse before the error response is made available.
        clock.sleep((long)timeoutMs);
        client.prepareResponse(sent -> sent instanceof MetadataRequest && ((MetadataRequest)sent).allowAutoTopicCreation(), (AbstractResponse)notControllerResponse);
        requestThread.doWork();
        Assertions.assertTrue((boolean)handler.timedOut().get());
    }

    /**
     * Prepares an unsupported-version response for metadata requests on the mock client and
     * expects the completion callback to receive a response whose {@code versionMismatch()} is
     * non-null. A timeout callback fails the test.
     *
     * <p>The completion handler is an anonymous subclass that captures the local
     * {@code callbackResponse} directly. The decompiled form passed synthetic constructor
     * arguments and read an undefined {@code callbackResponse$1} name, which is not valid Java.
     */
    @Test
    public void testUnsupportedVersionHandling() {
        MockTime time = new MockTime();
        TestUtils$ utils = TestUtils$.MODULE$;
        KafkaConfig config = new KafkaConfig((Map)utils.createBrokerConfig(
            1,
            "localhost:2181",
            utils.createBrokerConfig$default$3(), utils.createBrokerConfig$default$4(),
            utils.createBrokerConfig$default$5(), utils.createBrokerConfig$default$6(),
            utils.createBrokerConfig$default$7(), utils.createBrokerConfig$default$8(),
            utils.createBrokerConfig$default$9(), utils.createBrokerConfig$default$10(),
            utils.createBrokerConfig$default$11(), utils.createBrokerConfig$default$12(),
            utils.createBrokerConfig$default$13(), utils.createBrokerConfig$default$14(),
            utils.createBrokerConfig$default$15(), utils.createBrokerConfig$default$16(),
            utils.createBrokerConfig$default$17(), utils.createBrokerConfig$default$18(),
            utils.createBrokerConfig$default$19(), utils.createBrokerConfig$default$20(),
            utils.createBrokerConfig$default$21()));
        int controllerId = 2;
        Metadata metadata = (Metadata)Mockito.mock(Metadata.class);
        MockClient mockClient = new MockClient((Time)time, metadata);
        ControllerNodeProvider controllerNodeProvider = (ControllerNodeProvider)Mockito.mock(ControllerNodeProvider.class);
        Node activeController = new Node(controllerId, "host", 1234);
        Mockito.when((Object)controllerNodeProvider.getControllerInfo()).thenReturn((Object)this.controllerInfo((Option<Node>)new Some((Object)activeController)));
        // Holds whatever response the completion callback receives; stays null until then.
        final AtomicReference<ClientResponse> callbackResponse = new AtomicReference<>();
        ControllerRequestCompletionHandler completionHandler = new ControllerRequestCompletionHandler() {
            @Override
            public void onTimeout() {
                Assertions.fail((String)"Unexpected timeout exception");
            }

            @Override
            public void onComplete(ClientResponse response) {
                callbackResponse.set(response);
            }
        };
        BrokerToControllerQueueItem queueItem = new BrokerToControllerQueueItem(time.milliseconds(), (AbstractRequest.Builder)new MetadataRequest.Builder(new MetadataRequestData()), completionHandler);
        // Null-safe equality: matches only requests whose API key is METADATA.
        mockClient.prepareUnsupportedVersionResponse(request -> ApiKeys.METADATA.equals(request.apiKey()));
        BrokerToControllerRequestThread testRequestThread = new BrokerToControllerRequestThread((KafkaClient)mockClient, true, (Function1 & Serializable & scala.Serializable)x$7 -> mockClient, new ManualMetadataUpdater(), controllerNodeProvider, config, (Time)time, "", Long.MAX_VALUE);
        testRequestThread.started_$eq(true);
        testRequestThread.enqueue(queueItem);
        // Inlined polling loop: run doWork() until the predicate holds or the retry budget is spent.
        int maxRetries = this.pollUntil$default$3();
        int tries = 0;
        do {
            testRequestThread.doWork();
        } while (!BrokerToControllerRequestThreadTest.$anonfun$testUnsupportedVersionHandling$3(callbackResponse) && ++tries < maxRetries);
        if (!BrokerToControllerRequestThreadTest.$anonfun$testUnsupportedVersionHandling$3(callbackResponse)) {
            Assertions.fail((String)("Condition failed to be met after polling " + tries + " times"));
        }
        Assertions.assertNotNull((Object)callbackResponse.get().versionMismatch());
    }

    /**
     * Registers a pending authentication error for the active controller on the mock client and
     * expects the completion callback to receive a response whose
     * {@code authenticationException()} is non-null. A timeout callback fails the test.
     *
     * <p>The completion handler is an anonymous subclass that captures the local
     * {@code callbackResponse} directly. The decompiled form passed synthetic constructor
     * arguments and read an undefined {@code callbackResponse$2} name, which is not valid Java.
     */
    @Test
    public void testAuthenticationExceptionHandling() {
        MockTime time = new MockTime();
        TestUtils$ utils = TestUtils$.MODULE$;
        KafkaConfig config = new KafkaConfig((Map)utils.createBrokerConfig(
            1,
            "localhost:2181",
            utils.createBrokerConfig$default$3(), utils.createBrokerConfig$default$4(),
            utils.createBrokerConfig$default$5(), utils.createBrokerConfig$default$6(),
            utils.createBrokerConfig$default$7(), utils.createBrokerConfig$default$8(),
            utils.createBrokerConfig$default$9(), utils.createBrokerConfig$default$10(),
            utils.createBrokerConfig$default$11(), utils.createBrokerConfig$default$12(),
            utils.createBrokerConfig$default$13(), utils.createBrokerConfig$default$14(),
            utils.createBrokerConfig$default$15(), utils.createBrokerConfig$default$16(),
            utils.createBrokerConfig$default$17(), utils.createBrokerConfig$default$18(),
            utils.createBrokerConfig$default$19(), utils.createBrokerConfig$default$20(),
            utils.createBrokerConfig$default$21()));
        int controllerId = 2;
        Metadata metadata = (Metadata)Mockito.mock(Metadata.class);
        MockClient mockClient = new MockClient((Time)time, metadata);
        ControllerNodeProvider controllerNodeProvider = (ControllerNodeProvider)Mockito.mock(ControllerNodeProvider.class);
        Node activeController = new Node(controllerId, "host", 1234);
        Mockito.when((Object)controllerNodeProvider.getControllerInfo()).thenReturn((Object)this.controllerInfo((Option<Node>)new Some((Object)activeController)));
        // Holds whatever response the completion callback receives; stays null until then.
        final AtomicReference<ClientResponse> callbackResponse = new AtomicReference<>();
        ControllerRequestCompletionHandler completionHandler = new ControllerRequestCompletionHandler() {
            @Override
            public void onTimeout() {
                Assertions.fail((String)"Unexpected timeout exception");
            }

            @Override
            public void onComplete(ClientResponse response) {
                callbackResponse.set(response);
            }
        };
        BrokerToControllerQueueItem queueItem = new BrokerToControllerQueueItem(time.milliseconds(), (AbstractRequest.Builder)new MetadataRequest.Builder(new MetadataRequestData()), completionHandler);
        mockClient.createPendingAuthenticationError(activeController, 50L);
        BrokerToControllerRequestThread testRequestThread = new BrokerToControllerRequestThread((KafkaClient)mockClient, true, (Function1 & Serializable & scala.Serializable)x$8 -> mockClient, new ManualMetadataUpdater(), controllerNodeProvider, config, (Time)time, "", Long.MAX_VALUE);
        testRequestThread.started_$eq(true);
        testRequestThread.enqueue(queueItem);
        // Inlined polling loop: run doWork() until the predicate holds or the retry budget is spent.
        int maxRetries = this.pollUntil$default$3();
        int tries = 0;
        do {
            testRequestThread.doWork();
        } while (!BrokerToControllerRequestThreadTest.$anonfun$testAuthenticationExceptionHandling$2(callbackResponse) && ++tries < maxRetries);
        if (!BrokerToControllerRequestThreadTest.$anonfun$testAuthenticationExceptionHandling$2(callbackResponse)) {
            Assertions.fail((String)("Condition failed to be met after polling " + tries + " times"));
        }
        Assertions.assertNotNull((Object)callbackResponse.get().authenticationException());
    }

    /**
     * Checks that a request cannot be enqueued on a request thread that was never marked
     * as started.
     *
     * <p>The thread is constructed but {@code started_$eq(true)} is never called. The
     * {@code enqueue} call is expected to throw {@link IllegalStateException}, and the
     * queue size must be zero afterwards.
     */
    @Test
    public void testThreadNotStarted() {
        MockTime clock = new MockTime();
        Map brokerProps = (Map)TestUtils$.MODULE$.createBrokerConfig(
                1,
                "localhost:2181",
                TestUtils$.MODULE$.createBrokerConfig$default$3(),
                TestUtils$.MODULE$.createBrokerConfig$default$4(),
                TestUtils$.MODULE$.createBrokerConfig$default$5(),
                TestUtils$.MODULE$.createBrokerConfig$default$6(),
                TestUtils$.MODULE$.createBrokerConfig$default$7(),
                TestUtils$.MODULE$.createBrokerConfig$default$8(),
                TestUtils$.MODULE$.createBrokerConfig$default$9(),
                TestUtils$.MODULE$.createBrokerConfig$default$10(),
                TestUtils$.MODULE$.createBrokerConfig$default$11(),
                TestUtils$.MODULE$.createBrokerConfig$default$12(),
                TestUtils$.MODULE$.createBrokerConfig$default$13(),
                TestUtils$.MODULE$.createBrokerConfig$default$14(),
                TestUtils$.MODULE$.createBrokerConfig$default$15(),
                TestUtils$.MODULE$.createBrokerConfig$default$16(),
                TestUtils$.MODULE$.createBrokerConfig$default$17(),
                TestUtils$.MODULE$.createBrokerConfig$default$18(),
                TestUtils$.MODULE$.createBrokerConfig$default$19(),
                TestUtils$.MODULE$.createBrokerConfig$default$20(),
                TestUtils$.MODULE$.createBrokerConfig$default$21());
        KafkaConfig brokerConfig = new KafkaConfig(brokerProps);

        Metadata mockedMetadata = (Metadata)Mockito.mock(Metadata.class);
        MockClient client = new MockClient((Time)clock, mockedMetadata);

        // The node provider is stubbed to return the empty controller info.
        ControllerNodeProvider nodeProvider = (ControllerNodeProvider)Mockito.mock(ControllerNodeProvider.class);
        Mockito.when((Object)nodeProvider.getControllerInfo()).thenReturn((Object)this.emptyControllerInfo());

        // Deliberately not followed by started_$eq(true).
        BrokerToControllerRequestThread requestThread = new BrokerToControllerRequestThread((KafkaClient)client, true, (Function1 & Serializable & scala.Serializable)x$9 -> client, new ManualMetadataUpdater(), nodeProvider, brokerConfig, (Time)clock, "", Long.MAX_VALUE);

        TestUtils.TestControllerRequestCompletionHandler handler = new TestUtils.TestControllerRequestCompletionHandler((Option<AbstractResponse>)None$.MODULE$);
        long createdAtMs = clock.milliseconds();
        AbstractRequest.Builder requestBuilder = (AbstractRequest.Builder)new MetadataRequest.Builder(new MetadataRequestData());
        BrokerToControllerQueueItem pendingItem = new BrokerToControllerQueueItem(createdAtMs, requestBuilder, (ControllerRequestCompletionHandler)handler);

        Assertions.assertThrows(IllegalStateException.class, () -> requestThread.enqueue(pendingItem));
        Assertions.assertEquals((int)0, (int)requestThread.queueSize());
    }

    /**
     * Drives {@code requestThread.doWork()} until {@code condition} holds or the retry
     * budget is spent, failing the test if the condition is still false afterwards.
     *
     * <p>{@code doWork()} always runs at least once. The condition is checked after each
     * run and once more after the loop ends.
     *
     * @param requestThread the thread whose {@code doWork()} is invoked on each iteration
     * @param condition     boolean-valued function checked after each iteration
     * @param maxRetries    upper bound on the number of unsuccessful iterations
     */
    private void pollUntil(BrokerToControllerRequestThread requestThread, Function0<Object> condition, int maxRetries) {
        int attempts = 0;
        while (true) {
            requestThread.doWork();
            if (condition.apply$mcZ$sp()) {
                break;
            }
            attempts++;
            if (attempts >= maxRetries) {
                break;
            }
        }
        if (condition.apply$mcZ$sp()) {
            return;
        }
        Assertions.fail("Condition failed to be met after polling " + attempts + " times");
    }

    /**
     * Returns the constant 10.
     *
     * <p>{@code testAuthenticationExceptionHandling} reads this value as the retry limit
     * of its polling loop. NOTE(review): the name follows the Scala compiler's convention
     * for the default value of the third parameter of {@code pollUntil}
     * ({@code maxRetries}) - confirm against the original Scala source.
     *
     * @return 10
     */
    private int pollUntil$default$3() {
        return 10;
    }

    /**
     * Reports whether the given reference currently holds a non-null value.
     *
     * <p>NOTE(review): judging by its name this is the compiler-generated polling
     * condition of {@code testUnsupportedVersionHandling}; the call site is not visible
     * here.
     *
     * @param callbackResponse$1 reference to inspect
     * @return {@code true} if {@code callbackResponse$1.get()} is not {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$testUnsupportedVersionHandling$3(AtomicReference callbackResponse$1) {
        final Object received = callbackResponse$1.get();
        final boolean responseArrived = received != null;
        return responseArrived;
    }

    /**
     * Reports whether the given reference currently holds a non-null value.
     *
     * <p>{@code testAuthenticationExceptionHandling} uses this as the exit condition of
     * its polling loop: it becomes true once the completion handler has stored a response.
     *
     * @param callbackResponse$2 reference to inspect
     * @return {@code true} if {@code callbackResponse$2.get()} is not {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$testAuthenticationExceptionHandling$2(AtomicReference callbackResponse$2) {
        final Object received = callbackResponse$2.get();
        final boolean responseArrived = received != null;
        return responseArrived;
    }
}

