/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.apollo.broker;

import java.util.concurrent.TimeUnit;
import org.apache.activemq.apollo.broker.Delivery;
import org.apache.activemq.apollo.broker.Session;
import org.apache.activemq.apollo.broker.SessionLinkedNode;
import org.apache.activemq.apollo.broker.SessionSink;
import org.apache.activemq.apollo.broker.SessionSinkMux$;
import org.apache.activemq.apollo.broker.Sink;
import org.apache.activemq.apollo.broker.Sizer;
import org.apache.activemq.apollo.util.list.LinkedNodeList;
import org.fusesource.hawtdispatch.DispatchQueue;
import org.fusesource.hawtdispatch.package$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.HashSet;
import scala.collection.mutable.HashSet$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.ObjectRef;

@ScalaSignature(bytes="\u0006\u0001\u0005ug\u0001B\u0001\u0003\u00015\u0011abU3tg&|gnU5oW6+\bP\u0003\u0002\u0004\t\u00051!M]8lKJT!!\u0002\u0004\u0002\r\u0005\u0004x\u000e\u001c7p\u0015\t9\u0001\"\u0001\u0005bGRLg/Z7r\u0015\tI!\"\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u0017\u0005\u0019qN]4\u0004\u0001U\u0011a\"K\n\u0003\u0001=\u0001\"\u0001E\u000b\u000e\u0003EQ!AE\n\u0002\t1\fgn\u001a\u0006\u0002)\u0005!!.\u0019<b\u0013\t1\u0012C\u0001\u0004PE*,7\r\u001e\u0005\t1\u0001\u0011)\u0019!C\u00013\u0005QAm\\<ogR\u0014X-Y7\u0016\u0003i\u00012a\u0007\u000f\u001f\u001b\u0005\u0011\u0011BA\u000f\u0003\u0005\u0011\u0019\u0016N\\6\u0011\t}\u0011CeJ\u0007\u0002A)\t\u0011%A\u0003tG\u0006d\u0017-\u0003\u0002$A\t1A+\u001e9mKJ\u00022aG\u0013(\u0013\t1#AA\u0004TKN\u001c\u0018n\u001c8\u0011\u0005!JC\u0002\u0001\u0003\u0006U\u0001\u0011\ra\u000b\u0002\u0002)F\u0011Af\f\t\u0003?5J!A\f\u0011\u0003\u000f9{G\u000f[5oOB\u0011q\u0004M\u0005\u0003c\u0001\u00121!\u00118z\u0011!\u0019\u0004A!A!\u0002\u0013Q\u0012a\u00033po:\u001cHO]3b[\u0002B\u0001\"\u000e\u0001\u0003\u0006\u0004%\tAN\u0001\u000fG>t7/^7fe~\u000bX/Z;f+\u00059\u0004C\u0001\u001d>\u001b\u0005I$B\u0001\u001e<\u00031A\u0017m\u001e;eSN\u0004\u0018\r^2i\u0015\ta$\"\u0001\u0006gkN,7o\\;sG\u0016L!AP\u001d\u0003\u001b\u0011K7\u000f]1uG\"\fV/Z;f\u0011!\u0001\u0005A!A!\u0002\u00139\u0014aD2p]N,X.\u001a:`cV,W/\u001a\u0011\t\u0011\t\u0003!Q1A\u0005\u0002\r\u000bQa]5{KJ,\u0012\u0001\u0012\t\u00047\u0015;\u0013B\u0001$\u0003\u0005\u0015\u0019\u0016N_3s\u0011!A\u0005A!A!\u0002\u0013!\u0015AB:ju\u0016\u0014\b\u0005\u0003\u0005K\u0001\t\u0005\r\u0011\"\u0001L\u0003A!W\r\\5wKJLxl\u0019:fI&$8/F\u0001M!\tyR*\u0003\u0002OA\t\u0019\u0011J\u001c;\t\u0011A\u0003!\u00111A\u0005\u0002E\u000bA\u0003Z3mSZ,'/_0de\u0016$\u0017\u000e^:`I\u0015\fHC\u0001*V!\ty2+\u0003\u0002UA\t!QK\\5u\u0011\u001d1v*!AA\u00021\u000b1\u0001\u001f\u00132\u0011!A\u0006A!A!B\u0013a\u0015!\u00053fY&4XM]=`GJ,G-\u001b;tA!A!\f\u0001BA\u0002\u0013\u00051*\u0001\u0007tSj,wl\u001
9:fI&$8\u000f\u0003\u0005]\u0001\t\u0005\r\u0011\"\u0001^\u0003A\u0019\u0018N_3`GJ,G-\u001b;t?\u0012*\u0017\u000f\u0006\u0002S=\"9akWA\u0001\u0002\u0004a\u0005\u0002\u00031\u0001\u0005\u0003\u0005\u000b\u0015\u0002'\u0002\u001bML'0Z0de\u0016$\u0017\u000e^:!\u0011\u0015\u0011\u0007\u0001\"\u0001d\u0003\u0019a\u0014N\\5u}Q1A-\u001a4hQ&\u00042a\u0007\u0001(\u0011\u0015A\u0012\r1\u0001\u001b\u0011\u0015)\u0014\r1\u00018\u0011\u0015\u0011\u0015\r1\u0001E\u0011\u0015Q\u0015\r1\u0001M\u0011\u0015Q\u0016\r1\u0001M\u0011\u001dY\u0007\u00011A\u0005\u00021\f\u0001b]3tg&|gn]\u000b\u0002[B\u0019an\u001d\u0013\u000e\u0003=T!\u0001]9\u0002\u000f5,H/\u00192mK*\u0011!\u000fI\u0001\u000bG>dG.Z2uS>t\u0017B\u0001;p\u0005\u001dA\u0015m\u001d5TKRDqA\u001e\u0001A\u0002\u0013\u0005q/\u0001\u0007tKN\u001c\u0018n\u001c8t?\u0012*\u0017\u000f\u0006\u0002Sq\"9a+^A\u0001\u0002\u0004i\u0007B\u0002>\u0001A\u0003&Q.A\u0005tKN\u001c\u0018n\u001c8tA!9A\u0010\u0001a\u0001\n\u0003i\u0018aE8wKJ4Gn\\<fI~\u001bXm]:j_:\u001cX#\u0001@\u0011\u000b}\fI!!\u0004\u000e\u0005\u0005\u0005!\u0002BA\u0002\u0003\u000b\tA\u0001\\5ti*\u0019\u0011q\u0001\u0003\u0002\tU$\u0018\u000e\\\u0005\u0005\u0003\u0017\t\tA\u0001\bMS:\\W\r\u001a(pI\u0016d\u0015n\u001d;\u0011\tm\tyaJ\u0005\u0004\u0003#\u0011!!E*fgNLwN\u001c'j].,GMT8eK\"I\u0011Q\u0003\u0001A\u0002\u0013\u0005\u0011qC\u0001\u0018_Z,'O\u001a7po\u0016$wl]3tg&|gn]0%KF$2AUA\r\u0011!1\u00161CA\u0001\u0002\u0004q\bbBA\u000f\u0001\u0001\u0006KA`\u0001\u0015_Z,'O\u001a7po\u0016$wl]3tg&|gn\u001d\u0011\t\u000f\u0005\u0005\u0002\u0001\"\u0011\u0002$\u0005AAo\\*ue&tw\r\u0006\u0002\u0002&A!\u0011qEA\u0017\u001d\ry\u0012\u0011F\u0005\u0004\u0003W\u0001\u0013A\u0002)sK\u0012,g-\u0003\u0003\u00020\u0005E\"AB*ue&twMC\u0002\u0002,\u0001Bq!!\u000e\u0001\t\u0003\t9$\u0001\u0003pa\u0016tG\u0003BA\u001d\u0003\u007f\u0001BaGA\u001eO%\u0019\u0011Q\b\u0002\u0003\u0017M+7o]5p]NKgn\u001b\u0005\b\u0003\u0003\n\u0019\u00041\u00018\u00039\u0001(o\u001c3vG\u0016\u0014x,];fk\u0016Dq!!\u0012\u0001\t\
u0003\t9%A\u0003dY>\u001cX\rF\u0003S\u0003\u0013\ny\u0005\u0003\u0005\u0002L\u0005\r\u0003\u0019AA'\u0003\u001d\u0019Xm]:j_:\u00042a\u0007\u000f(\u0011!\t\t&a\u0011A\u0002\u0005M\u0013!\u0005:fU\u0016\u001cG/[8o?\"\fg\u000e\u001a7feB)q$!\u0016(%&\u0019\u0011q\u000b\u0011\u0003\u0013\u0019+hn\u0019;j_:\f\u0004bBA.\u0001\u0011\u0005\u0011QL\u0001\u0007e\u0016\u001c\u0018N_3\u0015\u000bI\u000by&a\u0019\t\u000f\u0005\u0005\u0014\u0011\fa\u0001\u0019\u0006!b.Z<`I\u0016d\u0017N^3ss~\u001b'/\u001a3jiNDq!!\u001a\u0002Z\u0001\u0007A*\u0001\toK^|6/\u001b>f?\u000e\u0014X\rZ5ug\"9\u0011\u0011\u000e\u0001\u0005\u0002\u0005-\u0014A\u0003;j[\u0016|6\u000f^1naV\u0011\u0011Q\u000e\t\u0004?\u0005=\u0014bAA9A\t!Aj\u001c8h\u0011%\t)\b\u0001a\u0001\n\u0003\t9(\u0001\u000bsK\n\fG.\u00198dK~\u001b8\r[3ek\u0006dW\rZ\u000b\u0003\u0003s\u00022aHA>\u0013\r\ti\b\t\u0002\b\u0005>|G.Z1o\u0011%\t\t\t\u0001a\u0001\n\u0003\t\u0019)\u0001\rsK\n\fG.\u00198dK~\u001b8\r[3ek\u0006dW\rZ0%KF$2AUAC\u0011%1\u0016qPA\u0001\u0002\u0004\tI\b\u0003\u0005\u0002\n\u0002\u0001\u000b\u0015BA=\u0003U\u0011XMY1mC:\u001cWmX:dQ\u0016$W/\u00197fI\u0002Bq!!$\u0001\t\u0003\ty)\u0001\ntG\",G-^1m?J,'-\u00197b]\u000e,W#\u0001*\t\u0013\u0005M\u0005\u00011A\u0005\u0002\u0005-\u0014!\u00057bgR|&/\u001a2bY\u0006t7-Z0ug\"I\u0011q\u0013\u0001A\u0002\u0013\u0005\u0011\u0011T\u0001\u0016Y\u0006\u001cHo\u0018:fE\u0006d\u0017M\\2f?R\u001cx\fJ3r)\r\u0011\u00161\u0014\u0005\n-\u0006U\u0015\u0011!a\u0001\u0003[B\u0001\"a(\u0001A\u0003&\u0011QN\u0001\u0013Y\u0006\u001cHo\u0018:fE\u0006d\u0017M\\2f?R\u001c\b\u0005C\u0004\u0002$\u0002!\t!a$\u0002\u001fI,'-\u00197b]\u000e,wl\u00195fG.Dq!a*\u0001\t\u0003\ty)A\u0005sK\n\fG.\u00198dK\"9\u00111\u0016\u0001\u0005\u0002\u0005=\u0015A\u00043sC&twl\u001c<fe\u001adwn\u001e\u0005\b\u0003_\u0003A\u0011AAY\u0003\u0011\u0001x\u000e\u001c7\u0016\u0003yAq!!.\u0001\t\u0003\t9,A\u0005eK2Lg/\u001a:fIR)!+!/\u0002D\"A\u00111JAZ\u0001\u0004\tY\f\u0005\u0003\u001cK\u0005u\u0006cA\u000e\u0002@&\u0019\u0011\u0011\u
0019\u0002\u0003\u0011\u0011+G.\u001b<fefDq!!2\u00024\u0002\u0007A*\u0001\u0003tSj,w!CAe\u0005\u0005\u0005\t\u0012AAf\u00039\u0019Vm]:j_:\u001c\u0016N\\6Nkb\u00042aGAg\r!\t!!!A\t\u0002\u0005=7cAAg\u001f!9!-!4\u0005\u0002\u0005MGCAAf\u0011)\t9.!4D\u0002\u0013\u0015\u0011qO\u0001\u0013I\u0015t\u0017M\u00197f?\u0006\u001c8/\u001a:uS>t7\u000fC\u0005\u0002\\\u00065\u0007\u0015!\u0004\u0002z\u0005\u0019B%\u001a8bE2,w,Y:tKJ$\u0018n\u001c8tA\u0001")
public class SessionSinkMux<T> {
    /** Sink that {@code drain_overflow()} offers (session, value) tuples to; the constructor installs its refiller. */
    private final Sink<Tuple2<Session<T>, T>> downstream;
    /** Dispatch queue whose wrapper receives the tasks built by open/close/resize/schedual_rebalance; {@code poll()} asserts it is executing. */
    private final DispatchQueue consumer_queue;
    /** Sizer supplied at construction; within this class it is only exposed through {@code sizer()}. */
    private final Sizer<T> sizer;
    /** Delivery credits handed to a session when it is opened; {@code resize()} applies the difference to existing sessions. */
    private int delivery_credits;
    /** Size credit pool: {@code open()} derives a new session's bonus from it and {@code rebalance()} splits it between stalled sessions. */
    private int size_credits;
    /** Sessions currently registered with this mux (added by {@code open}, removed by {@code close}). */
    private HashSet<Session<T>> sessions;
    /**
     * Session nodes consumed by {@code poll()}: a node is unlinked once its session's overflow is empty, otherwise the list is rotated.
     * NOTE(review): nothing in this class links nodes into the list — presumably Session does; confirm.
     */
    private LinkedNodeList<SessionLinkedNode<T>> overflowed_sessions;
    /** True between {@code schedual_rebalance()} queuing a delayed check and that check starting to run. */
    private boolean rebalance_schedualed;
    /** {@code time_stamp()} value recorded at construction and each time {@code rebalance_check()} triggers a rebalance. */
    private long last_rebalance_ts;

    /**
     * Reports whether assertions are enabled, as answered by the {@code SessionSinkMux$} singleton.
     *
     * @return the result of {@code SessionSinkMux$.MODULE$.$enable_assertions()}
     */
    public static boolean $enable_assertions() {
        SessionSinkMux$ companion = SessionSinkMux$.MODULE$;
        return companion.$enable_assertions();
    }

    /**
     * @return the downstream sink supplied to the constructor
     */
    public Sink<Tuple2<Session<T>, T>> downstream() {
        return downstream;
    }

    /**
     * @return the consumer dispatch queue supplied to the constructor
     */
    public DispatchQueue consumer_queue() {
        return consumer_queue;
    }

    /**
     * @return the sizer supplied to the constructor
     */
    public Sizer<T> sizer() {
        return sizer;
    }

    /**
     * @return the current delivery credit value
     */
    public int delivery_credits() {
        return delivery_credits;
    }

    /**
     * Stores a new delivery credit value. Only the field is updated; sessions are not touched here.
     *
     * @param x$1 the new delivery credit value
     */
    public void delivery_credits_$eq(int x$1) {
        delivery_credits = x$1;
    }

    /**
     * @return the current size credit pool
     */
    public int size_credits() {
        return size_credits;
    }

    /**
     * Stores a new size credit pool value. Only the field is updated; sessions are not touched here.
     *
     * @param x$1 the new size credit value
     */
    public void size_credits_$eq(int x$1) {
        size_credits = x$1;
    }

    /**
     * @return the mutable set of sessions registered with this mux
     */
    public HashSet<Session<T>> sessions() {
        return sessions;
    }

    /**
     * Replaces the set of registered sessions.
     *
     * @param x$1 the new session set
     */
    public void sessions_$eq(HashSet<Session<T>> x$1) {
        sessions = x$1;
    }

    /**
     * @return the list of session nodes that {@code poll()} consumes from
     */
    public LinkedNodeList<SessionLinkedNode<T>> overflowed_sessions() {
        return overflowed_sessions;
    }

    /**
     * Replaces the list of overflowed session nodes.
     *
     * @param x$1 the new node list
     */
    public void overflowed_sessions_$eq(LinkedNodeList<SessionLinkedNode<T>> x$1) {
        overflowed_sessions = x$1;
    }

    /**
     * Builds a summary of the form
     * {@code SessionSinkMux(sessions: N, overflowed_sessions: M, <downstream>)}.
     *
     * @return the summary string
     */
    public String toString() {
        StringBuilder text = new StringBuilder();
        // Each piece is appended in the same order, with the same casts, as a single chained expression would.
        text.append((Object)"SessionSinkMux(sessions: ");
        text.append((Object)BoxesRunTime.boxToInteger((int)this.sessions().size()));
        text.append((Object)", overflowed_sessions: ");
        text.append((Object)BoxesRunTime.boxToInteger((int)this.overflowed_sessions().size()));
        text.append((Object)", ");
        text.append(this.downstream());
        text.append((Object)")");
        return text.toString();
    }

    /*
     * WARNING - void declaration
     */
    public SessionSink<T> open(DispatchQueue producer_queue) {
        void var2_2;
        Session session = new Session(this, producer_queue);
        package$.MODULE$.DispatchQueueWrapper(this.consumer_queue()).$less$less$bar(package$.MODULE$.$up((Function0)new Serializable(this, session){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ SessionSinkMux $outer;
            private final Session session$2;

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                this.$outer.sessions().$plus$eq((Object)this.session$2);
                int bonus = this.$outer.size_credits() / this.$outer.sessions().size();
                this.session$2.size_bonus_$eq(bonus);
                this.session$2.credit(this.$outer.delivery_credits(), 1 + bonus);
                this.$outer.schedual_rebalance();
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
                this.session$2 = session$2;
            }
        }));
        return var2_2;
    }

    public void close(Sink<T> session, Function1<T, BoxedUnit> rejection_handler) {
        package$.MODULE$.DispatchQueueWrapper(this.consumer_queue()).$less$less$bar(package$.MODULE$.$up((Function0)new Serializable(this, session, rejection_handler){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ SessionSinkMux $outer;
            private final Sink session$1;
            private final Function1 rejection_handler$1;

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                Sink sink = this.session$1;
                if (sink instanceof Session) {
                    Session x2 = (Session)sink;
                    this.$outer.sessions().$minus$eq((Object)x2);
                    x2.close(this.rejection_handler$1);
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    return;
                }
                throw new MatchError((Object)sink);
            }
            {
                void var3_3;
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
                this.session$1 = session$1;
                this.rejection_handler$1 = var3_3;
            }
        }));
    }

    /**
     * Changes the delivery and size credit settings of this mux.
     *
     * <p>The work is wrapped in a task passed to the consumer queue wrapper's {@code apply}. The task
     * credits every registered session with the delivery credit difference (when it is non-zero),
     * stores both new values, and calls {@code schedual_rebalance()}. The size credit change is only
     * stored here; sessions are not credited with it in this method.
     *
     * @param new_delivery_credits the new delivery credit value
     * @param new_size_credits the new size credit pool value
     */
    public void resize(int new_delivery_credits, int new_size_credits) {
        package$.MODULE$.DispatchQueueWrapper(this.consumer_queue()).apply((Function0)new Serializable(this, new_delivery_credits, new_size_credits){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ SessionSinkMux $outer;
            private final int new_delivery_credits$1;
            private final int new_size_credits$1;

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                // Difference relative to the value currently stored on the mux.
                int delivery_credits_change = this.new_delivery_credits$1 - this.$outer.delivery_credits();
                if (delivery_credits_change != 0) {
                    // Apply the delivery credit delta to each registered session; size credits are left alone (0).
                    this.$outer.sessions().foreach((Function1)new Serializable(this, delivery_credits_change){
                        public static final long serialVersionUID = 0L;
                        private final int delivery_credits_change$1;

                        public final void apply(Session<T> session) {
                            session.credit(this.delivery_credits_change$1, 0);
                        }
                        {
                            this.delivery_credits_change$1 = delivery_credits_change$1;
                        }
                    });
                }
                this.$outer.delivery_credits_$eq(this.new_delivery_credits$1);
                this.$outer.size_credits_$eq(this.new_size_credits$1);
                this.$outer.schedual_rebalance();
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
                this.new_delivery_credits$1 = new_delivery_credits$1;
                this.new_size_credits$1 = new_size_credits$1;
            }
        });
    }

    /**
     * Supplies the time value used by the constructor and by {@code rebalance_check()}.
     *
     * <p>This implementation always returns 0. NOTE(review): presumably a hook that subclasses
     * override with a real clock — confirm. With a constant 0, and {@code last_rebalance_ts}
     * initialised from this method, the elapsed-time test in {@code rebalance_check()} cannot pass
     * unless {@code last_rebalance_ts} is changed through its setter.
     *
     * @return 0
     */
    public long time_stamp() {
        final long fixedStamp = 0L;
        return fixedStamp;
    }

    /**
     * @return {@code true} while a delayed rebalance check queued by {@code schedual_rebalance()} has not yet started
     */
    public boolean rebalance_schedualed() {
        return rebalance_schedualed;
    }

    /**
     * Sets the flag that marks a delayed rebalance check as pending.
     *
     * @param x$1 the new flag value
     */
    public void rebalance_schedualed_$eq(boolean x$1) {
        rebalance_schedualed = x$1;
    }

    /**
     * Queues a delayed rebalance check unless one is already pending.
     *
     * <p>When no check is pending, the pending flag is set and a task is registered with the
     * consumer queue wrapper's {@code after(550, MILLISECONDS, ...)}. That task clears the flag and
     * then calls {@code rebalance_check()}.
     */
    public void schedual_rebalance() {
        if (this.rebalance_schedualed()) {
            // A check is already queued; nothing more to do.
            return;
        }
        this.rebalance_schedualed_$eq(true);
        package$.MODULE$.DispatchQueueWrapper(this.consumer_queue()).after(550L, TimeUnit.MILLISECONDS, (Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ SessionSinkMux $outer;

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                this.$outer.rebalance_schedualed_$eq(false);
                this.$outer.rebalance_check();
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
            }
        });
    }

    /**
     * @return the {@code time_stamp()} value recorded at construction or at the last rebalance triggered by {@code rebalance_check()}
     */
    public long last_rebalance_ts() {
        return last_rebalance_ts;
    }

    /**
     * Stores the time stamp of the most recent rebalance.
     *
     * @param x$1 the new time stamp value
     */
    public void last_rebalance_ts_$eq(long x$1) {
        last_rebalance_ts = x$1;
    }

    /**
     * Runs {@code rebalance()} when more than 500 units of {@code time_stamp()} have elapsed since
     * the last recorded rebalance and at least one session is registered. The time stamp is
     * recorded before rebalancing.
     */
    public void rebalance_check() {
        long now = this.time_stamp();
        boolean intervalElapsed = now - this.last_rebalance_ts() > 500L;
        // The session count is only consulted once the interval has elapsed.
        if (!intervalElapsed || this.sessions().size() <= 0) {
            return;
        }
        this.last_rebalance_ts_$eq(now);
        this.rebalance();
    }

    public void rebalance() {
        ObjectRef stalled_sessions = new ObjectRef((Object)Nil$.MODULE$);
        LongRef total_stalls = new LongRef(0L);
        this.sessions().foreach((Function1)new Serializable(this, stalled_sessions, total_stalls){
            public static final long serialVersionUID = 0L;
            private final ObjectRef stalled_sessions$1;
            private final LongRef total_stalls$1;

            public final void apply(Session<T> session) {
                if (session.stall_counter() > 0) {
                    this.total_stalls$1.elem += (long)session.stall_counter();
                    this.stalled_sessions$1.elem = ((List)this.stalled_sessions$1.elem).$colon$colon(session);
                }
            }
            {
                void var3_3;
                this.stalled_sessions$1 = stalled_sessions$1;
                this.total_stalls$1 = var3_3;
            }
        });
        ((List)stalled_sessions.elem).foreach((Function1)new Serializable(this, total_stalls){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ SessionSinkMux $outer;
            private final LongRef total_stalls$1;

            public final void apply(Session<T> session) {
                float slice_percent = (float)session.stall_counter() / (float)this.total_stalls$1.elem;
                int new_size_bonus = (int)((float)this.$outer.size_credits() * slice_percent);
                int change = new_size_bonus - session.size_bonus();
                session.stall_counter_$eq(0);
                session.size_bonus_$eq(session.size_bonus() + change);
                session.credit(0, change);
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
                this.total_stalls$1 = total_stalls$1;
            }
        });
    }

    /**
     * Moves overflow entries to the downstream sink until no overflowed sessions remain or the
     * downstream sink reports that it is full.
     *
     * <p>Each entry comes from {@code poll()} and is offered to downstream. When assertions are
     * enabled, the result of the offer is asserted.
     */
    public void drain_overflow() {
        // The emptiness test runs first; full() is only consulted when there is something to drain.
        while (!this.overflowed_sessions().isEmpty() && !this.downstream().full()) {
            boolean accepted = this.downstream().offer(this.poll());
            if (SessionSinkMux$.MODULE$.$enable_assertions()) {
                Predef$.MODULE$.assert(accepted, (Function0)new Serializable(this, accepted){
                    public static final long serialVersionUID = 0L;
                    private final boolean accepted$1;

                    public final String apply() {
                        return new StringOps(Predef$.MODULE$.augmentString("accepted with accepted=>%s")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToBoolean((boolean)this.accepted$1)}));
                    }
                    {
                        this.accepted$1 = accepted$1;
                    }
                });
            }
        }
    }

    /**
     * Takes the next overflow entry from the session at the head of the overflowed list.
     *
     * <p>Asserts that the consumer queue is executing. If the head session has a positive stall
     * counter, a rebalance is scheduled. After the entry is removed, the session's node is unlinked
     * when its overflow is now empty; otherwise the overflowed list is rotated.
     *
     * @return a (session, value) tuple, or {@code null} when there are no overflowed sessions
     */
    public Tuple2<Session<T>, T> poll() {
        this.consumer_queue().assertExecuting();
        if (this.overflowed_sessions().isEmpty()) {
            return null;
        }
        Session head = ((SessionLinkedNode)this.overflowed_sessions().getHead()).session();
        Object next = head.overflow().removeFirst();
        if (head.stall_counter() > 0) {
            this.schedual_rebalance();
        }
        if (head.overflow().isEmpty()) {
            head.overflow_node().unlink();
        } else {
            this.overflowed_sessions().rotate();
        }
        return new Tuple2(head, next);
    }

    /**
     * Credits {@code session} with one delivery credit and {@code size} size credits.
     *
     * @param session the session to credit
     * @param size the number of size credits to give back
     */
    public void delivered(Session<Delivery> session, int size) {
        final int deliveryCredit = 1;
        session.credit(deliveryCredit, size);
    }

    /**
     * Creates a mux with no sessions and an empty overflow list, and installs a refiller on
     * {@code downstream} whose body calls {@code drain_overflow()}.
     *
     * @param downstream sink that overflow entries are drained into
     * @param consumer_queue dispatch queue used for the tasks this mux submits
     * @param sizer sizer stored on the mux
     * @param delivery_credits initial delivery credit value
     * @param size_credits initial size credit pool
     */
    public SessionSinkMux(Sink<Tuple2<Session<T>, T>> downstream, DispatchQueue consumer_queue, Sizer<T> sizer, int delivery_credits, int size_credits) {
        this.downstream = downstream;
        this.consumer_queue = consumer_queue;
        this.sizer = sizer;
        this.delivery_credits = delivery_credits;
        this.size_credits = size_credits;
        this.sessions = (HashSet)HashSet$.MODULE$.apply((Seq)Nil$.MODULE$);
        this.overflowed_sessions = new LinkedNodeList();
        this.rebalance_schedualed = false;
        // time_stamp() is a non-final public method, so an override would be invoked here during construction.
        this.last_rebalance_ts = this.time_stamp();
        // The refiller simply drains the overflow lists into downstream.
        // NOTE(review): presumably downstream invokes it when it can accept more entries — confirm against Sink.
        downstream.refiller_$eq(package$.MODULE$.$up((Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ SessionSinkMux $outer;

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                this.$outer.drain_overflow();
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
            }
        }));
    }
}

