/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.File;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import kafka.server.BrokerToControllerQueueItem;
import kafka.server.BrokerToControllerRequestThread;
import kafka.server.BrokerToControllerRequestThreadTest$TestRequestCompletionHandler$;
import kafka.server.ControllerNodeProvider;
import kafka.server.ControllerRequestCompletionHandler;
import kafka.server.KafkaConfig;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.message.EnvelopeResponseData;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.EnvelopeRequest;
import org.apache.kafka.common.requests.EnvelopeResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

@ScalaSignature(bytes="\u0006\u0005\u0005Ec\u0001\u0002\u000e\u001c\u0001\u0001BQa\n\u0001\u0005\u0002!BQa\u000b\u0001\u0005\u00021BQ!\u0010\u0001\u0005\u00021BQa\u0010\u0001\u0005\u00021BQ!\u0011\u0001\u0005\u00021BQa\u0011\u0001\u0005\u00021BQ!\u0012\u0001\u0005\u00021BQa\u0012\u0001\u0005\u00021BQ!\u0013\u0001\u0005\u00021BQa\u0013\u0001\u0005\u00021BQ!\u0014\u0001\u0005\n9Cq!\u0019\u0001\u0012\u0002\u0013%!M\u0002\u0003n\u0001\u0001q\u0007\u0002\u0003:\u000e\u0005\u0003\u0005\u000b\u0011B:\t\r\u001djA\u0011AA\u0002\u0011%\tY!\u0004b\u0001\n\u0003\ti\u0001\u0003\u0005\u0002(5\u0001\u000b\u0011BA\b\u0011%\tI#\u0004b\u0001\n\u0003\ti\u0001\u0003\u0005\u0002,5\u0001\u000b\u0011BA\b\u0011\u001d\ti#\u0004C!\u0003_Aa!!\u0011\u000e\t\u0003bs!CA\"\u0001\u0005\u0005\t\u0012AA#\r!i\u0007!!A\t\u0002\u0005\u001d\u0003BB\u0014\u0018\t\u0003\tI\u0005C\u0005\u0002L]\t\n\u0011\"\u0001\u0002N\t\u0019#I]8lKJ$vnQ8oiJ|G\u000e\\3s%\u0016\fX/Z:u)\"\u0014X-\u00193UKN$(B\u0001\u000f\u001e\u0003\u0019\u0019XM\u001d<fe*\ta$A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001\t\u0003C\u0001\u0012&\u001b\u0005\u0019#\"\u0001\u0013\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0019\u001a#AB!osJ+g-\u0001\u0004=S:LGO\u0010\u000b\u0002SA\u0011!\u0006A\u0007\u00027\u0005YC/Z:u%\u0016$(/\u001f+j[\u0016|W\u000f^,iS2,7i\u001c8ue>dG.\u001a:O_R\fe/Y5mC\ndW\rF\u0001.!\t\u0011c&\u0003\u00020G\t!QK\\5uQ\t\u0011\u0011\u0007\u0005\u00023w5\t1G\u0003\u00025k\u0005\u0019\u0011\r]5\u000b\u0005Y:\u0014a\u00026va&$XM\u001d\u0006\u0003qe\nQA[;oSRT\u0011AO\u0001\u0004_J<\u0017B\u0001\u001f4\u0005\u0011!Vm\u001d;\u0002!Q,7\u000f\u001e*fcV,7\u000f^:TK:$\bFA\u00022\u0003U!Xm\u001d;D_:$(o\u001c7mKJ\u001c\u0005.\u00198hK\u0012D#\u0001B\u0019\u0002#Q,7\u000f\u001e(pi\u000e{g\u000e\u001e:pY2,'\u000f\u000b\u0002\u0006c\u0005QC/Z:u\u000b:4X\r\\8qKJ+7\u000f]8og\u0016<\u0016\u000e\u001e5O_R\u001cuN\u001c;s_2dWM]#se>\u0014\bF\u0001\u00042\u0003A!Xm\u001d;SKR\u0014\u0018\u0010V5nK>,H\u000f\u000b\u0002\bc\u0005qB/Z:u+:\u001cX\u000f\u001d9peR,GMV3sg&|g\u000eS1oI2Lgn\u001a\u0015\u0003\u0011E\n1\u0005^3ti\u0006+H\u000f[3oi&\u001c\u0017\r^5p]\u0016C8-\u001a9uS>t\u0007*\u00198eY&tw\r\u000b\u0002\nc\u0005!B/Z:u)\"\u0014X-\u00193O_R\u001cF/\u0019:uK\u0012D#AC\u0019\u0002\u0013A|G\u000e\\+oi&dG\u0003B\u0017P)rCQ\u0001U\u0006A\u0002E\u000bQB]3rk\u0016\u001cH\u000f\u00165sK\u0006$\u0007C\u0001\u0016S\u0013\t\u00196DA\u0010Ce>\\WM\u001d+p\u0007>tGO]8mY\u0016\u0014(+Z9vKN$H\u000b\u001b:fC\u0012DQ!V\u0006A\u0002Y\u000b\u0011bY8oI&$\u0018n\u001c8\u0011\u0007\t:\u0016,\u0003\u0002YG\tIa)\u001e8di&|g\u000e\r\t\u0003EiK!aW\u0012\u0003\u000f\t{w\u000e\\3b]\"9Ql\u0003I\u0001\u0002\u0004q\u0016AC7bqJ+GO]5fgB\u0011!eX\u0005\u0003A\u000e\u00121!\u00138u\u0003M\u0001x\u000e\u001c7V]RLG\u000e\n3fM\u0006,H\u000e\u001e\u00134+\u0005\u0019'F\u00010eW\u0005)\u0007C\u00014l\u001b\u00059'B\u00015j\u0003%)hn\u00195fG.,GM\u0003\u0002kG\u0005Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\u00051<'!E;oG\",7m[3e-\u0006\u0014\u0018.\u00198dK\naB+Z:u%\u0016\fX/Z:u\u0007>l\u0007\u000f\\3uS>t\u0007*\u00198eY\u0016\u00148CA\u0007p!\tQ\u0003/\u0003\u0002r7\t\u00113i\u001c8ue>dG.\u001a:SKF,Xm\u001d;D_6\u0004H.\u001a;j_:D\u0015M\u001c3mKJ\f\u0001#\u001a=qK\u000e$X\r\u001a*fgB|gn]3\u0011\u0007\t\"h/\u0003\u0002vG\t1q\n\u001d;j_:\u0004\"a^@\u000e\u0003aT!!\u001f>\u0002\u0011I,\u0017/^3tiNT!a\u001f?\u0002\r\r|W.\\8o\u0015\tqRP\u0003\u0002\u007fs\u00051\u0011\r]1dQ\u0016L1!!\u0001y\u0005AiU\r^1eCR\f'+Z:q_:\u001cX\r\u0006\u0003\u0002\u0006\u0005%\u0001cAA\u0004\u001b5\t\u0001\u0001C\u0004s\u001fA\u0005\t\u0019A:\u0002\u0013\r|W\u000e\u001d7fi\u0016$WCAA\b!\u0011\t\t\"a\t\u000e\u0005\u0005M!\u0002BA\u000b\u0003/\ta!\u0019;p[&\u001c'\u0002BA\r\u00037\t!bY8oGV\u0014(/\u001a8u\u0015\u0011\ti\"a\b\u0002\tU$\u0018\u000e\u001c\u0006\u0003\u0003C\tAA[1wC&!\u0011QEA\n\u00055\tEo\\7jG\n{w\u000e\\3b]\u0006Q1m\\7qY\u0016$X\r\u001a\u0011\u0002\u0011QLW.\u001a3PkR\f\u0011\u0002^5nK\u0012|U\u000f\u001e\u0011\u0002\u0015=t7i\\7qY\u0016$X\rF\u0002.\u0003cAq!a\r\u0015\u0001\u0004\t)$\u0001\u0005sKN\u0004xN\\:f!\u0011\t9$!\u0010\u000e\u0005\u0005e\"bAA\u001ey\u000691\r\\5f]R\u001c\u0018\u0002BA \u0003s\u0011ab\u00117jK:$(+Z:q_:\u001cX-A\u0005p]RKW.Z8vi\u0006aB+Z:u%\u0016\fX/Z:u\u0007>l\u0007\u000f\\3uS>t\u0007*\u00198eY\u0016\u0014\bcAA\u0004/M\u0011q#\t\u000b\u0003\u0003\u000b\n1\u0004\n7fgNLg.\u001b;%OJ,\u0017\r^3sI\u0011,g-Y;mi\u0012\nTCAA(U\t\u0019H\r")
public class BrokerToControllerRequestThreadTest {
    private volatile BrokerToControllerRequestThreadTest$TestRequestCompletionHandler$ TestRequestCompletionHandler$module;

    public BrokerToControllerRequestThreadTest$TestRequestCompletionHandler$ TestRequestCompletionHandler() {
        if (this.TestRequestCompletionHandler$module == null) {
            this.TestRequestCompletionHandler$lzycompute$1();
        }
        return this.TestRequestCompletionHandler$module;
    }

    @Test
    public void testRetryTimeoutWhileControllerNotAvailable() {
        MockTime time = new MockTime();
        KafkaConfig config = new KafkaConfig((Map)TestUtils$.MODULE$.createBrokerConfig(1, "localhost:2181", true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1));
        Metadata metadata = (Metadata)Mockito.mock(Metadata.class);
        MockClient mockClient = new MockClient((Time)time, metadata);
        ControllerNodeProvider controllerNodeProvider = (ControllerNodeProvider)Mockito.mock(ControllerNodeProvider.class);
        Mockito.when((Object)controllerNodeProvider.get()).thenReturn((Object)None$.MODULE$);
        int retryTimeoutMs = 30000;
        BrokerToControllerRequestThread testRequestThread = new BrokerToControllerRequestThread((KafkaClient)mockClient, new ManualMetadataUpdater(), controllerNodeProvider, config, (Time)time, "", (long)retryTimeoutMs);
        testRequestThread.started_$eq(true);
        TestRequestCompletionHandler completionHandler = new TestRequestCompletionHandler(this, (Option<MetadataResponse>)None$.MODULE$);
        BrokerToControllerQueueItem queueItem = new BrokerToControllerQueueItem(time.milliseconds(), (AbstractRequest.Builder)new MetadataRequest.Builder(new MetadataRequestData()), (ControllerRequestCompletionHandler)completionHandler);
        testRequestThread.enqueue(queueItem);
        testRequestThread.doWork();
        Assertions.assertEquals((int)1, (int)testRequestThread.queueSize());
        time.sleep((long)retryTimeoutMs);
        testRequestThread.doWork();
        Assertions.assertEquals((int)0, (int)testRequestThread.queueSize());
        Assertions.assertTrue((boolean)completionHandler.timedOut().get());
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testRequestsSent() {
        void metadataUpdateWith_topicPartitionCounts;
        MetadataResponse metadataResponse;
        MockTime time = new MockTime();
        KafkaConfig config = new KafkaConfig((Map)TestUtils$.MODULE$.createBrokerConfig(1, "localhost:2181", true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1));
        int controllerId = 2;
        Metadata metadata = (Metadata)Mockito.mock(Metadata.class);
        MockClient mockClient = new MockClient((Time)time, metadata);
        Node activeController = new Node(controllerId, "host", 1234);
        ControllerNodeProvider controllerNodeProvider = (ControllerNodeProvider)Mockito.mock(ControllerNodeProvider.class);
        Mockito.when((Object)controllerNodeProvider.get()).thenReturn((Object)new Some((Object)activeController));
        Map<String, Integer> map = Collections.singletonMap("a", Predef$.MODULE$.int2Integer(2));
        int metadataUpdateWith_numNodes = 2;
        MetadataResponse metadataResponse2 = metadataResponse = RequestTestUtils.metadataUpdateWith((String)"kafka-cluster", (int)metadataUpdateWith_numNodes, (Map)metadataUpdateWith_topicPartitionCounts);
        map = null;
        metadataResponse = null;
        MetadataResponse expectedResponse = metadataResponse2;
        BrokerToControllerRequestThread testRequestThread = new BrokerToControllerRequestThread((KafkaClient)mockClient, new ManualMetadataUpdater(), controllerNodeProvider, config, (Time)time, "", Long.MAX_VALUE);
        testRequestThread.started_$eq(true);
        mockClient.prepareResponse((AbstractResponse)expectedResponse);
        TestRequestCompletionHandler completionHandler = new TestRequestCompletionHandler(this, (Option<MetadataResponse>)new Some((Object)expectedResponse));
        BrokerToControllerQueueItem queueItem = new BrokerToControllerQueueItem(time.milliseconds(), (AbstractRequest.Builder)new MetadataRequest.Builder(new MetadataRequestData()), (ControllerRequestCompletionHandler)completionHandler);
        testRequestThread.enqueue(queueItem);
        Assertions.assertEquals((int)1, (int)testRequestThread.queueSize());
        testRequestThread.doWork();
        testRequestThread.doWork();
        Assertions.assertEquals((int)0, (int)testRequestThread.queueSize());
        Assertions.assertTrue((boolean)completionHandler.completed().get());
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testControllerChanged() {
        void metadataUpdateWith_topicPartitionCounts;
        MetadataResponse metadataResponse;
        MockTime time = new MockTime();
        KafkaConfig config = new KafkaConfig((Map)TestUtils$.MODULE$.createBrokerConfig(1, "localhost:2181", true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1));
        int oldControllerId = 1;
        int newControllerId = 2;
        Metadata metadata = (Metadata)Mockito.mock(Metadata.class);
        MockClient mockClient = new MockClient((Time)time, metadata);
        ControllerNodeProvider controllerNodeProvider = (ControllerNodeProvider)Mockito.mock(ControllerNodeProvider.class);
        Node oldController = new Node(oldControllerId, "host1", 1234);
        Node newController = new Node(newControllerId, "host2", 1234);
        Mockito.when((Object)controllerNodeProvider.get()).thenReturn((Object)new Some((Object)oldController), (Object[])new Option[]{new Some((Object)newController)});
        Map<String, Integer> map = Collections.singletonMap("a", Predef$.MODULE$.int2Integer(2));
        int metadataUpdateWith_numNodes = 3;
        MetadataResponse metadataResponse2 = metadataResponse = RequestTestUtils.metadataUpdateWith((String)"kafka-cluster", (int)metadataUpdateWith_numNodes, (Map)metadataUpdateWith_topicPartitionCounts);
        map = null;
        metadataResponse = null;
        MetadataResponse expectedResponse = metadataResponse2;
        BrokerToControllerRequestThread testRequestThread = new BrokerToControllerRequestThread((KafkaClient)mockClient, new ManualMetadataUpdater(), controllerNodeProvider, config, (Time)time, "", Long.MAX_VALUE);
        testRequestThread.started_$eq(true);
        TestRequestCompletionHandler completionHandler = new TestRequestCompletionHandler(this, (Option<MetadataResponse>)new Some((Object)expectedResponse));
        BrokerToControllerQueueItem queueItem = new BrokerToControllerQueueItem(time.milliseconds(), (AbstractRequest.Builder)new MetadataRequest.Builder(new MetadataRequestData()), (ControllerRequestCompletionHandler)completionHandler);
        testRequestThread.enqueue(queueItem);
        mockClient.prepareResponse((AbstractResponse)expectedResponse);
        testRequestThread.doWork();
        Assertions.assertFalse((boolean)completionHandler.completed().get());
        mockClient.setUnreachable(oldController, time.milliseconds() + 5000L);
        testRequestThread.doWork();
        testRequestThread.doWork();
        testRequestThread.doWork();
        Assertions.assertTrue((boolean)completionHandler.completed().get());
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testNotController() {
        void metadataUpdateWith_topicPartitionCounts;
        MetadataResponse metadataResponse;
        MockTime time = new MockTime();
        KafkaConfig config = new KafkaConfig((Map)TestUtils$.MODULE$.createBrokerConfig(1, "localhost:2181", true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1));
        int oldControllerId = 1;
        int newControllerId = 2;
        Metadata metadata = (Metadata)Mockito.mock(Metadata.class);
        MockClient mockClient = new MockClient((Time)time, metadata);
        ControllerNodeProvider controllerNodeProvider = (ControllerNodeProvider)Mockito.mock(ControllerNodeProvider.class);
        int port = 1234;
        Node oldController = new Node(oldControllerId, "host1", port);
        Node newController = new Node(newControllerId, "host2", port);
        Mockito.when((Object)controllerNodeProvider.get()).thenReturn((Object)new Some((Object)oldController), (Object[])new Option[]{new Some((Object)newController)});
        MetadataResponse responseWithNotControllerError = RequestTestUtils.metadataUpdateWith((String)"cluster1", (int)2, Collections.singletonMap("a", Errors.NOT_CONTROLLER), Collections.singletonMap("a", Predef$.MODULE$.int2Integer(2)));
        Map<String, Integer> map = Collections.singletonMap("a", Predef$.MODULE$.int2Integer(2));
        int metadataUpdateWith_numNodes = 3;
        MetadataResponse metadataResponse2 = metadataResponse = RequestTestUtils.metadataUpdateWith((String)"kafka-cluster", (int)metadataUpdateWith_numNodes, (Map)metadataUpdateWith_topicPartitionCounts);
        map = null;
        metadataResponse = null;
        MetadataResponse expectedResponse = metadataResponse2;
        BrokerToControllerRequestThread testRequestThread = new BrokerToControllerRequestThread((KafkaClient)mockClient, new ManualMetadataUpdater(), controllerNodeProvider, config, (Time)time, "", Long.MAX_VALUE);
        testRequestThread.started_$eq(true);
        TestRequestCompletionHandler completionHandler = new TestRequestCompletionHandler(this, (Option<MetadataResponse>)new Some((Object)expectedResponse));
        BrokerToControllerQueueItem queueItem = new BrokerToControllerQueueItem(time.milliseconds(), (AbstractRequest.Builder)new MetadataRequest.Builder(new MetadataRequestData().setAllowAutoTopicCreation(true)), (ControllerRequestCompletionHandler)completionHandler);
        testRequestThread.enqueue(queueItem);
        testRequestThread.doWork();
        Node oldBrokerNode = new Node(oldControllerId, "host1", port);
        Assertions.assertEquals((Object)new Some((Object)oldBrokerNode), (Object)testRequestThread.activeControllerAddress());
        mockClient.prepareResponse(body -> body instanceof MetadataRequest && ((MetadataRequest)body).allowAutoTopicCreation(), (AbstractResponse)responseWithNotControllerError);
        testRequestThread.doWork();
        Assertions.assertEquals((Object)None$.MODULE$, (Object)testRequestThread.activeControllerAddress());
        testRequestThread.doWork();
        mockClient.prepareResponse((AbstractResponse)expectedResponse);
        testRequestThread.doWork();
        Node newControllerNode = new Node(newControllerId, "host2", port);
        Assertions.assertEquals((Object)new Some((Object)newControllerNode), (Object)testRequestThread.activeControllerAddress());
        Assertions.assertTrue((boolean)completionHandler.completed().get());
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testEnvelopeResponseWithNotControllerError() {
        void metadataUpdateWith_topicPartitionCounts;
        MetadataResponse metadataResponse;
        MockTime time = new MockTime();
        KafkaConfig config = new KafkaConfig((Map)TestUtils$.MODULE$.createBrokerConfig(1, "localhost:2181", true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1));
        int oldControllerId = 1;
        int newControllerId = 2;
        Metadata metadata = (Metadata)Mockito.mock(Metadata.class);
        MockClient mockClient = new MockClient((Time)time, metadata);
        mockClient.setNodeApiVersions(NodeApiVersions.create((short)ApiKeys.ENVELOPE.id, (short)((short)0), (short)((short)0)));
        ControllerNodeProvider controllerNodeProvider = (ControllerNodeProvider)Mockito.mock(ControllerNodeProvider.class);
        int port = 1234;
        Node oldController = new Node(oldControllerId, "host1", port);
        Node newController = new Node(newControllerId, "host2", port);
        Mockito.when((Object)controllerNodeProvider.get()).thenReturn((Object)new Some((Object)oldController), (Object[])new Option[]{new Some((Object)newController)});
        EnvelopeResponse envelopeResponseWithNotControllerError = new EnvelopeResponse(new EnvelopeResponseData().setErrorCode(Errors.NOT_CONTROLLER.code()));
        Map<String, Integer> map = Collections.singletonMap("a", Predef$.MODULE$.int2Integer(2));
        int metadataUpdateWith_numNodes = 3;
        MetadataResponse metadataResponse2 = metadataResponse = RequestTestUtils.metadataUpdateWith((String)"kafka-cluster", (int)metadataUpdateWith_numNodes, (Map)metadataUpdateWith_topicPartitionCounts);
        map = null;
        metadataResponse = null;
        MetadataResponse expectedResponse = metadataResponse2;
        BrokerToControllerRequestThread testRequestThread = new BrokerToControllerRequestThread((KafkaClient)mockClient, new ManualMetadataUpdater(), controllerNodeProvider, config, (Time)time, "", Long.MAX_VALUE);
        testRequestThread.started_$eq(true);
        TestRequestCompletionHandler completionHandler = new TestRequestCompletionHandler(this, (Option<MetadataResponse>)new Some((Object)expectedResponse));
        KafkaPrincipal kafkaPrincipal = new KafkaPrincipal("User", "principal", true);
        DefaultKafkaPrincipalBuilder kafkaPrincipalBuilder = new DefaultKafkaPrincipalBuilder(null, null);
        EnvelopeRequest.Builder envelopeRequestBuilder = new EnvelopeRequest.Builder(ByteBuffer.allocate(0), kafkaPrincipalBuilder.serialize(kafkaPrincipal), "client-address".getBytes());
        BrokerToControllerQueueItem queueItem = new BrokerToControllerQueueItem(time.milliseconds(), (AbstractRequest.Builder)envelopeRequestBuilder, (ControllerRequestCompletionHandler)completionHandler);
        testRequestThread.enqueue(queueItem);
        testRequestThread.doWork();
        Node oldBrokerNode = new Node(oldControllerId, "host1", port);
        Assertions.assertEquals((Object)new Some((Object)oldBrokerNode), (Object)testRequestThread.activeControllerAddress());
        mockClient.prepareResponse(body -> body instanceof EnvelopeRequest, (AbstractResponse)envelopeResponseWithNotControllerError);
        testRequestThread.doWork();
        Assertions.assertEquals((Object)None$.MODULE$, (Object)testRequestThread.activeControllerAddress());
        testRequestThread.doWork();
        mockClient.prepareResponse((AbstractResponse)expectedResponse);
        testRequestThread.doWork();
        Node newControllerNode = new Node(newControllerId, "host2", port);
        Assertions.assertEquals((Object)new Some((Object)newControllerNode), (Object)testRequestThread.activeControllerAddress());
        Assertions.assertTrue((boolean)completionHandler.completed().get());
    }

    @Test
    public void testRetryTimeout() {
        MockTime time = new MockTime();
        KafkaConfig config = new KafkaConfig((Map)TestUtils$.MODULE$.createBrokerConfig(1, "localhost:2181", true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1));
        int controllerId = 1;
        Metadata metadata = (Metadata)Mockito.mock(Metadata.class);
        MockClient mockClient = new MockClient((Time)time, metadata);
        ControllerNodeProvider controllerNodeProvider = (ControllerNodeProvider)Mockito.mock(ControllerNodeProvider.class);
        Node controller = new Node(controllerId, "host1", 1234);
        Mockito.when((Object)controllerNodeProvider.get()).thenReturn((Object)new Some((Object)controller));
        int retryTimeoutMs = 30000;
        MetadataResponse responseWithNotControllerError = RequestTestUtils.metadataUpdateWith((String)"cluster1", (int)2, Collections.singletonMap("a", Errors.NOT_CONTROLLER), Collections.singletonMap("a", Predef$.MODULE$.int2Integer(2)));
        BrokerToControllerRequestThread testRequestThread = new BrokerToControllerRequestThread((KafkaClient)mockClient, new ManualMetadataUpdater(), controllerNodeProvider, config, (Time)time, "", (long)retryTimeoutMs);
        testRequestThread.started_$eq(true);
        if (this.TestRequestCompletionHandler() == null) {
            throw null;
        }
        TestRequestCompletionHandler completionHandler = new TestRequestCompletionHandler(this, (Option<MetadataResponse>)None$.MODULE$);
        BrokerToControllerQueueItem queueItem = new BrokerToControllerQueueItem(time.milliseconds(), (AbstractRequest.Builder)new MetadataRequest.Builder(new MetadataRequestData().setAllowAutoTopicCreation(true)), (ControllerRequestCompletionHandler)completionHandler);
        testRequestThread.enqueue(queueItem);
        testRequestThread.doWork();
        time.sleep((long)retryTimeoutMs);
        mockClient.prepareResponse(body -> body instanceof MetadataRequest && ((MetadataRequest)body).allowAutoTopicCreation(), (AbstractResponse)responseWithNotControllerError);
        testRequestThread.doWork();
        Assertions.assertTrue((boolean)completionHandler.timedOut().get());
    }

    @Test
    public void testUnsupportedVersionHandling() {
        MockTime time = new MockTime();
        KafkaConfig config = new KafkaConfig((Map)TestUtils$.MODULE$.createBrokerConfig(1, "localhost:2181", true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1));
        int controllerId = 2;
        Metadata metadata = (Metadata)Mockito.mock(Metadata.class);
        MockClient mockClient = new MockClient((Time)time, metadata);
        ControllerNodeProvider controllerNodeProvider = (ControllerNodeProvider)Mockito.mock(ControllerNodeProvider.class);
        Node activeController = new Node(controllerId, "host", 1234);
        Mockito.when((Object)controllerNodeProvider.get()).thenReturn((Object)new Some((Object)activeController));
        AtomicReference callbackResponse = new AtomicReference();
        ControllerRequestCompletionHandler completionHandler = new ControllerRequestCompletionHandler(null, callbackResponse){
            private final AtomicReference callbackResponse$1;

            public void onTimeout() {
                Assertions.fail((String)"Unexpected timeout exception");
            }

            public void onComplete(ClientResponse response) {
                this.callbackResponse$1.set(response);
            }
            {
                this.callbackResponse$1 = callbackResponse$1;
            }
        };
        BrokerToControllerQueueItem queueItem = new BrokerToControllerQueueItem(time.milliseconds(), (AbstractRequest.Builder)new MetadataRequest.Builder(new MetadataRequestData()), completionHandler);
        mockClient.prepareUnsupportedVersionResponse(request -> {
            ApiKeys apiKeys = request.apiKey();
            ApiKeys apiKeys2 = ApiKeys.METADATA;
            return !(apiKeys != null ? !apiKeys.equals(apiKeys2) : apiKeys2 != null);
        });
        BrokerToControllerRequestThread testRequestThread = new BrokerToControllerRequestThread((KafkaClient)mockClient, new ManualMetadataUpdater(), controllerNodeProvider, config, (Time)time, "", Long.MAX_VALUE);
        testRequestThread.started_$eq(true);
        testRequestThread.enqueue(queueItem);
        int n = 10;
        int pollUntil_tries = 0;
        do {
            testRequestThread.doWork();
        } while (!BrokerToControllerRequestThreadTest.$anonfun$testUnsupportedVersionHandling$2(callbackResponse) && ++pollUntil_tries < n);
        if (!BrokerToControllerRequestThreadTest.$anonfun$testUnsupportedVersionHandling$2(callbackResponse)) {
            Assertions.fail((String)new StringBuilder(47).append("Condition failed to be met after polling ").append(pollUntil_tries).append(" times").toString());
        }
        Assertions.assertNotNull((Object)((ClientResponse)callbackResponse.get()).versionMismatch());
    }

    @Test
    public void testAuthenticationExceptionHandling() {
        MockTime time = new MockTime();
        KafkaConfig config = new KafkaConfig((Map)TestUtils$.MODULE$.createBrokerConfig(1, "localhost:2181", true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1));
        int controllerId = 2;
        Metadata metadata = (Metadata)Mockito.mock(Metadata.class);
        MockClient mockClient = new MockClient((Time)time, metadata);
        ControllerNodeProvider controllerNodeProvider = (ControllerNodeProvider)Mockito.mock(ControllerNodeProvider.class);
        Node activeController = new Node(controllerId, "host", 1234);
        Mockito.when((Object)controllerNodeProvider.get()).thenReturn((Object)new Some((Object)activeController));
        AtomicReference callbackResponse = new AtomicReference();
        ControllerRequestCompletionHandler completionHandler = new ControllerRequestCompletionHandler(null, callbackResponse){
            private final AtomicReference callbackResponse$2;

            public void onTimeout() {
                Assertions.fail((String)"Unexpected timeout exception");
            }

            public void onComplete(ClientResponse response) {
                this.callbackResponse$2.set(response);
            }
            {
                this.callbackResponse$2 = callbackResponse$2;
            }
        };
        BrokerToControllerQueueItem queueItem = new BrokerToControllerQueueItem(time.milliseconds(), (AbstractRequest.Builder)new MetadataRequest.Builder(new MetadataRequestData()), completionHandler);
        mockClient.createPendingAuthenticationError(activeController, 50L);
        BrokerToControllerRequestThread testRequestThread = new BrokerToControllerRequestThread((KafkaClient)mockClient, new ManualMetadataUpdater(), controllerNodeProvider, config, (Time)time, "", Long.MAX_VALUE);
        testRequestThread.started_$eq(true);
        testRequestThread.enqueue(queueItem);
        int n = 10;
        int pollUntil_tries = 0;
        do {
            testRequestThread.doWork();
        } while (!BrokerToControllerRequestThreadTest.$anonfun$testAuthenticationExceptionHandling$1(callbackResponse) && ++pollUntil_tries < n);
        if (!BrokerToControllerRequestThreadTest.$anonfun$testAuthenticationExceptionHandling$1(callbackResponse)) {
            Assertions.fail((String)new StringBuilder(47).append("Condition failed to be met after polling ").append(pollUntil_tries).append(" times").toString());
        }
        Assertions.assertNotNull((Object)((ClientResponse)callbackResponse.get()).authenticationException());
    }

    @Test
    public void testThreadNotStarted() {
        MockTime time = new MockTime();
        KafkaConfig config = new KafkaConfig((Map)TestUtils$.MODULE$.createBrokerConfig(1, "localhost:2181", true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1));
        Metadata metadata = (Metadata)Mockito.mock(Metadata.class);
        MockClient mockClient = new MockClient((Time)time, metadata);
        ControllerNodeProvider controllerNodeProvider = (ControllerNodeProvider)Mockito.mock(ControllerNodeProvider.class);
        BrokerToControllerRequestThread testRequestThread = new BrokerToControllerRequestThread((KafkaClient)mockClient, new ManualMetadataUpdater(), controllerNodeProvider, config, (Time)time, "", Long.MAX_VALUE);
        TestRequestCompletionHandler completionHandler = new TestRequestCompletionHandler(this, (Option<MetadataResponse>)None$.MODULE$);
        BrokerToControllerQueueItem queueItem = new BrokerToControllerQueueItem(time.milliseconds(), (AbstractRequest.Builder)new MetadataRequest.Builder(new MetadataRequestData()), (ControllerRequestCompletionHandler)completionHandler);
        Assertions.assertThrows(IllegalStateException.class, () -> testRequestThread.enqueue(queueItem));
        Assertions.assertEquals((int)0, (int)testRequestThread.queueSize());
    }

    private void pollUntil(BrokerToControllerRequestThread requestThread, Function0<Object> condition, int maxRetries) {
        int tries = 0;
        do {
            requestThread.doWork();
        } while (!condition.apply$mcZ$sp() && ++tries < maxRetries);
        if (!condition.apply$mcZ$sp()) {
            Assertions.fail((String)new StringBuilder(47).append("Condition failed to be met after polling ").append(tries).append(" times").toString());
        }
    }

    private int pollUntil$default$3() {
        return 10;
    }

    private final void TestRequestCompletionHandler$lzycompute$1() {
        synchronized (this) {
            if (this.TestRequestCompletionHandler$module == null) {
                this.TestRequestCompletionHandler$module = new BrokerToControllerRequestThreadTest$TestRequestCompletionHandler$(this);
            }
            return;
        }
    }

    public static final /* synthetic */ boolean $anonfun$testUnsupportedVersionHandling$2(AtomicReference callbackResponse$1) {
        return callbackResponse$1.get() != null;
    }

    public static final /* synthetic */ boolean $anonfun$testAuthenticationExceptionHandling$1(AtomicReference callbackResponse$2) {
        return callbackResponse$2.get() != null;
    }

    public class TestRequestCompletionHandler
    extends ControllerRequestCompletionHandler {
        private final Option<MetadataResponse> expectedResponse;
        private final AtomicBoolean completed;
        private final AtomicBoolean timedOut;
        public final /* synthetic */ BrokerToControllerRequestThreadTest $outer;

        public AtomicBoolean completed() {
            return this.completed;
        }

        public AtomicBoolean timedOut() {
            return this.timedOut;
        }

        public void onComplete(ClientResponse response) {
            this.expectedResponse.foreach((Function1 & Serializable)expected -> {
                TestRequestCompletionHandler.$anonfun$onComplete$1(response, expected);
                return BoxedUnit.UNIT;
            });
            this.completed().set(true);
        }

        public void onTimeout() {
            this.timedOut().set(true);
        }

        public /* synthetic */ BrokerToControllerRequestThreadTest kafka$server$BrokerToControllerRequestThreadTest$TestRequestCompletionHandler$$$outer() {
            return this.$outer;
        }

        public static final /* synthetic */ void $anonfun$onComplete$1(ClientResponse response$1, MetadataResponse expected) {
            Assertions.assertEquals((Object)expected, (Object)response$1.responseBody());
        }

        public TestRequestCompletionHandler(BrokerToControllerRequestThreadTest $outer, Option<MetadataResponse> expectedResponse) {
            this.expectedResponse = expectedResponse;
            if ($outer == null) {
                throw null;
            }
            this.$outer = $outer;
            this.completed = new AtomicBoolean(false);
            this.timedOut = new AtomicBoolean(false);
        }
    }
}

