/*
 * Decompiled with CFR 0.152.
 */
package kafka.common;

import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import kafka.common.RequestAndCompletionHandler;
import kafka.common.UnsentRequests;
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.internals.FatalExitError;
import org.apache.kafka.common.utils.Time;
import scala.Function0;
import scala.Function1;
import scala.collection.Iterable;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.LongRef;

@ScalaSignature(bytes="\u0006\u0005\u00055d!B\r\u001b\u0003\u0003y\u0002\"\u0003\u0014\u0001\u0005\u0003\u0005\u000b\u0011B\u00145\u0011!)\u0004A!A!\u0002\u00131\u0004\u0002C!\u0001\u0005\u0003\u0005\u000b\u0011\u0002\"\t\u0011\u0019\u0003!\u0011!Q\u0001\n\u001dC\u0011\"\u0014\u0001\u0003\u0002\u0003\u0006IAT)\t\u000bI\u0003A\u0011A*\t\u000fm\u0003!\u0019!C\u00019\"1\u0001\r\u0001Q\u0001\nuCQ!\u0019\u0001\u0007\u0002\tDQa\u001c\u0001\u0005\u0002ADQ!\u001d\u0001\u0005BIDQA\u001e\u0001\u0005\nIDQa\u001e\u0001\u0005\u0012aDQA \u0001\u0005BIDaa \u0001\u0005\n\u0005\u0005\u0001bBA\u0005\u0001\u0011%\u00111\u0002\u0005\b\u0003\u001f\u0001A\u0011BA\t\u0011\u001d\t)\u0002\u0001C\u0001\u0003/Aq!!\u000e\u0001\t\u0003\t9\u0004\u0003\u0004\u0002F\u0001!\tA]\u0004\n\u0003\u000fR\u0012\u0011!E\u0001\u0003\u00132\u0001\"\u0007\u000e\u0002\u0002#\u0005\u00111\n\u0005\u0007%Z!\t!a\u0015\t\u0013\u0005Uc#%A\u0005\u0002\u0005]#!F%oi\u0016\u0014(I]8lKJ\u001cVM\u001c3UQJ,\u0017\r\u001a\u0006\u00037q\taaY8n[>t'\"A\u000f\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001\u0001\t\t\u0003C\u0011j\u0011A\t\u0006\u0003Gq\tQ!\u001e;jYNL!!\n\u0012\u0003%MCW\u000f\u001e3po:\f'\r\\3UQJ,\u0017\rZ\u0001\u0005]\u0006lW\r\u0005\u0002)c9\u0011\u0011f\f\t\u0003U5j\u0011a\u000b\u0006\u0003Yy\ta\u0001\u0010:p_Rt$\"\u0001\u0018\u0002\u000bM\u001c\u0017\r\\1\n\u0005Aj\u0013A\u0002)sK\u0012,g-\u0003\u00023g\t11\u000b\u001e:j]\u001eT!\u0001M\u0017\n\u0005\u0019\"\u0013!\u00048fi^|'o[\"mS\u0016tG\u000f\u0005\u00028\u007f5\t\u0001H\u0003\u0002:u\u000591\r\\5f]R\u001c(BA\u000f<\u0015\taT(\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002}\u0005\u0019qN]4\n\u0005\u0001C$aC&bM.\f7\t\\5f]R\f\u0001C]3rk\u0016\u001cH\u000fV5nK>,H/T:\u0011\u0005\r#U\"A\u0017\n\u0005\u0015k#aA%oi\u0006!A/[7f!\tA5*D\u0001J\u0015\t\u0019#J\u0003\u0002\u001cu%\u0011A*\u0013\u0002\u0005)&lW-A\bjg&sG/\u001a:skB$\u0018N\u00197f!\t\u0019u*\u0003\u0002Q[\t9!i\\8mK\u0006t\u0017BA'%\u0003\u0019a\u0014N\\5u}Q1AKV,Y3j\u0003\"!\u0016\u0001\u000e\u0003iAQA\n\u0004A\u0002\u001dBQ!\u000e\u0004A\u0002YBQ!\u0011\u0004A\u0002\tCQA\u0012\u0004A\u0002\u001dCq!\u0014\u0004\u0011\u0002\u0003\u0007a*\u0001\bv]N,g\u000e\u001e*fcV,7\u000f^:\u0016\u0003u\u0003\"!\u00160\n\u0005}S\"AD+og\u0016tGOU3rk\u0016\u001cHo]\u0001\u0010k:\u001cXM\u001c;SKF,Xm\u001d;tA\u0005\u0001r-\u001a8fe\u0006$XMU3rk\u0016\u001cHo\u001d\u000b\u0002GB\u0019A-\u001b7\u000f\u0005\u0015<gB\u0001\u0016g\u0013\u0005q\u0013B\u00015.\u0003\u001d\u0001\u0018mY6bO\u0016L!A[6\u0003\u0011%#XM]1cY\u0016T!\u0001[\u0017\u0011\u0005Uk\u0017B\u00018\u001b\u0005m\u0011V-];fgR\fe\u000eZ\"p[BdW\r^5p]\"\u000bg\u000e\u001a7fe\u0006\t\u0002.Y:V]N,g\u000e\u001e*fcV,7\u000f^:\u0016\u00039\u000b\u0001b\u001d5vi\u0012|wO\u001c\u000b\u0002gB\u00111\t^\u0005\u0003k6\u0012A!\u00168ji\u00061BM]1j]\u001e+g.\u001a:bi\u0016$'+Z9vKN$8/\u0001\u0005q_2dwJ\\2f)\t\u0019\u0018\u0010C\u0003{\u001b\u0001\u000710\u0001\u0007nCb$\u0016.\\3pkRl5\u000f\u0005\u0002Dy&\u0011Q0\f\u0002\u0005\u0019>tw-\u0001\u0004e_^{'o[\u0001\rg\u0016tGMU3rk\u0016\u001cHo\u001d\u000b\u0006w\u0006\r\u0011q\u0001\u0005\u0007\u0003\u000by\u0001\u0019A>\u0002\u00079|w\u000fC\u0003{\u001f\u0001\u000710\u0001\tdQ\u0016\u001c7\u000eR5tG>tg.Z2ugR\u00191/!\u0004\t\r\u0005\u0015\u0001\u00031\u0001|\u0003M1\u0017-\u001b7FqBL'/\u001a3SKF,Xm\u001d;t)\r\u0019\u00181\u0003\u0005\u0007\u0003\u000b\t\u0002\u0019A>\u0002-\r|W\u000e\u001d7fi\u0016<\u0016\u000e\u001e5ESN\u001cwN\u001c8fGR$ra]A\r\u0003G\t)\u0003C\u0004\u0002\u001cI\u0001\r!!\b\u0002\u000fI,\u0017/^3tiB\u0019q'a\b\n\u0007\u0005\u0005\u0002HA\u0007DY&,g\u000e\u001e*fcV,7\u000f\u001e\u0005\u0007\u0003\u000b\u0011\u0002\u0019A>\t\u000f\u0005\u001d\"\u00031\u0001\u0002*\u00059\u0012-\u001e;iK:$\u0018nY1uS>tW\t_2faRLwN\u001c\t\u0005\u0003W\t\t$\u0004\u0002\u0002.)\u0019\u0011q\u0006&\u0002\r\u0015\u0014(o\u001c:t\u0013\u0011\t\u0019$!\f\u0003/\u0005+H\u000f[3oi&\u001c\u0017\r^5p]\u0016C8-\u001a9uS>t\u0017a\u00055bg&sg\t\\5hQR\u0014V-];fgR\u001cHc\u0001(\u0002:!9\u00111H\nA\u0002\u0005u\u0012\u0001\u00028pI\u0016\u0004B!a\u0010\u0002B5\t!*C\u0002\u0002D)\u0013AAT8eK\u00061q/Y6fkB\fQ#\u00138uKJ\u0014%o\\6feN+g\u000e\u001a+ie\u0016\fG\r\u0005\u0002V-M\u0019a#!\u0014\u0011\u0007\r\u000by%C\u0002\u0002R5\u0012a!\u00118z%\u00164GCAA%\u0003m!C.Z:tS:LG\u000fJ4sK\u0006$XM\u001d\u0013eK\u001a\fW\u000f\u001c;%kU\u0011\u0011\u0011\f\u0016\u0004\u001d\u0006m3FAA/!\u0011\ty&!\u001b\u000e\u0005\u0005\u0005$\u0002BA2\u0003K\n\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\u0005\u001dT&\u0001\u0006b]:|G/\u0019;j_:LA!a\u001b\u0002b\t\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3")
public abstract class InterBrokerSendThread
extends ShutdownableThread {
    private final KafkaClient networkClient;
    private final int requestTimeoutMs;
    private final Time time;
    private final UnsentRequests unsentRequests;

    public static boolean $lessinit$greater$default$5() {
        return true;
    }

    public UnsentRequests unsentRequests() {
        return this.unsentRequests;
    }

    public abstract Iterable<RequestAndCompletionHandler> generateRequests();

    public boolean hasUnsentRequests() {
        return this.unsentRequests().iterator().hasNext();
    }

    @Override
    public void shutdown() {
        this.initiateShutdown();
        this.networkClient.initiateClose();
        this.awaitShutdown();
        this.networkClient.close();
    }

    private void drainGeneratedRequests() {
        this.generateRequests().foreach((Function1 & Serializable)request -> {
            InterBrokerSendThread.$anonfun$drainGeneratedRequests$1(this, request);
            return BoxedUnit.UNIT;
        });
    }

    public void pollOnce(long maxTimeoutMs) {
        try {
            this.drainGeneratedRequests();
            long now = this.time.milliseconds();
            long timeout = this.sendRequests(now, maxTimeoutMs);
            this.networkClient.poll(timeout, now);
            now = this.time.milliseconds();
            this.checkDisconnects(now);
            this.failExpiredRequests(now);
            this.unsentRequests().clean();
            return;
        }
        catch (Throwable throwable) {
            if (throwable instanceof DisconnectException && !this.networkClient.active()) {
                return;
            }
            if (throwable instanceof FatalExitError) {
                throw (FatalExitError)throwable;
            }
            if (throwable != null) {
                this.error((Function0<String>)(Function0 & Serializable)() -> "unhandled exception caught in InterBrokerSendThread", (Function0<Throwable>)(Function0 & Serializable)() -> throwable);
                throw new FatalExitError();
            }
            throw null;
        }
    }

    @Override
    public void doWork() {
        this.pollOnce(Long.MAX_VALUE);
    }

    private long sendRequests(long now, long maxTimeoutMs) {
        LongRef pollTimeout = LongRef.create((long)maxTimeoutMs);
        CollectionConverters$.MODULE$.SetHasAsScala(this.unsentRequests().nodes()).asScala().foreach((Function1 & Serializable)node -> {
            InterBrokerSendThread.$anonfun$sendRequests$1(this, now, pollTimeout, node);
            return BoxedUnit.UNIT;
        });
        return pollTimeout.elem;
    }

    /*
     * WARNING - void declaration
     */
    private void checkDisconnects(long now) {
        Iterator<Map.Entry<Node, ArrayDeque<ClientRequest>>> iterator = this.unsentRequests().iterator();
        while (iterator.hasNext()) {
            void var5_4;
            void var6_5;
            Map.Entry<Node, ArrayDeque<ClientRequest>> entry = iterator.next();
            ArrayDeque<ClientRequest> arrayDeque = entry.getValue();
            Node node = entry.getKey();
            ArrayDeque<ClientRequest> requests = arrayDeque;
            if (var6_5.isEmpty() || !this.networkClient.connectionFailed((Node)var5_4)) continue;
            iterator.remove();
            CollectionConverters$.MODULE$.CollectionHasAsScala((Collection)var6_5).asScala().foreach(arg_0 -> InterBrokerSendThread.$anonfun$checkDisconnects$1$adapted(this, (Node)var5_4, now, arg_0));
        }
    }

    private void failExpiredRequests(long now) {
        Collection<ClientRequest> timedOutRequests = this.unsentRequests().removeAllTimedOut(now);
        CollectionConverters$.MODULE$.CollectionHasAsScala(timedOutRequests).asScala().foreach((Function1 & Serializable)request -> {
            InterBrokerSendThread.$anonfun$failExpiredRequests$1(this, now, request);
            return BoxedUnit.UNIT;
        });
    }

    public void completeWithDisconnect(ClientRequest request, long now, AuthenticationException authenticationException) {
        RequestCompletionHandler handler = request.callback();
        handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().latestAllowedVersion()), handler, request.destination(), now, now, true, null, authenticationException, null));
    }

    public boolean hasInFlightRequests(Node node) {
        return this.unsentRequests().hasUnsentRequests(node) || this.networkClient.hasInFlightRequests(node.idString());
    }

    public void wakeup() {
        this.networkClient.wakeup();
    }

    public static final /* synthetic */ void $anonfun$drainGeneratedRequests$1(InterBrokerSendThread $this, RequestAndCompletionHandler request) {
        $this.unsentRequests().put(request.destination(), $this.networkClient.newClientRequest(request.destination().idString(), request.request(), request.creationTimeMs(), true, $this.requestTimeoutMs, request.handler()));
    }

    public static final /* synthetic */ void $anonfun$sendRequests$1(InterBrokerSendThread $this, long now$1, LongRef pollTimeout$1, Node node) {
        Iterator<ClientRequest> requestIterator = $this.unsentRequests().requestIterator(node);
        while (requestIterator.hasNext()) {
            ClientRequest request = requestIterator.next();
            if ($this.networkClient.ready(node, now$1)) {
                $this.networkClient.send(request, now$1);
                requestIterator.remove();
                continue;
            }
            pollTimeout$1.elem = Math.min(pollTimeout$1.elem, $this.networkClient.connectionDelay(node, now$1));
        }
    }

    public static final /* synthetic */ void $anonfun$checkDisconnects$1(InterBrokerSendThread $this, Node node$1, long now$2, ClientRequest request) {
        AuthenticationException authenticationException = $this.networkClient.authenticationException(node$1);
        if (authenticationException != null) {
            $this.error((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(66).append("Failed to send the following request due to authentication error: ").append(request).toString());
        }
        $this.completeWithDisconnect(request, now$2, authenticationException);
    }

    public static final /* synthetic */ void $anonfun$failExpiredRequests$1(InterBrokerSendThread $this, long now$3, ClientRequest request) {
        $this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(48).append("Failed to send the following request after ").append(request.requestTimeoutMs()).append(" ms: ").append(request).toString());
        $this.completeWithDisconnect(request, now$3, null);
    }

    public InterBrokerSendThread(String name, KafkaClient networkClient, int requestTimeoutMs, Time time, boolean isInterruptible) {
        this.networkClient = networkClient;
        this.requestTimeoutMs = requestTimeoutMs;
        this.time = time;
        super(name, isInterruptible);
        this.unsentRequests = new UnsentRequests();
    }

    public static final /* synthetic */ Object $anonfun$checkDisconnects$1$adapted(InterBrokerSendThread $this, Node node$1, long now$2, ClientRequest request) {
        InterBrokerSendThread.$anonfun$checkDisconnects$1($this, node$1, now$2, request);
        return BoxedUnit.UNIT;
    }
}

