/*
 * Decompiled with CFR 0.152.
 */
package kafka.common;

import com.typesafe.scalalogging.Logger;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import kafka.common.InterBrokerSendThread$;
import kafka.common.RequestAndCompletionHandler;
import kafka.common.UnsentRequests;
import kafka.utils.Logging;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.internals.FatalExitError;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.ShutdownableThread;
import scala.Function0;
import scala.Function1;
import scala.collection.Iterable;
import scala.collection.IterableLike;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.LongRef;

@ScalaSignature(bytes="\u0006\u0001\u0005Ee!B\u000e\u001d\u0003\u0003\t\u0003\u0002C\u001b\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u001c\t\u0011\r\u0003!\u00111A\u0005\u0002\u0011C\u0001b\u0013\u0001\u0003\u0002\u0004%\t\u0001\u0014\u0005\t'\u0002\u0011\t\u0011)Q\u0005\u000b\"A\u0001\f\u0001B\u0001B\u0003%\u0011\f\u0003\u0005]\u0001\t\u0005\t\u0015!\u0003^\u0011!\u0019\u0007A!A!\u0002\u0013!\u0007\"B4\u0001\t\u0003A\u0007b\u00029\u0001\u0005\u0004%\t!\u001d\u0005\u0007k\u0002\u0001\u000b\u0011\u0002:\t\u000bY\u0004a\u0011A<\t\u000f\u0005%\u0001\u0001\"\u0001\u0002\f!9\u0011Q\u0002\u0001\u0005B\u0005=\u0001bBA\t\u0001\u0011%\u0011q\u0002\u0005\b\u0003'\u0001A\u0011CA\u000b\u0011\u001d\t\t\u0003\u0001C!\u0003\u001fAq!a\t\u0001\t\u0013\t)\u0003C\u0004\u0002.\u0001!I!a\f\t\u000f\u0005M\u0002\u0001\"\u0003\u00026!9\u0011\u0011\b\u0001\u0005\u0002\u0005m\u0002bBA-\u0001\u0011\u0005\u00111\f\u0005\b\u0003S\u0002A\u0011AA\b\u000f%\tY\u0007HA\u0001\u0012\u0003\tiG\u0002\u0005\u001c9\u0005\u0005\t\u0012AA8\u0011\u00199\u0007\u0004\"\u0001\u0002x!I\u0011\u0011\u0010\r\u0012\u0002\u0013\u0005\u00111\u0010\u0002\u0016\u0013:$XM\u001d\"s_.,'oU3oIRC'/Z1e\u0015\tib$\u0001\u0004d_6lwN\u001c\u0006\u0002?\u0005)1.\u00194lC\u000e\u00011c\u0001\u0001#_A\u00111%L\u0007\u0002I)\u0011QEJ\u0001\u0005kRLGN\u0003\u0002(Q\u000511/\u001a:wKJT!aH\u0015\u000b\u0005)Z\u0013AB1qC\u000eDWMC\u0001-\u0003\ry'oZ\u0005\u0003]\u0011\u0012!c\u00155vi\u0012|wO\\1cY\u0016$\u0006N]3bIB\u0011\u0001gM\u0007\u0002c)\u0011!GH\u0001\u0006kRLGn]\u0005\u0003iE\u0012q\u0001T8hO&tw-\u0001\u0003oC6,\u0007CA\u001cA\u001d\tAd\b\u0005\u0002:y5\t!H\u0003\u0002<A\u00051AH]8pizR\u0011!P\u0001\u0006g\u000e\fG.Y\u0005\u0003\u007fq\na\u0001\u0015:fI\u00164\u0017BA!C\u0005\u0019\u0019FO]5oO*\u0011q\bP\u0001\u000e]\u0016$xo\u001c:l\u00072LWM\u001c;\u0016\u0003\u0015\u0003\"AR%\u000e\u0003\u001dS!\u0001\u0013\u0015\u0002\u000f\rd\u0017.\u001a8ug&\u0011!j\u0012\u0002\f\u0017\u000647.Y\"mS\u0016tG/A\toKR<xN]6DY&,g\u000e^0%KF$\"!T)\u0011\u00059{U\"\u0001\u001f\n\u0005Ac$\u0001B+oSRDqAU\u0002\u0002\u0002\u0003\u0007Q)A\u0002yIE\naB\\3uo>\u00148n\u00117jK:$\b\u0005\u000b\u0002\u0005+B\u0011aJV\u0005\u0003/r\u0012\u0001B^8mCRLG.Z\u0001\u0011e\u0016\fX/Z:u)&lWm\\;u\u001bN\u0004\"A\u0014.\n\u0005mc$aA%oi\u0006!A/[7f!\tq\u0016-D\u0001`\u0015\t\u0011\u0004M\u0003\u0002\u001eQ%\u0011!m\u0018\u0002\u0005)&lW-A\bjg&sG/\u001a:skB$\u0018N\u00197f!\tqU-\u0003\u0002gy\t9!i\\8mK\u0006t\u0017A\u0002\u001fj]&$h\b\u0006\u0004jW2lgn\u001c\t\u0003U\u0002i\u0011\u0001\b\u0005\u0006k!\u0001\rA\u000e\u0005\u0006\u0007\"\u0001\r!\u0012\u0005\u00061\"\u0001\r!\u0017\u0005\u00069\"\u0001\r!\u0018\u0005\bG\"\u0001\n\u00111\u0001e\u00039)hn]3oiJ+\u0017/^3tiN,\u0012A\u001d\t\u0003UNL!\u0001\u001e\u000f\u0003\u001dUs7/\u001a8u%\u0016\fX/Z:ug\u0006yQO\\:f]R\u0014V-];fgR\u001c\b%\u0001\thK:,'/\u0019;f%\u0016\fX/Z:ugR\t\u0001\u0010\u0005\u0003z}\u0006\raB\u0001>}\u001d\tI40C\u0001>\u0013\tiH(A\u0004qC\u000e\\\u0017mZ3\n\u0007}\f\tA\u0001\u0005Ji\u0016\u0014\u0018M\u00197f\u0015\tiH\bE\u0002k\u0003\u000bI1!a\u0002\u001d\u0005m\u0011V-];fgR\fe\u000eZ\"p[BdW\r^5p]\"\u000bg\u000e\u001a7fe\u0006\t\u0002.Y:V]N,g\u000e\u001e*fcV,7\u000f^:\u0016\u0003\u0011\f\u0001b\u001d5vi\u0012|wO\u001c\u000b\u0002\u001b\u00061BM]1j]\u001e+g.\u001a:bi\u0016$'+Z9vKN$8/\u0001\u0005q_2dwJ\\2f)\ri\u0015q\u0003\u0005\b\u00033y\u0001\u0019AA\u000e\u00031i\u0017\r\u001f+j[\u0016|W\u000f^'t!\rq\u0015QD\u0005\u0004\u0003?a$\u0001\u0002'p]\u001e\fa\u0001Z8X_J\\\u0017\u0001D:f]\u0012\u0014V-];fgR\u001cHCBA\u000e\u0003O\tY\u0003C\u0004\u0002*E\u0001\r!a\u0007\u0002\u00079|w\u000fC\u0004\u0002\u001aE\u0001\r!a\u0007\u0002!\rDWmY6ESN\u001cwN\u001c8fGR\u001cHcA'\u00022!9\u0011\u0011\u0006\nA\u0002\u0005m\u0011a\u00054bS2,\u0005\u0010]5sK\u0012\u0014V-];fgR\u001cHcA'\u00028!9\u0011\u0011F\nA\u0002\u0005m\u0011AF2p[BdW\r^3XSRDG)[:d_:tWm\u0019;\u0015\u000f5\u000bi$a\u0012\u0002J!9\u0011q\b\u000bA\u0002\u0005\u0005\u0013a\u0002:fcV,7\u000f\u001e\t\u0004\r\u0006\r\u0013bAA#\u000f\ni1\t\\5f]R\u0014V-];fgRDq!!\u000b\u0015\u0001\u0004\tY\u0002C\u0004\u0002LQ\u0001\r!!\u0014\u0002/\u0005,H\u000f[3oi&\u001c\u0017\r^5p]\u0016C8-\u001a9uS>t\u0007\u0003BA(\u0003+j!!!\u0015\u000b\u0007\u0005M\u0003-\u0001\u0004feJ|'o]\u0005\u0005\u0003/\n\tFA\fBkRDWM\u001c;jG\u0006$\u0018n\u001c8Fq\u000e,\u0007\u000f^5p]\u0006\u0019\u0002.Y:J]\u001ac\u0017n\u001a5u%\u0016\fX/Z:ugR\u0019A-!\u0018\t\u000f\u0005}S\u00031\u0001\u0002b\u0005!an\u001c3f!\u0011\t\u0019'!\u001a\u000e\u0003\u0001L1!a\u001aa\u0005\u0011qu\u000eZ3\u0002\r]\f7.Z;q\u0003UIe\u000e^3s\u0005J|7.\u001a:TK:$G\u000b\u001b:fC\u0012\u0004\"A\u001b\r\u0014\u0007a\t\t\bE\u0002O\u0003gJ1!!\u001e=\u0005\u0019\te.\u001f*fMR\u0011\u0011QN\u0001\u001cI1,7o]5oSR$sM]3bi\u0016\u0014H\u0005Z3gCVdG\u000fJ\u001b\u0016\u0005\u0005u$f\u00013\u0002\u0000-\u0012\u0011\u0011\u0011\t\u0005\u0003\u0007\u000bi)\u0004\u0002\u0002\u0006*!\u0011qQAE\u0003%)hn\u00195fG.,GMC\u0002\u0002\fr\n!\"\u00198o_R\fG/[8o\u0013\u0011\ty)!\"\u0003#Ut7\r[3dW\u0016$g+\u0019:jC:\u001cW\r")
public abstract class InterBrokerSendThread
extends ShutdownableThread
implements Logging {
    private volatile KafkaClient networkClient;
    private final int requestTimeoutMs;
    private final Time time;
    private final UnsentRequests unsentRequests;
    private Logger logger;
    private String logIdent;
    private volatile boolean bitmap$0;

    public static boolean $lessinit$greater$default$5() {
        return InterBrokerSendThread$.MODULE$.$lessinit$greater$default$5();
    }

    @Override
    public String loggerName() {
        return Logging.loggerName$(this);
    }

    @Override
    public String msgWithLogIdent(String msg) {
        return Logging.msgWithLogIdent$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg) {
        Logging.trace$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.trace$(this, msg, e);
    }

    @Override
    public boolean isDebugEnabled() {
        return Logging.isDebugEnabled$(this);
    }

    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    @Override
    public void debug(Function0<String> msg) {
        Logging.debug$(this, msg);
    }

    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.debug$(this, msg, e);
    }

    @Override
    public void info(Function0<String> msg) {
        Logging.info$(this, msg);
    }

    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.info$(this, msg, e);
    }

    @Override
    public void warn(Function0<String> msg) {
        Logging.warn$(this, msg);
    }

    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.warn$(this, msg, e);
    }

    @Override
    public void error(Function0<String> msg) {
        Logging.error$(this, msg);
    }

    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.error$(this, msg, e);
    }

    @Override
    public void fatal(Function0<String> msg) {
        Logging.fatal$(this, msg);
    }

    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.fatal$(this, msg, e);
    }

    private Logger logger$lzycompute() {
        synchronized (this) {
            if (!this.bitmap$0) {
                this.logger = Logging.logger$(this);
                this.bitmap$0 = true;
            }
        }
        return this.logger;
    }

    @Override
    public Logger logger() {
        if (!this.bitmap$0) {
            return this.logger$lzycompute();
        }
        return this.logger;
    }

    @Override
    public String logIdent() {
        return this.logIdent;
    }

    @Override
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    public KafkaClient networkClient() {
        return this.networkClient;
    }

    public void networkClient_$eq(KafkaClient x$1) {
        this.networkClient = x$1;
    }

    public UnsentRequests unsentRequests() {
        return this.unsentRequests;
    }

    public abstract Iterable<RequestAndCompletionHandler> generateRequests();

    public boolean hasUnsentRequests() {
        return this.unsentRequests().iterator().hasNext();
    }

    public void shutdown() {
        this.initiateShutdown();
        this.networkClient().initiateClose();
        this.awaitShutdown();
        this.networkClient().close();
    }

    private void drainGeneratedRequests() {
        this.generateRequests().foreach((Function1 & Serializable & scala.Serializable)request -> {
            InterBrokerSendThread.$anonfun$drainGeneratedRequests$1(this, request);
            return BoxedUnit.UNIT;
        });
    }

    public void pollOnce(long maxTimeoutMs) {
        try {
            this.drainGeneratedRequests();
            long now = this.time.milliseconds();
            long timeout = this.sendRequests(now, maxTimeoutMs);
            this.networkClient().poll(timeout, now);
            now = this.time.milliseconds();
            this.checkDisconnects(now);
            this.failExpiredRequests(now);
            this.unsentRequests().clean();
            return;
        }
        catch (Throwable throwable) {
            if (throwable instanceof DisconnectException && !this.networkClient().active()) {
                return;
            }
            if (throwable instanceof FatalExitError) {
                throw (FatalExitError)throwable;
            }
            if (throwable != null) {
                this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "unhandled exception caught in InterBrokerSendThread", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> throwable);
                throw new FatalExitError();
            }
            throw null;
        }
    }

    public void doWork() {
        this.pollOnce(Long.MAX_VALUE);
    }

    private long sendRequests(long now, long maxTimeoutMs) {
        LongRef pollTimeout = LongRef.create((long)maxTimeoutMs);
        ((IterableLike)CollectionConverters$.MODULE$.asScalaSetConverter(this.unsentRequests().nodes()).asScala()).foreach((Function1 & Serializable & scala.Serializable)node -> {
            InterBrokerSendThread.$anonfun$sendRequests$1(this, now, pollTimeout, node);
            return BoxedUnit.UNIT;
        });
        return pollTimeout.elem;
    }

    /*
     * WARNING - void declaration
     */
    private void checkDisconnects(long now) {
        Iterator<Map.Entry<Node, ArrayDeque<ClientRequest>>> iterator = this.unsentRequests().iterator();
        while (iterator.hasNext()) {
            void var6_5;
            void var5_4;
            Map.Entry<Node, ArrayDeque<ClientRequest>> entry = iterator.next();
            ArrayDeque<ClientRequest> arrayDeque = entry.getValue();
            Node node = entry.getKey();
            ArrayDeque<ClientRequest> requests = arrayDeque;
            void node2 = var5_4;
            void requests2 = var6_5;
            if (requests2.isEmpty() || !this.networkClient().connectionFailed((Node)node2)) continue;
            iterator.remove();
            ((IterableLike)CollectionConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)requests2).asScala()).foreach(arg_0 -> InterBrokerSendThread.$anonfun$checkDisconnects$1$adapted(this, (Node)node2, now, arg_0));
        }
    }

    private void failExpiredRequests(long now) {
        Collection<ClientRequest> timedOutRequests = this.unsentRequests().removeAllTimedOut(now);
        ((IterableLike)CollectionConverters$.MODULE$.collectionAsScalaIterableConverter(timedOutRequests).asScala()).foreach((Function1 & Serializable & scala.Serializable)request -> {
            InterBrokerSendThread.$anonfun$failExpiredRequests$1(this, now, request);
            return BoxedUnit.UNIT;
        });
    }

    public void completeWithDisconnect(ClientRequest request, long now, AuthenticationException authenticationException) {
        RequestCompletionHandler handler = request.callback();
        handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().latestAllowedVersion()), handler, request.destination(), now, now, true, null, authenticationException, null));
    }

    public boolean hasInFlightRequests(Node node) {
        return this.unsentRequests().hasUnsentRequests(node) || this.networkClient().hasInFlightRequests(node.idString());
    }

    public void wakeup() {
        this.networkClient().wakeup();
    }

    public static final /* synthetic */ void $anonfun$drainGeneratedRequests$1(InterBrokerSendThread $this, RequestAndCompletionHandler request) {
        $this.unsentRequests().put(request.destination(), $this.networkClient().newClientRequest(request.destination().idString(), request.request(), request.creationTimeMs(), true, $this.requestTimeoutMs, request.handler()));
    }

    public static final /* synthetic */ void $anonfun$sendRequests$1(InterBrokerSendThread $this, long now$1, LongRef pollTimeout$1, Node node) {
        Iterator<ClientRequest> requestIterator = $this.unsentRequests().requestIterator(node);
        while (requestIterator.hasNext()) {
            ClientRequest request = requestIterator.next();
            if ($this.networkClient().ready(node, now$1)) {
                $this.networkClient().send(request, now$1);
                requestIterator.remove();
                continue;
            }
            pollTimeout$1.elem = Math.min(pollTimeout$1.elem, $this.networkClient().connectionDelay(node, now$1));
        }
    }

    public static final /* synthetic */ void $anonfun$checkDisconnects$1(InterBrokerSendThread $this, Node node$1, long now$2, ClientRequest request) {
        AuthenticationException authenticationException = $this.networkClient().authenticationException(node$1);
        if (authenticationException != null) {
            $this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(66).append("Failed to send the following request due to authentication error: ").append(request).toString());
        }
        $this.completeWithDisconnect(request, now$2, authenticationException);
    }

    public static final /* synthetic */ void $anonfun$failExpiredRequests$1(InterBrokerSendThread $this, long now$3, ClientRequest request) {
        $this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(48).append("Failed to send the following request after ").append(request.requestTimeoutMs()).append(" ms: ").append(request).toString());
        $this.completeWithDisconnect(request, now$3, null);
    }

    public InterBrokerSendThread(String name, KafkaClient networkClient, int requestTimeoutMs, Time time, boolean isInterruptible) {
        this.networkClient = networkClient;
        this.requestTimeoutMs = requestTimeoutMs;
        this.time = time;
        super(name, isInterruptible);
        Logging.$init$(this);
        this.logIdent_$eq(this.logPrefix);
        this.unsentRequests = new UnsentRequests();
    }

    public static final /* synthetic */ Object $anonfun$checkDisconnects$1$adapted(InterBrokerSendThread $this, Node node$1, long now$2, ClientRequest request) {
        InterBrokerSendThread.$anonfun$checkDisconnects$1($this, node$1, now$2, request);
        return BoxedUnit.UNIT;
    }
}

