/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import com.typesafe.scalalogging.Logger;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicReference;
import kafka.server.ControllerInformation;
import kafka.server.ControllerNodeProvider;
import kafka.server.KafkaConfig;
import kafka.server.NodeToControllerQueueItem;
import kafka.utils.Logging;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.InterBrokerSendThread;
import org.apache.kafka.server.util.RequestAndCompletionHandler;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.Seq;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

@ScalaSignature(bytes="\u0006\u0001\u0005ef\u0001B\u0010!\u0001\u0015B\u0001\u0002\u000f\u0001\u0003\u0002\u0003\u0006I!\u000f\u0005\t\u007f\u0001\u0011\t\u0019!C\u0001\u0001\"Aq\t\u0001BA\u0002\u0013\u0005\u0001\n\u0003\u0005O\u0001\t\u0005\t\u0015)\u0003B\u0011!y\u0005A!A!\u0002\u0013\u0001\u0006\u0002C,\u0001\u0005\u0003\u0005\u000b\u0011\u0002-\t\u0011m\u0003!\u0011!Q\u0001\nqC\u0001b\u0018\u0001\u0003\u0002\u0003\u0006I\u0001\u0019\u0005\tG\u0002\u0011\t\u0011)A\u0005I\"A1\u000e\u0001B\u0001B\u0003%A\u000e\u0003\u0005x\u0001\t\u0005\t\u0015!\u0003y\u0011\u0015Y\b\u0001\"\u0001}\u0011\u001d\ty\u0001\u0001C\u0005\u0003#A\u0011\"a\u0006\u0001\u0005\u0004%I!!\u0007\t\u0011\u0005M\u0002\u0001)A\u0005\u00037A\u0011\"!\u000e\u0001\u0005\u0004%I!a\u000e\t\u0011\u00055\u0003\u0001)A\u0005\u0003sA\u0011\"a\u0014\u0001\u0001\u0004%\t\u0001\t!\t\u0015\u0005E\u0003\u00011A\u0005\u0002\u0001\n\u0019\u0006C\u0004\u0002X\u0001\u0001\u000b\u0015B!\t\u000f\u0005\u0005\u0004\u0001\"\u0001\u0002d!9\u00111\u000e\u0001\u0005\n\u00055\u0004bBA:\u0001\u0011\u0005\u0011Q\u000f\u0005\b\u0003w\u0002A\u0011AA?\u0011\u001d\t)\t\u0001C!\u0003\u000fC\u0001\"a&\u0001\t\u0003\u0001\u0013\u0011\u0014\u0005\b\u0003W\u0003A\u0011BAW\u0011\u001d\ty\u000b\u0001C!\u0003[Cq!!-\u0001\t\u0003\ni\u000b\u0003\u0007\u00024\u0002\u0001\n\u0011!A\u0001\n\u0003\t)LA\u000fO_\u0012,Gk\\\"p]R\u0014x\u000e\u001c7feJ+\u0017/^3tiRC'/Z1e\u0015\t\t#%\u0001\u0004tKJ4XM\u001d\u0006\u0002G\u0005)1.\u00194lC\u000e\u00011c\u0001\u0001'eA\u0011q\u0005M\u0007\u0002Q)\u0011\u0011FK\u0001\u0005kRLGN\u0003\u0002\"W)\u00111\u0005\f\u0006\u0003[9\na!\u00199bG\",'\"A\u0018\u0002\u0007=\u0014x-\u0003\u00022Q\t)\u0012J\u001c;fe\n\u0013xn[3s'\u0016tG\r\u00165sK\u0006$\u0007CA\u001a7\u001b\u0005!$BA\u001b#\u0003\u0015)H/\u001b7t\u0013\t9DGA\u0004M_\u001e<\u0017N\\4\u0002)%t\u0017\u000e^5bY:+Go^8sW\u000ec\u0017.\u001a8u!\tQT(D\u0001<\u0015\ta4&A\u0004dY&,g\u000e^:\n\u0005yZ$aC&bM.\f7\t\\5f]R\fa$[:OKR<xN]6DY&,g\u000e\u001e$pej[7i\u001
c8ue>dG.\u001a:\u0016\u0003\u0005\u0003\"AQ#\u000e\u0003\rS\u0011\u0001R\u0001\u0006g\u000e\fG.Y\u0005\u0003\r\u000e\u0013qAQ8pY\u0016\fg.\u0001\u0012jg:+Go^8sW\u000ec\u0017.\u001a8u\r>\u0014(l[\"p]R\u0014x\u000e\u001c7fe~#S-\u001d\u000b\u0003\u00132\u0003\"A\u0011&\n\u0005-\u001b%\u0001B+oSRDq!T\u0002\u0002\u0002\u0003\u0007\u0011)A\u0002yIE\nq$[:OKR<xN]6DY&,g\u000e\u001e$pej[7i\u001c8ue>dG.\u001a:!\u0003QqW\r^<pe.\u001cE.[3oi\u001a\u000b7\r^8ssB!!)U*:\u0013\t\u00116IA\u0005Gk:\u001cG/[8ocA\u0011A+V\u0007\u0002A%\u0011a\u000b\t\u0002\u0016\u0007>tGO]8mY\u0016\u0014\u0018J\u001c4pe6\fG/[8o\u0003=iW\r^1eCR\fW\u000b\u001d3bi\u0016\u0014\bC\u0001\u001eZ\u0013\tQ6HA\u000bNC:,\u0018\r\\'fi\u0006$\u0017\r^1Va\u0012\fG/\u001a:\u0002-\r|g\u000e\u001e:pY2,'OT8eKB\u0013xN^5eKJ\u0004\"\u0001V/\n\u0005y\u0003#AF\"p]R\u0014x\u000e\u001c7fe:{G-\u001a)s_ZLG-\u001a:\u0002\r\r|gNZ5h!\t!\u0016-\u0003\u0002cA\tY1*\u00194lC\u000e{gNZ5h\u0003\u0011!\u0018.\\3\u0011\u0005\u0015LW\"\u00014\u000b\u0005U:'B\u00015,\u0003\u0019\u0019w.\\7p]&\u0011!N\u001a\u0002\u0005)&lW-\u0001\u0006uQJ,\u0017\r\u001a(b[\u0016\u0004\"!\u001c;\u000f\u00059\u0014\bCA8D\u001b\u0005\u0001(BA9%\u0003\u0019a$o\\8u}%\u00111oQ\u0001\u0007!J,G-\u001a4\n\u0005U4(AB*ue&twM\u0003\u0002t\u0007\u0006q!/\u001a;ssRKW.Z8vi6\u001b\bC\u0001\"z\u0013\tQ8I\u0001\u0003M_:<\u0017A\u0002\u001fj]&$h\bF\t~}~\f\t!a\u0001\u0002\u0006\u0005\u001d\u0011\u0011BA\u0006\u0003\u001b\u0001\"\u0001\u0016\u0001\t\u000bab\u0001\u0019A\u001d\t\u000b}b\u0001\u0019A!\t\u000b=c\u0001\u0019\u0001)\t\u000b]c\u0001\u0019\u0001-\t\u000bmc\u0001\u0019\u0001/\t\u000b}c\u0001\u0019\u00011\t\u000b\rd\u0001\u0019\u00013\t\u000b-d\u0001\u0019\u00017\t\u000b]d\u0001\u0019\u0001=\u0002/5\f\u0017PY3SKN,GOT3uo>\u00148n\u00117jK:$HcA%\u0002\u0014!1\u0011QC\u0007A\u0002M\u000bQcY8oiJ|G\u000e\\3s\u0013:4wN]7bi&|g.\u0001\u0007sKF,Xm\u001d;Rk\u0016,X-\u0006\u0002\u0002\u001cA1\u0011QDA\u0015\u0003[i!!a\b\u000b\t\u0005\u0005\u00121E\u0001\u000bG>t7-\u001e:sK:$(bA\u0015
\u0002&)\u0011\u0011qE\u0001\u0005U\u00064\u0018-\u0003\u0003\u0002,\u0005}!a\u0005'j].,GM\u00117pG.Lgn\u001a#fcV,\u0007c\u0001+\u00020%\u0019\u0011\u0011\u0007\u0011\u000339{G-\u001a+p\u0007>tGO]8mY\u0016\u0014\u0018+^3vK&#X-\\\u0001\u000ee\u0016\fX/Z:u#V,W/\u001a\u0011\u0002!\u0005\u001cG/\u001b<f\u0007>tGO]8mY\u0016\u0014XCAA\u001d!\u0019\tY$!\u0011\u0002F5\u0011\u0011Q\b\u0006\u0005\u0003\u007f\ty\"\u0001\u0004bi>l\u0017nY\u0005\u0005\u0003\u0007\niDA\bBi>l\u0017n\u0019*fM\u0016\u0014XM\\2f!\u0011\t9%!\u0013\u000e\u0003\u001dL1!a\u0013h\u0005\u0011qu\u000eZ3\u0002#\u0005\u001cG/\u001b<f\u0007>tGO]8mY\u0016\u0014\b%A\u0004ti\u0006\u0014H/\u001a3\u0002\u0017M$\u0018M\u001d;fI~#S-\u001d\u000b\u0004\u0013\u0006U\u0003bB'\u0014\u0003\u0003\u0005\r!Q\u0001\tgR\f'\u000f^3eA!\u001aA#a\u0017\u0011\u0007\t\u000bi&C\u0002\u0002`\r\u0013\u0001B^8mCRLG.Z\u0001\u0018C\u000e$\u0018N^3D_:$(o\u001c7mKJ\fE\r\u001a:fgN$\"!!\u001a\u0011\u000b\t\u000b9'!\u0012\n\u0007\u0005%4I\u0001\u0004PaRLwN\\\u0001\u0018kB$\u0017\r^3D_:$(o\u001c7mKJ\fE\r\u001a:fgN$2!SA8\u0011\u001d\t\tH\u0006a\u0001\u0003\u000b\n1C\\3x\u0003\u000e$\u0018N^3D_:$(o\u001c7mKJ\fq!\u001a8rk\u0016,X\rF\u0002J\u0003oBq!!\u001f\u0018\u0001\u0004\ti#A\u0004sKF,Xm\u001d;\u0002\u0013E,X-^3TSj,WCAA@!\r\u0011\u0015\u0011Q\u0005\u0004\u0003\u0007\u001b%aA%oi\u0006\u0001r-\u001a8fe\u0006$XMU3rk\u0016\u001cHo\u001d\u000b\u0003\u0003\u0013\u0003b!a#\u0002\u000e\u0006EUBAA\u0012\u0013\u0011\ty)a\t\u0003\u0015\r{G\u000e\\3di&|g\u000eE\u0002(\u0003'K1!!&)\u0005m\u0011V-];fgR\fe\u000eZ\"p[BdW\r^5p]\"\u000bg\u000e\u001a7fe\u0006q\u0001.\u00198eY\u0016\u0014Vm\u001d9p]N,G\u0003BAN\u0003O#2!SAO\u0011\u001d\tyJ\u0007a\u0001\u0003C\u000b\u0001B]3ta>t7/\u001a\t\u0004u\u0005\r\u0016bAASw\tq1\t\\5f]R\u0014Vm\u001d9p]N,\u0007bBAU5\u0001\u0007\u0011QF\u0001\ncV,W/Z%uK6\f!%\\1zE\u0016$\u0015n]2p]:,7\r^!oIV\u0003H-\u0019;f\u0007>tGO]8mY\u0016\u0014H#A%\u0002\r\u0011|wk\u001c:l\u0003\u0015\u0019H/\u0019:u\u0003]\u0001(o\u001c;fGR,G\r\n8fi^|'o[\"
mS\u0016tG\u000fF\u0002:\u0003oCq!\u0014\u0010\u0002\u0002\u0003\u0007Q\u0010")
public class NodeToControllerRequestThread
extends InterBrokerSendThread
implements Logging {
    // Whether the current network client was built for a ZK controller; compared with
    // ControllerInformation.isZkController() in maybeResetNetworkClient and updated there.
    private boolean isNetworkClientForZkController;
    // Produces the replacement KafkaClient when maybeResetNetworkClient swaps clients.
    private final Function1<ControllerInformation, KafkaClient> networkClientFactory;
    // Receives a single-element node list each time a controller node is recorded.
    private final ManualMetadataUpdater metadataUpdater;
    // Queried at the start of every doWork() pass for the current controller information.
    private final ControllerNodeProvider controllerNodeProvider;
    // Source of the millisecond timestamps read in generateRequests().
    private final Time time;
    // Age in milliseconds at or beyond which generateRequests() drops a queued request
    // and invokes its callback's onTimeout().
    private final long retryTimeoutMs;
    // Pending requests; handleResponse re-inserts items at the head when they must be retried.
    private final LinkedBlockingDeque<NodeToControllerQueueItem> requestQueue;
    // Currently cached controller node; holds null when no controller is known.
    private final AtomicReference<Node> activeController;
    // Set to true by start(); enqueue() throws while it is still false.
    private volatile boolean started;
    // Lazily initialised by logger$lzycompute().
    private Logger logger;
    // Value exposed through logIdent() / logIdent_$eq(); the constructor sets it to logPrefix.
    private String logIdent;
    // Becomes true once `logger` has been initialised.
    private volatile boolean bitmap$0;

    /**
     * Returns the logger name by delegating to the static {@code Logging.loggerName$} forwarder.
     */
    @Override
    public String loggerName() {
        return Logging.loggerName$(this);
    }

    /**
     * Delegates to the static {@code Logging.msgWithLogIdent$} forwarder with this instance and
     * {@code msg}, returning whatever string it produces.
     *
     * @param msg the message handed to the forwarder
     */
    @Override
    public String msgWithLogIdent(String msg) {
        return Logging.msgWithLogIdent$(this, msg);
    }

    /**
     * Forwards a trace-level message to the static {@code Logging.trace$} forwarder.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void trace(Function0<String> msg) {
        Logging.trace$(this, msg);
    }

    /**
     * Forwards a trace-level message and its throwable to the static {@code Logging.trace$} forwarder.
     *
     * @param msg supplier of the message text
     * @param e   supplier of the associated throwable
     */
    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.trace$(this, msg, e);
    }

    /**
     * Returns the result of the static {@code Logging.isDebugEnabled$} forwarder for this instance.
     */
    @Override
    public boolean isDebugEnabled() {
        return Logging.isDebugEnabled$(this);
    }

    /**
     * Returns the result of the static {@code Logging.isTraceEnabled$} forwarder for this instance.
     */
    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    /**
     * Forwards a debug-level message to the static {@code Logging.debug$} forwarder.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void debug(Function0<String> msg) {
        Logging.debug$(this, msg);
    }

    /**
     * Forwards a debug-level message and its throwable to the static {@code Logging.debug$} forwarder.
     *
     * @param msg supplier of the message text
     * @param e   supplier of the associated throwable
     */
    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.debug$(this, msg, e);
    }

    /**
     * Forwards an info-level message to the static {@code Logging.info$} forwarder.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void info(Function0<String> msg) {
        Logging.info$(this, msg);
    }

    /**
     * Forwards an info-level message and its throwable to the static {@code Logging.info$} forwarder.
     *
     * @param msg supplier of the message text
     * @param e   supplier of the associated throwable
     */
    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.info$(this, msg, e);
    }

    /**
     * Forwards a warn-level message to the static {@code Logging.warn$} forwarder.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void warn(Function0<String> msg) {
        Logging.warn$(this, msg);
    }

    /**
     * Forwards a warn-level message and its throwable to the static {@code Logging.warn$} forwarder.
     *
     * @param msg supplier of the message text
     * @param e   supplier of the associated throwable
     */
    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.warn$(this, msg, e);
    }

    /**
     * Forwards an error-level message to the static {@code Logging.error$} forwarder.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void error(Function0<String> msg) {
        Logging.error$(this, msg);
    }

    /**
     * Forwards an error-level message and its throwable to the static {@code Logging.error$} forwarder.
     *
     * @param msg supplier of the message text
     * @param e   supplier of the associated throwable
     */
    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.error$(this, msg, e);
    }

    /**
     * Forwards a fatal-level message to the static {@code Logging.fatal$} forwarder.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void fatal(Function0<String> msg) {
        Logging.fatal$(this, msg);
    }

    /**
     * Forwards a fatal-level message and its throwable to the static {@code Logging.fatal$} forwarder.
     *
     * @param msg supplier of the message text
     * @param e   supplier of the associated throwable
     */
    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.fatal$(this, msg, e);
    }

    /**
     * Initialises the {@code logger} field at most once while holding this object's monitor.
     *
     * @return the value of the {@code logger} field after the synchronized section
     */
    private Logger logger$lzycompute() {
        synchronized (this) {
            // Re-check the flag under the lock so only the first caller performs the initialisation.
            if (!this.bitmap$0) {
                this.logger = Logging.logger$(this);
                // The flag is written after the logger, so a reader that sees true also sees the logger.
                this.bitmap$0 = true;
            }
        }
        return this.logger;
    }

    /**
     * Returns the logger, initialising it through {@code logger$lzycompute()} when the
     * {@code bitmap$0} flag shows that it has not been created yet.
     */
    @Override
    public Logger logger() {
        return this.bitmap$0 ? this.logger : this.logger$lzycompute();
    }

    /**
     * Returns the current value of the {@code logIdent} field.
     */
    @Override
    public String logIdent() {
        return logIdent;
    }

    /**
     * Stores {@code x$1} in the {@code logIdent} field.
     *
     * @param x$1 the new log identifier
     */
    @Override
    public void logIdent_$eq(String x$1) {
        logIdent = x$1;
    }

    /**
     * Synthetic accessor that reads the {@code networkClient} field of the given instance.
     * It is called from the static {@code $anonfun$maybeDisconnectAndUpdateController$1} helper.
     *
     * @param x$1 the instance whose {@code networkClient} is read
     * @return that instance's current network client
     */
    public /* synthetic */ KafkaClient protected$networkClient(NodeToControllerRequestThread x$1) {
        return x$1.networkClient;
    }

    /**
     * Returns the current value of the {@code isNetworkClientForZkController} flag.
     */
    public boolean isNetworkClientForZkController() {
        return isNetworkClientForZkController;
    }

    /**
     * Overwrites the {@code isNetworkClientForZkController} flag.
     *
     * @param x$1 the new flag value
     */
    public void isNetworkClientForZkController_$eq(boolean x$1) {
        isNetworkClientForZkController = x$1;
    }

    private void maybeResetNetworkClient(ControllerInformation controllerInformation) {
        if (this.isNetworkClientForZkController() != controllerInformation.isZkController()) {
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(88).append("Controller changed to ").append((Object)(this.isNetworkClientForZkController() ? "kraft" : "zk")).append(" mode. ").append("Resetting network client with new controller information : ").append(controllerInformation).toString());
            KafkaClient oldClient = this.networkClient;
            oldClient.initiateClose();
            oldClient.close();
            this.isNetworkClientForZkController_$eq(controllerInformation.isZkController());
            this.updateControllerAddress((Node)controllerInformation.node().orNull(Predef$.MODULE$.$conforms()));
            controllerInformation.node().foreach((Function1 & Serializable & scala.Serializable)n -> {
                this.metadataUpdater.setNodes((java.util.List)CollectionConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)n, (List)Nil$.MODULE$)).asJava());
                return BoxedUnit.UNIT;
            });
            this.networkClient = (KafkaClient)this.networkClientFactory.apply((Object)controllerInformation);
            return;
        }
    }

    /**
     * Returns the deque that holds the pending requests.
     */
    private LinkedBlockingDeque<NodeToControllerQueueItem> requestQueue() {
        return requestQueue;
    }

    /**
     * Returns the atomic reference that holds the cached controller node.
     */
    private AtomicReference<Node> activeController() {
        return activeController;
    }

    /**
     * Returns the current value of the volatile {@code started} flag.
     */
    public boolean started() {
        return started;
    }

    /**
     * Overwrites the volatile {@code started} flag.
     *
     * @param x$1 the new flag value
     */
    public void started_$eq(boolean x$1) {
        started = x$1;
    }

    /**
     * Returns the cached controller node wrapped by {@code Option.apply}, which is given the
     * value currently held in {@code activeController} (possibly {@code null}).
     */
    public Option<Node> activeControllerAddress() {
        Node cached = this.activeController.get();
        return Option$.MODULE$.apply(cached);
    }

    /**
     * Stores {@code newActiveController} as the cached controller node.
     *
     * @param newActiveController the node to cache; callers in this class pass {@code null} to clear it
     */
    private void updateControllerAddress(Node newActiveController) {
        this.activeController.set(newActiveController);
    }

    /**
     * Appends {@code request} to the pending queue and wakes the thread up when a controller
     * address is currently cached.
     *
     * @param request the item to queue
     * @throws IllegalStateException if {@code started()} is still false
     */
    public void enqueue(NodeToControllerQueueItem request) {
        boolean running = this.started();
        if (!running) {
            throw new IllegalStateException("Cannot enqueue a request if the request thread is not running");
        }
        this.requestQueue.add(request);
        boolean controllerKnown = this.activeControllerAddress().isDefined();
        if (controllerKnown) {
            this.wakeup();
        }
    }

    /**
     * Returns the number of items currently held in the pending request queue.
     */
    public int queueSize() {
        return this.requestQueue.size();
    }

    /**
     * Walks the pending queue from the head and produces at most one request to send.
     *
     * <p>Items whose age has reached {@code retryTimeoutMs} are removed and their callback's
     * {@code onTimeout()} is invoked. The first remaining item is removed and wrapped in a
     * {@code RequestAndCompletionHandler} if a controller address is cached; otherwise it stays
     * queued and the scan continues.
     *
     * @return a single-element list holding the request to send, or an empty list
     */
    public Collection<RequestAndCompletionHandler> generateRequests() {
        final long nowMs = this.time.milliseconds();
        for (Iterator<NodeToControllerQueueItem> pending = this.requestQueue.iterator(); pending.hasNext(); ) {
            final NodeToControllerQueueItem item = pending.next();
            boolean expired = nowMs - item.createdTimeMs() >= this.retryTimeoutMs;
            if (expired) {
                pending.remove();
                item.callback().onTimeout();
            } else {
                Option<Node> controller = this.activeControllerAddress();
                if (controller.isDefined()) {
                    pending.remove();
                    // The clock is read again here rather than reusing nowMs.
                    return Collections.singletonList(new RequestAndCompletionHandler(this.time.milliseconds(), (Node)controller.get(), item.request(), response -> this.handleResponse(item, response), item.context()));
                }
            }
        }
        return Collections.emptyList();
    }

    /**
     * Handles the response to a previously sent queue item.
     *
     * <ul>
     *   <li>Authentication error: logs, disconnects and clears the cached controller, then completes the callback.</li>
     *   <li>Version mismatch: logs and completes the callback.</li>
     *   <li>Disconnected: clears the cached controller and puts the item back at the head of the queue.</li>
     *   <li>{@code NOT_CONTROLLER} in the error counts: disconnects and clears the cached controller, then
     *       puts the item back at the head of the queue.</li>
     *   <li>Anything else: completes the callback.</li>
     * </ul>
     *
     * @param queueItem the item whose request was answered
     * @param response  the client response received for it
     */
    public void handleResponse(NodeToControllerQueueItem queueItem, ClientResponse response) {
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(18).append("Request ").append(queueItem.request()).append(" received ").append(response).toString());
        if (response.authenticationException() != null) {
            this.error(
                (Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(114).append("Request ").append(queueItem.request()).append(" failed due to authentication error with controller. Disconnecting the ").append("connection to the stale controller ").append(this.activeControllerAddress().map((Function1 & Serializable & scala.Serializable)node -> node.idString()).getOrElse((Function0 & Serializable & scala.Serializable)() -> "null")).toString(),
                (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> response.authenticationException());
            this.maybeDisconnectAndUpdateController();
            queueItem.callback().onComplete(response);
        } else if (response.versionMismatch() != null) {
            this.error(
                (Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(48).append("Request ").append(queueItem.request()).append(" failed due to unsupported version error").toString(),
                (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> response.versionMismatch());
            queueItem.callback().onComplete(response);
        } else if (response.wasDisconnected()) {
            // Forget the controller and retry this item first.
            this.updateControllerAddress(null);
            this.requestQueue.putFirst(queueItem);
        } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(97).append("Request ").append(queueItem.request()).append(" received NOT_CONTROLLER exception. Disconnecting the ").append("connection to the stale controller ").append(this.activeControllerAddress().map((Function1 & Serializable & scala.Serializable)node -> node.idString()).getOrElse((Function0 & Serializable & scala.Serializable)() -> "null")).toString());
            this.maybeDisconnectAndUpdateController();
            this.requestQueue.putFirst(queueItem);
        } else {
            queueItem.callback().onComplete(response);
        }
    }

    /**
     * When a controller address is cached, hands it to
     * {@code $anonfun$maybeDisconnectAndUpdateController$1}, which disconnects from that node and
     * clears the cached address. Does nothing when no address is cached.
     */
    private void maybeDisconnectAndUpdateController() {
        Option<Node> cached = this.activeControllerAddress();
        if (cached.isDefined()) {
            NodeToControllerRequestThread.$anonfun$maybeDisconnectAndUpdateController$1(this, cached.get());
        }
    }

    public void doWork() {
        ControllerInformation controllerInformation = this.controllerNodeProvider.getControllerInfo();
        this.maybeResetNetworkClient(controllerInformation);
        if (this.activeControllerAddress().isDefined()) {
            super.pollOnce(Long.MAX_VALUE);
            return;
        }
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Controller isn't cached, looking for local metadata changes");
        Option<Node> option = controllerInformation.node();
        if (option instanceof Some) {
            Node controllerNode = (Node)((Some)option).value();
            String controllerType = controllerInformation.isZkController() ? "ZK" : "KRaft";
            this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(52).append("Recorded new ").append(controllerType).append(" controller, from now on will use node ").append(controllerNode).toString());
            this.updateControllerAddress(controllerNode);
            this.metadataUpdater.setNodes((java.util.List)CollectionConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)controllerNode, (List)Nil$.MODULE$)).asJava());
            return;
        }
        if (None$.MODULE$.equals(option)) {
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "No controller provided, retrying after backoff");
            super.pollOnce(100L);
            return;
        }
        throw new MatchError(option);
    }

    /**
     * Starts the thread through the superclass and then sets the {@code started} flag to true,
     * after which {@code enqueue} accepts requests.
     */
    public void start() {
        super.start();
        started_$eq(true);
    }

    /**
     * Disconnects {@code $this}'s network client from {@code controllerAddress} and then clears the
     * cached controller address. Any throwable raised while disconnecting is logged at error
     * level and does not prevent the address from being cleared.
     *
     * @param $this             the request thread whose client and cached address are affected
     * @param controllerAddress the node to disconnect from
     */
    public static final /* synthetic */ void $anonfun$maybeDisconnectAndUpdateController$1(NodeToControllerRequestThread $this, Node controllerAddress) {
        try {
            KafkaClient client = $this.protected$networkClient($this);
            String nodeId = controllerAddress.idString();
            client.disconnect(nodeId);
        }
        catch (Throwable failure) {
            $this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Had an error while disconnecting from NetworkClient.", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> failure);
        }
        $this.updateControllerAddress(null);
    }

    /**
     * Creates the request thread in the not-started state with an empty queue and no cached controller.
     *
     * <p>The superclass constructor is invoked first, as Java requires; the decompiled original
     * assigned fields before {@code super(...)}, which does not compile. The third superclass
     * argument is the smaller of {@code config.controllerSocketTimeoutMs()} and
     * {@code retryTimeoutMs}, capped at {@code Integer.MAX_VALUE}.
     *
     * @param initialNetworkClient           the client handed to the superclass
     * @param isNetworkClientForZkController initial value of the ZK-controller flag
     * @param networkClientFactory           builds a replacement client when the controller type changes
     * @param metadataUpdater                receives the controller node whenever one is recorded
     * @param controllerNodeProvider         source of controller information for {@code doWork()}
     * @param config                         only {@code controllerSocketTimeoutMs()} is read here
     * @param time                           clock used by this class and handed to the superclass
     * @param threadName                     name handed to the superclass
     * @param retryTimeoutMs                 age in milliseconds after which queued requests time out
     */
    public NodeToControllerRequestThread(KafkaClient initialNetworkClient, boolean isNetworkClientForZkController, Function1<ControllerInformation, KafkaClient> networkClientFactory, ManualMetadataUpdater metadataUpdater, ControllerNodeProvider controllerNodeProvider, KafkaConfig config, Time time, String threadName, long retryTimeoutMs) {
        super(threadName, initialNetworkClient, (int)Math.min(Integer.MAX_VALUE, Math.min((long)config.controllerSocketTimeoutMs(), retryTimeoutMs)), time, false);
        this.isNetworkClientForZkController = isNetworkClientForZkController;
        this.networkClientFactory = networkClientFactory;
        this.metadataUpdater = metadataUpdater;
        this.controllerNodeProvider = controllerNodeProvider;
        this.time = time;
        this.retryTimeoutMs = retryTimeoutMs;
        Logging.$init$(this);
        this.logIdent_$eq(this.logPrefix);
        // Diamond operators replace the raw / mismatched generic instantiations of the decompiled code.
        this.requestQueue = new LinkedBlockingDeque<>();
        this.activeController = new AtomicReference<>(null);
        this.started = false;
    }
}

