/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.apollo.mqtt;

import java.net.ProtocolException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.apollo.broker.AbstractRetainedDeliveryConsumer;
import org.apache.activemq.apollo.broker.AbstractSessionSinkFilter;
import org.apache.activemq.apollo.broker.BindAddress;
import org.apache.activemq.apollo.broker.BindableDeliveryProducer;
import org.apache.activemq.apollo.broker.BrokerConnection;
import org.apache.activemq.apollo.broker.ConnectAddress;
import org.apache.activemq.apollo.broker.Consumed$;
import org.apache.activemq.apollo.broker.CreditWindowFilter;
import org.apache.activemq.apollo.broker.Delivered$;
import org.apache.activemq.apollo.broker.Delivery;
import org.apache.activemq.apollo.broker.Delivery$;
import org.apache.activemq.apollo.broker.DeliveryConsumer;
import org.apache.activemq.apollo.broker.DeliveryProducer;
import org.apache.activemq.apollo.broker.DeliveryProducerRoute;
import org.apache.activemq.apollo.broker.DeliveryResult;
import org.apache.activemq.apollo.broker.DeliverySession;
import org.apache.activemq.apollo.broker.DestinationAddress;
import org.apache.activemq.apollo.broker.DestinationParser;
import org.apache.activemq.apollo.broker.Message;
import org.apache.activemq.apollo.broker.MutableSink;
import org.apache.activemq.apollo.broker.RetainAction;
import org.apache.activemq.apollo.broker.RetainRemove$;
import org.apache.activemq.apollo.broker.RetainSet$;
import org.apache.activemq.apollo.broker.Session;
import org.apache.activemq.apollo.broker.SessionSink;
import org.apache.activemq.apollo.broker.SessionSinkMux;
import org.apache.activemq.apollo.broker.SimpleAddress;
import org.apache.activemq.apollo.broker.Sink;
import org.apache.activemq.apollo.broker.Sizer;
import org.apache.activemq.apollo.broker.SubscriptionAddress;
import org.apache.activemq.apollo.broker.Undelivered$;
import org.apache.activemq.apollo.broker.VirtualHost;
import org.apache.activemq.apollo.broker.protocol.RawMessage;
import org.apache.activemq.apollo.broker.protocol.RawMessageCodec$;
import org.apache.activemq.apollo.broker.security.SecurityContext;
import org.apache.activemq.apollo.broker.store.StoreUOW;
import org.apache.activemq.apollo.filter.FilterException;
import org.apache.activemq.apollo.mqtt.MqttProtocol;
import org.apache.activemq.apollo.mqtt.MqttProtocolHandler;
import org.apache.activemq.apollo.mqtt.MqttSessionManager;
import org.apache.activemq.apollo.mqtt.Request;
import org.apache.activemq.apollo.mqtt.SessionDeliverySizer;
import org.apache.activemq.apollo.util.Fn1;
import org.apache.activemq.apollo.util.Fn2;
import org.apache.activemq.apollo.util.LRUCache;
import org.apache.activemq.apollo.util.LongCounter;
import org.apache.activemq.apollo.util.Scala2Java;
import org.apache.activemq.apollo.util.UnitFn1;
import org.apache.activemq.apollo.util.UnitFn2;
import org.apache.activemq.apollo.util.path.Path;
import org.apache.activemq.apollo.util.path.Path$;
import org.apache.activemq.apollo.util.path.PathMap;
import org.apache.activemq.apollo.util.path.PathParser;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.hawtdispatch.CustomDispatchSource;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.hawtdispatch.DispatchQueue;
import org.fusesource.hawtdispatch.EventAggregator;
import org.fusesource.hawtdispatch.Task;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.codec.CONNACK;
import org.fusesource.mqtt.codec.CONNECT;
import org.fusesource.mqtt.codec.DISCONNECT;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.fusesource.mqtt.codec.MessageSupport;
import org.fusesource.mqtt.codec.PINGREQ;
import org.fusesource.mqtt.codec.PINGRESP;
import org.fusesource.mqtt.codec.PUBACK;
import org.fusesource.mqtt.codec.PUBCOMP;
import org.fusesource.mqtt.codec.PUBLISH;
import org.fusesource.mqtt.codec.PUBREC;
import org.fusesource.mqtt.codec.PUBREL;
import org.fusesource.mqtt.codec.SUBACK;
import org.fusesource.mqtt.codec.SUBSCRIBE;
import org.fusesource.mqtt.codec.UNSUBACK;
import org.fusesource.mqtt.codec.UNSUBSCRIBE;
import scala.Option;
import scala.Tuple2;

public class MqttSession {
    public final MqttSessionManager.HostState host_state;
    public final UTF8Buffer client_id;
    public final MqttSessionManager.SessionState session_state;
    public final DispatchQueue queue;
    public boolean manager_disconnected = false;
    public MqttProtocolHandler handler;
    public SecurityContext security_context;
    public boolean clean_session = false;
    public CONNECT connect_message;
    public DestinationParser destination_parser = MqttProtocol.destination_parser;
    boolean publish_body = false;
    final HashMap<Short, Request> in_flight_publishes = new HashMap();
    LRUCache<UTF8Buffer, MqttProducerRoute> producerRoutes = new LRUCache<UTF8Buffer, MqttProducerRoute>(10){

        protected void onCacheEviction(final Map.Entry<UTF8Buffer, MqttProducerRoute> eldest) {
            MqttSession.this.host().dispatch_queue().execute(new Task(){

                public void run() {
                    ConnectAddress[] array = new ConnectAddress[]{((MqttProducerRoute)((Object)eldest.getValue())).address};
                    MqttSession.this.host().router().disconnect(array, (BindableDeliveryProducer)eldest.getValue());
                }
            });
        }
    };
    MqttConsumer _mqtt_consumer;

    public MqttSession(MqttSessionManager.HostState host_state, UTF8Buffer client_id, MqttSessionManager.SessionState session_state) {
        this.host_state = host_state;
        this.client_id = client_id;
        this.queue = Dispatch.createQueue((String)("mqtt: " + client_id));
        this.session_state = session_state;
    }

    public VirtualHost host() {
        return this.host_state.host;
    }

    public void connect(final MqttProtocolHandler next) {
        this.queue.execute(new Task(){

            public void run() {
                if (MqttSession.this.manager_disconnected) {
                    MqttSessionManager.attach(MqttSession.this.host(), MqttSession.this.client_id, next);
                } else {
                    MqttSession.this.queue.suspend();
                    if (MqttSession.this.handler != null) {
                        MqttSession.this.detach();
                        MqttSession.this.handler = null;
                    }
                    MqttSession.this.queue.execute(new Task(){

                        public void run() {
                            MqttSession.this.handler = next;
                            MqttSession.this.attach();
                        }
                    });
                    next.connection()._set_dispatch_queue(MqttSession.this.queue, new Task(){

                        public void run() {
                            MqttSession.this.queue.resume();
                        }
                    });
                }
            }
        });
    }

    public void disconnect(final MqttProtocolHandler prev) {
        this.queue.execute(new Task(){

            public void run() {
                if (MqttSession.this.handler == prev) {
                    MqttSessionManager.remove(MqttSession.this.host_state, MqttSession.this.client_id);
                    MqttSession.this.manager_disconnected = true;
                    MqttSession.this.detach();
                    MqttSession.this.handler = null;
                }
            }
        });
    }

    public void attach() {
        this.queue.assertExecuting();
        final MqttProtocolHandler h = this.handler;
        this.clean_session = h.connect_message.cleanSession();
        this.security_context = h.security_context;
        h.command_handler = new UnitFn1<Object>(){

            public void call(Object v1) {
                MqttSession.this.on_transport_command(v1);
            }
        };
        this.destination_parser = h.destination_parser();
        this.mqtt_consumer().consumer_sink.downstream_$eq(Scala2Java.some((Object)h.sink_manager.open()));
        final Task ack_connect = new Task(){

            public void run() {
                MqttSession.this.queue.assertExecuting();
                MqttSession.this.connect_message = h.connect_message;
                CONNACK connack = new CONNACK();
                connack.code(CONNACK.Code.CONNECTION_ACCEPTED);
                MqttSession.this.send((MessageSupport.Message)connack);
            }
        };
        if (!this.clean_session) {
            this.session_state.strategy.create(this.host().store(), this.client_id);
            if (!this.session_state.subscriptions.isEmpty()) {
                h._suspend_read("subscribing");
                ArrayList topics = Scala2Java.map(this.session_state.subscriptions.values(), (Fn1)new Fn1<Tuple2<Topic, BindAddress>, Topic>(){

                    public Topic apply(Tuple2<Topic, BindAddress> v1) {
                        return (Topic)v1._1();
                    }
                });
                this.subscribe(topics, new Task(){

                    public void run() {
                        h.resume_read();
                        h.queue().execute(ack_connect);
                    }
                });
            } else {
                ack_connect.run();
            }
        } else {
            this.session_state.subscriptions.clear();
            if (this.session_state.durable_sub != null) {
                final DestinationAddress[] addresses = new DestinationAddress[]{this.session_state.durable_sub};
                this.session_state.durable_sub = null;
                this.host().dispatch_queue().execute(new Task(){

                    public void run() {
                        MqttSession.this.host().router().delete(addresses, MqttSession.this.security_context);
                    }
                });
            }
            this.session_state.strategy.destroy(new Task(){

                public void run() {
                    ack_connect.run();
                }
            });
        }
    }

    public void detach() {
        BindAddress[] addresses;
        this.queue.assertExecuting();
        if (!this.producerRoutes.isEmpty()) {
            final ArrayList routes = new ArrayList(this.producerRoutes.values());
            this.host().dispatch_queue().execute(new Task(){

                public void run() {
                    for (MqttProducerRoute route : routes) {
                        MqttSession.this.host().router().disconnect(new ConnectAddress[]{route.address}, (BindableDeliveryProducer)route);
                    }
                }
            });
            this.producerRoutes.clear();
        }
        if (this.clean_session) {
            if (!this.mqtt_consumer().addresses.isEmpty()) {
                addresses = this.mqtt_consumer().addresses.keySet().toArray(new BindAddress[this.mqtt_consumer().addresses.size()]);
                this.host().dispatch_queue().execute(new Runnable(){

                    @Override
                    public void run() {
                        MqttSession.this.host().router().unbind(addresses, (DeliveryConsumer)MqttSession.this.mqtt_consumer(), false, MqttSession.this.security_context);
                    }
                });
                this.mqtt_consumer().addresses.clear();
            }
            this.session_state.subscriptions.clear();
        } else if (this.session_state.durable_sub != null) {
            addresses = new BindAddress[]{this.session_state.durable_sub};
            this.host().dispatch_queue().execute(new Runnable(){

                @Override
                public void run() {
                    MqttSession.this.host().router().unbind(addresses, (DeliveryConsumer)MqttSession.this.mqtt_consumer(), false, MqttSession.this.security_context);
                }
            });
            this.mqtt_consumer().addresses.clear();
            this.session_state.durable_sub = null;
        }
        for (Request request : this.in_flight_publishes.values()) {
            if (request.ack == null) continue;
            request.ack.apply(request.delivered ? Delivered$.MODULE$ : Undelivered$.MODULE$);
        }
        this.in_flight_publishes.clear();
        this.handler.sink_manager.close((Sink)this.mqtt_consumer().consumer_sink.downstream().get(), Scala2Java.noopFn1());
        this.mqtt_consumer().consumer_sink.downstream_$eq(Scala2Java.none());
        this.handler.on_transport_disconnected();
    }

    public SimpleAddress decode_destination(UTF8Buffer value) {
        SimpleAddress rc = this.destination_parser.decode_single_destination(value.toString(), Scala2Java.toScala((Fn1)new Fn1<String, SimpleAddress>(){

            public SimpleAddress apply(String name) {
                return new SimpleAddress("topic", MqttSession.this.destination_parser.decode_path(name));
            }
        }));
        if (rc == null && this.handler != null) {
            this.handler.die("Invalid mqtt destination name: " + value);
        }
        return rc;
    }

    public void send(MessageSupport.Message message) {
        this.queue.assertExecuting();
        this.handler.connection_sink.offer((Object)new Request(0, message, null));
    }

    public void publish_completed(short id) {
        this.queue.assertExecuting();
        Request request = this.in_flight_publishes.remove(id);
        if (request != null) {
            if (request.ack != null) {
                request.ack.apply((Object)Consumed$.MODULE$);
            }
        } else {
            this.in_flight_publishes.put(id, new Request(id, null, null));
        }
    }

    public void on_transport_command(Object o) {
        try {
            if (o.getClass() == MQTTFrame.class) {
                MQTTFrame command = (MQTTFrame)o;
                switch (command.messageType()) {
                    case 3: {
                        this.on_mqtt_publish(MqttProtocolHandler.received(new PUBLISH().decode(command)));
                        break;
                    }
                    case 6: {
                        final PUBREL ack = MqttProtocolHandler.received(new PUBREL().decode(command));
                        this.session_state.received_message_ids.remove(ack.messageId());
                        this.session_state.strategy.update(new Task(){

                            public void run() {
                                MqttSession.this.send((MessageSupport.Message)new PUBCOMP().messageId(ack.messageId()));
                            }
                        });
                        break;
                    }
                    case 8: {
                        this.on_mqtt_subscribe(MqttProtocolHandler.received(new SUBSCRIBE().decode(command)));
                        break;
                    }
                    case 10: {
                        this.on_mqtt_unsubscribe(MqttProtocolHandler.received(new UNSUBSCRIBE().decode(command)));
                        break;
                    }
                    case 4: {
                        PUBACK ack = MqttProtocolHandler.received(new PUBACK().decode(command));
                        this.publish_completed(ack.messageId());
                        break;
                    }
                    case 5: {
                        PUBREC ack = MqttProtocolHandler.received(new PUBREC().decode(command));
                        this.send((MessageSupport.Message)new PUBREL().messageId(ack.messageId()));
                        break;
                    }
                    case 7: {
                        PUBCOMP ack = MqttProtocolHandler.received(new PUBCOMP().decode(command));
                        this.publish_completed(ack.messageId());
                        break;
                    }
                    case 12: {
                        MqttProtocolHandler.received(new PINGREQ().decode(command));
                        this.send((MessageSupport.Message)new PINGRESP());
                        break;
                    }
                    case 14: {
                        MqttProtocolHandler.received(new DISCONNECT());
                        MqttSessionManager.disconnect(this.host_state, this.client_id, this.handler);
                        break;
                    }
                    default: {
                        this.handler.die("Invalid MQTT message type: " + command.messageType());
                        break;
                    }
                }
            } else if ("failure".equals(o)) {
                this.publish_will(new Task(){

                    public void run() {
                        MqttSessionManager.disconnect(MqttSession.this.host_state, MqttSession.this.client_id, MqttSession.this.handler);
                    }
                });
            } else {
                this.handler.die("Internal Server Error: unexpected mqtt command: " + o.getClass());
            }
        }
        catch (ProtocolException e) {
            this.handler.die("Internal Server Error: " + e);
        }
    }

    public void on_mqtt_publish(final PUBLISH publish) {
        if (publish.qos() == QoS.EXACTLY_ONCE && this.session_state.received_message_ids.contains(publish.messageId())) {
            PUBREC response = new PUBREC();
            response.messageId(publish.messageId());
            this.send((MessageSupport.Message)response);
            return;
        }
        this.handler.messages_received.incrementAndGet();
        this.queue.assertExecuting();
        MqttProducerRoute route = (MqttProducerRoute)((Object)this.producerRoutes.get((Object)publish.topicName()));
        if (route == null) {
            final SimpleAddress destination = this.decode_destination(publish.topicName());
            final MqttProducerRoute froute = route = new MqttProducerRoute(destination, this.handler);
            route.handler._suspend_read("route publish lookup");
            this.host().dispatch_queue().execute(new Task(){

                public void run() {
                    MqttSession.this.host().router().connect(new ConnectAddress[]{destination}, (BindableDeliveryProducer)froute, MqttSession.this.security_context);
                    MqttSession.this.queue.execute(new Runnable(){

                        @Override
                        public void run() {
                            if (!froute.handler.connection().stopped()) {
                                froute.handler.resume_read();
                                MqttSession.this.producerRoutes.put((Object)publish.topicName(), (Object)froute);
                                MqttSession.this.send_via_route(froute, publish);
                            }
                        }
                    });
                }
            });
        } else {
            this.send_via_route(route, publish);
        }
    }

    public void send_via_route(MqttProducerRoute route, PUBLISH publish) {
        this.queue.assertExecuting();
        AtLeastOnceProducerAck ack = null;
        if (publish.qos() == QoS.AT_LEAST_ONCE) {
            ack = new AtLeastOnceProducerAck(publish);
        } else if (publish.qos() == QoS.EXACTLY_ONCE) {
            ack = new ExactlyOnceProducerAck(publish);
        }
        if (!route.targets().isEmpty()) {
            Delivery delivery = new Delivery();
            delivery.message_$eq((Message)new RawMessage(publish.payload()));
            delivery.persistent_$eq(publish.qos().ordinal() > 0);
            delivery.size_$eq(publish.payload().length);
            delivery.ack_$eq(Scala2Java.toScala((Fn2)ack));
            if (publish.retain()) {
                if (delivery.size() == 0) {
                    delivery.retain_$eq((RetainAction)RetainRemove$.MODULE$);
                } else {
                    delivery.retain_$eq((RetainAction)RetainSet$.MODULE$);
                }
            }
            assert (!route.full());
            route.offer(delivery);
            if (route.full()) {
                route.suspended = true;
                this.handler._suspend_read("blocked sending to: " + route.address);
            }
        } else if (ack != null) {
            ack.apply(null, null);
        }
    }

    public void publish_will(final Task complete_close) {
        if (this.connect_message != null) {
            if (this.connect_message.willTopic() == null) {
                complete_close.run();
            } else {
                final SimpleAddress destination = this.decode_destination(this.connect_message.willTopic());
                final DeliveryProducerRoute prodcuer = new DeliveryProducerRoute(this.host().router()){
                    {
                        this.refiller_$eq(Dispatch.NOOP);
                    }

                    public int send_buffer_size() {
                        return 65536;
                    }

                    public Option<BrokerConnection> connection() {
                        return MqttSession.this.handler != null ? Scala2Java.some((Object)MqttSession.this.handler.connection()) : Scala2Java.none();
                    }

                    public DispatchQueue dispatch_queue() {
                        return MqttSession.this.queue;
                    }
                };
                this.host().dispatch_queue().execute(new Task(){

                    public void run() {
                        MqttSession.this.host().router().connect(new ConnectAddress[]{destination}, (BindableDeliveryProducer)prodcuer, MqttSession.this.security_context);
                        MqttSession.this.queue.execute(new Task(){

                            public void run() {
                                if (prodcuer.targets().isEmpty()) {
                                    complete_close.run();
                                } else {
                                    Delivery delivery = new Delivery();
                                    delivery.message_$eq((Message)new RawMessage((Buffer)MqttSession.this.connect_message.willMessage()));
                                    delivery.size_$eq(MqttSession.this.connect_message.willMessage().length);
                                    delivery.persistent_$eq(MqttSession.this.connect_message.willQos().ordinal() > 0);
                                    if (MqttSession.this.connect_message.willRetain()) {
                                        if (delivery.size() == 0) {
                                            delivery.retain_$eq((RetainAction)RetainRemove$.MODULE$);
                                        } else {
                                            delivery.retain_$eq((RetainAction)RetainSet$.MODULE$);
                                        }
                                    }
                                    delivery.ack_$eq(Scala2Java.toScala((Fn2)new UnitFn2<DeliveryResult, StoreUOW>(){

                                        public void call(DeliveryResult x, StoreUOW y) {
                                            MqttSession.this.host().dispatch_queue().execute(new Task(){

                                                public void run() {
                                                    MqttSession.this.host().router().disconnect(new ConnectAddress[]{destination}, (BindableDeliveryProducer)prodcuer);
                                                }
                                            });
                                            complete_close.run();
                                        }
                                    }));
                                    MqttSession.this.handler.messages_received.incrementAndGet();
                                    prodcuer.offer(delivery);
                                }
                            }
                        });
                    }
                });
            }
        }
    }

    public void on_mqtt_subscribe(final SUBSCRIBE sub) {
        this.subscribe(Arrays.asList(sub.topics()), new Task(){

            public void run() {
                MqttSession.this.queue.execute(new Task(){

                    public void run() {
                        MqttSession.this.session_state.strategy.update(new Task(){

                            public void run() {
                                SUBACK suback = new SUBACK();
                                suback.messageId(sub.messageId());
                                byte[] granted = new byte[sub.topics().length];
                                int i = 0;
                                for (Topic topic : sub.topics()) {
                                    granted[i] = (byte)topic.qos().ordinal();
                                    ++i;
                                }
                                suback.grantedQos(granted);
                                MqttSession.this.send((MessageSupport.Message)suback);
                            }
                        });
                    }
                });
            }
        });
    }

    public void subscribe(Collection<Topic> topics, final Task on_subscribed) {
        final ArrayList addresses = Scala2Java.map(topics, (Fn1)new Fn1<Topic, BindAddress>(){

            public BindAddress apply(Topic topic) {
                SimpleAddress address = MqttSession.this.decode_destination(topic.name());
                MqttSession.this.session_state.subscriptions.put(topic.name(), (Tuple2<Topic, BindAddress>)new Tuple2((Object)topic, (Object)address));
                MqttSession.this.mqtt_consumer().addresses.put((BindAddress)address, topic.qos());
                if (PathParser.containsWildCards((Path)address.path())) {
                    MqttSession.this.mqtt_consumer().wildcards.put(address.path(), (Object)topic.qos());
                }
                return address;
            }
        });
        this.handler.subscription_count = this.mqtt_consumer().addresses.size();
        if (!this.clean_session) {
            SubscriptionAddress durable_sub;
            Set<BindAddress> bindAddressSet = this.mqtt_consumer().addresses.keySet();
            this.session_state.durable_sub = durable_sub = new SubscriptionAddress(Path$.MODULE$.create(this.client_id.toString()), null, bindAddressSet.toArray(new BindAddress[bindAddressSet.size()]));
            addresses.clear();
            addresses.add(durable_sub);
        }
        this.host().dispatch_queue().execute(new Task(){

            public void run() {
                for (BindAddress address : addresses) {
                    MqttSession.this.host().router().bind(new BindAddress[]{address}, (DeliveryConsumer)MqttSession.this.mqtt_consumer(), MqttSession.this.security_context, Scala2Java.noopFn1());
                }
                on_subscribed.run();
            }
        });
    }

    public void on_mqtt_unsubscribe(final UNSUBSCRIBE unsubscribe) {
        ArrayList addressesList = Scala2Java.flatMap(Arrays.asList(unsubscribe.topics()), (Fn1)new Fn1<UTF8Buffer, Option<BindAddress>>(){

            public Option<BindAddress> apply(UTF8Buffer topicName) {
                Tuple2<Topic, BindAddress> removed = MqttSession.this.session_state.subscriptions.remove(topicName);
                if (removed != null) {
                    Topic topic = (Topic)removed._1();
                    BindAddress address = (BindAddress)removed._2();
                    MqttSession.this.mqtt_consumer().addresses.remove(address);
                    if (PathParser.containsWildCards((Path)address.path())) {
                        MqttSession.this.mqtt_consumer().wildcards.remove(address.path(), (Object)topic.qos());
                    }
                    return Scala2Java.some((Object)address);
                }
                return Scala2Java.none();
            }
        });
        final BindAddress[] addresses = addressesList.toArray(new BindAddress[addressesList.size()]);
        this.handler.subscription_count = this.mqtt_consumer().addresses.size();
        if (!this.clean_session) {
            Set<BindAddress> bindAddressSet = this.mqtt_consumer().addresses.keySet();
            this.session_state.durable_sub = new SubscriptionAddress(Path$.MODULE$.create(this.client_id.toString()), null, bindAddressSet.toArray(new BindAddress[bindAddressSet.size()]));
        }
        this.host().dispatch_queue().execute(new Task(){

            public void run() {
                if (MqttSession.this.clean_session) {
                    MqttSession.this.host().router().unbind(addresses, (DeliveryConsumer)MqttSession.this.mqtt_consumer(), false, MqttSession.this.security_context);
                } else if (MqttSession.this.mqtt_consumer().addresses.isEmpty()) {
                    MqttSession.this.host().router().unbind(new BindAddress[]{MqttSession.this.session_state.durable_sub}, (DeliveryConsumer)MqttSession.this.mqtt_consumer(), true, MqttSession.this.security_context);
                    MqttSession.this.session_state.durable_sub = null;
                } else {
                    MqttSession.this.host().router().bind(new BindAddress[]{MqttSession.this.session_state.durable_sub}, (DeliveryConsumer)MqttSession.this.mqtt_consumer(), MqttSession.this.security_context, Scala2Java.noopFn1());
                }
                MqttSession.this.queue.execute(new Task(){

                    public void run() {
                        MqttSession.this.session_state.strategy.update(new Task(){

                            public void run() {
                                UNSUBACK ack = new UNSUBACK();
                                ack.messageId(unsubscribe.messageId());
                                MqttSession.this.send((MessageSupport.Message)ack);
                            }
                        });
                    }
                });
            }
        });
    }

    MqttConsumer mqtt_consumer() {
        if (this._mqtt_consumer == null) {
            this._mqtt_consumer = new MqttConsumer();
        }
        return this._mqtt_consumer;
    }

    class MqttConsumer
    extends AbstractRetainedDeliveryConsumer {
        public HashMap<BindAddress, QoS> addresses = new HashMap();
        public PathMap wildcards = new PathMap();
        CustomDispatchSource<IntPair, IntPair> credit_window_source;
        MutableSink<Request> consumer_sink;
        public LongCounter next_seq_id;
        CreditWindowFilter<Tuple2<Session<Delivery>, Delivery>> credit_window_filter;
        SessionSinkMux<Delivery> session_manager;

        MqttConsumer() {
            this.credit_window_source = Dispatch.createSource((EventAggregator)new EventAggregator<IntPair, IntPair>(){

                public IntPair mergeEvent(IntPair previous, IntPair event) {
                    if (previous == null) {
                        return event;
                    }
                    return new IntPair(previous._1 + event._1, previous._2 + event._2);
                }

                public IntPair mergeEvents(IntPair previous, IntPair events) {
                    return this.mergeEvent(previous, events);
                }
            }, (DispatchQueue)MqttSession.this.queue);
            this.credit_window_source.setEventHandler(new Task(){

                public void run() {
                    IntPair data = (IntPair)MqttConsumer.this.credit_window_source.getData();
                    MqttConsumer.this.credit_window_filter.credit(data._1, data._2);
                }
            });
            this.credit_window_source.resume();
            this.consumer_sink = new MutableSink();
            this.consumer_sink.downstream_$eq(Scala2Java.none());
            this.next_seq_id = new LongCounter(0L);
            this.credit_window_filter = new CreditWindowFilter(this.consumer_sink.flatMap(Scala2Java.toScala((Fn1)new Fn1<Tuple2<Session<Delivery>, Delivery>, Option<Request>>(){

                public Option<Request> apply(Tuple2<Session<Delivery>, Delivery> event) {
                    MqttSession.this.queue.assertExecuting();
                    Session session = (Session)event._1();
                    final Delivery delivery = (Delivery)event._2();
                    MqttConsumer.this.session_manager.delivered(session, delivery.size());
                    SimpleAddress topic = ((DestinationAddress)delivery.sender().head()).simple();
                    QoS qos = MqttConsumer.this.addresses.get(topic);
                    if (qos == null) {
                        qos = (QoS)Scala2Java.head((Iterable)MqttConsumer.this.wildcards.get(topic.path()));
                    }
                    if (qos == null) {
                        MqttConsumer.this.acked(delivery, (DeliveryResult)Consumed$.MODULE$);
                        return Scala2Java.none();
                    }
                    PUBLISH publish = new PUBLISH();
                    publish.topicName(new UTF8Buffer(MqttSession.this.destination_parser.encode_destination((DestinationAddress)delivery.sender().head())));
                    if (delivery.redeliveries() > 0) {
                        publish.dup(true);
                    }
                    if (delivery.message().codec() == RawMessageCodec$.MODULE$) {
                        publish.payload(((RawMessage)delivery.message()).payload());
                    } else if (MqttSession.this.publish_body) {
                        try {
                            publish.payload((Buffer)delivery.message().getBodyAs(Buffer.class));
                        }
                        catch (FilterException e) {
                            MqttProtocolHandler.log.error((Throwable)e, "Internal Server Error: Could not covert message body to a Buffer", new Object[0]);
                        }
                    } else {
                        publish.payload(delivery.message().encoded());
                    }
                    MqttSession.this.handler.messages_sent.incrementAndGet();
                    UnitFn1<DeliveryResult> ack = new UnitFn1<DeliveryResult>(){

                        public void call(DeliveryResult result) {
                            MqttConsumer.this.acked(delivery, result);
                        }
                    };
                    if (delivery.ack() != null && qos != QoS.AT_MOST_ONCE) {
                        publish.qos(qos);
                        short id = MqttConsumer.this.to_message_id(MqttSession.this.clean_session ? MqttConsumer.this.get_next_seq_id() : delivery.seq());
                        publish.messageId(id);
                        Request request = new Request(id, (MessageSupport.Message)publish, ack);
                        Request prev = MqttSession.this.in_flight_publishes.put(id, request);
                        if (prev != null) {
                            if (prev.message == null) {
                                MqttSession.this.in_flight_publishes.remove(id);
                                MqttConsumer.this.acked(delivery, (DeliveryResult)Consumed$.MODULE$);
                            } else {
                                MqttSession.this.handler.async_die("Client not acking regularly.", null);
                            }
                        }
                        return Scala2Java.some((Object)request);
                    }
                    publish.qos(QoS.AT_MOST_ONCE);
                    return Scala2Java.some((Object)new Request(0, (MessageSupport.Message)publish, ack));
                }
            })), (Sizer)SessionDeliverySizer.INSTANCE);
            this.credit_window_filter.credit(MqttSession.this.handler.codec.getWriteBufferSize() * 2, 1);
            this.session_manager = new SessionSinkMux<Delivery>(this.credit_window_filter, MqttSession.this.queue, (Sizer)Delivery$.MODULE$, 0x3FFFFFFF, this.receive_buffer_size()){

                public long time_stamp() {
                    return MqttSession.this.host().broker().now();
                }
            };
        }

        public String toString() {
            return "mqtt client:" + MqttSession.this.client_id + " remote address: " + MqttSession.this.security_context.remote_address();
        }

        public DispatchQueue dispatch_queue() {
            return MqttSession.this.queue;
        }

        public long get_next_seq_id() {
            return this.next_seq_id.getAndIncrement();
        }

        short to_message_id(long value) {
            return (short)(0x8000L | value & 0x7FFFL);
        }

        public void acked(Delivery delivery, DeliveryResult result) {
            MqttSession.this.queue.assertExecuting();
            this.credit_window_source.merge((Object)new IntPair(delivery.size(), 1));
            if (delivery.ack() != null) {
                delivery.ack().apply((Object)result, null);
            }
        }

        private void super_dispose() {
            super.dispose();
        }

        protected void dispose() {
            MqttSession.this.queue.execute(new Task(){

                public void run() {
                    MqttConsumer.this.super_dispose();
                }
            });
        }

        public Option<BrokerConnection> connection() {
            return MqttSession.this.handler != null ? Scala2Java.some((Object)MqttSession.this.handler.connection()) : Scala2Java.none();
        }

        public int receive_buffer_size() {
            return 65536;
        }

        public boolean is_persistent() {
            return false;
        }

        public boolean matches(Delivery message) {
            return true;
        }

        public MqttConsumerSession connect(DeliveryProducer p) {
            return new MqttConsumerSession(p);
        }

        class MqttConsumerSession
        extends AbstractSessionSinkFilter<Delivery>
        implements DeliverySession {
            final DeliveryProducer producer;
            final SessionSink<Delivery> downstream;
            public boolean closed = false;

            MqttConsumerSession(DeliveryProducer producer) {
                producer.dispatch_queue().assertExecuting();
                this.producer = producer;
                this.downstream = MqttConsumer.this.session_manager.open(producer.dispatch_queue());
                MqttConsumer.this.retain();
            }

            public SessionSink<Delivery> downstream_session_sink() {
                return this.downstream;
            }

            public DeliveryProducer producer() {
                return this.producer;
            }

            public String toString() {
                if (MqttSession.this.handler == null) {
                    return "unconnected";
                }
                return "connection to " + MqttSession.this.handler.connection().transport().getRemoteAddress();
            }

            public MqttConsumer consumer() {
                return MqttSession.this.mqtt_consumer();
            }

            public void close() {
                this.producer.dispatch_queue().assertExecuting();
                if (!this.closed) {
                    this.closed = true;
                    this.dispose();
                }
            }

            public void dispose() {
                MqttConsumer.this.session_manager.close(this.downstream(), Scala2Java.toScala((Fn1)new UnitFn1<Delivery>(){

                    public void call(Delivery delivery) {
                        if (delivery.ack() != null) {
                            delivery.ack().apply((Object)Undelivered$.MODULE$, (Object)delivery.uow());
                        }
                    }
                }));
                MqttConsumer.this.release();
            }

            public boolean offer(Delivery delivery) {
                if (this.full()) {
                    return false;
                }
                delivery.message().retain();
                boolean rc = this.downstream().offer((Object)delivery);
                assert (rc) : "offer should be accepted since it was not full";
                return true;
            }
        }
    }

    class IntPair {
        int _1;
        int _2;

        IntPair(int int1, int int2) {
            this._1 = int1;
            this._2 = int2;
        }
    }

    class ExactlyOnceProducerAck
    extends AtLeastOnceProducerAck {
        ExactlyOnceProducerAck(PUBLISH publish) {
            super(publish);
        }

        @Override
        public void call(DeliveryResult r, StoreUOW uow) {
            MqttSession.this.queue.execute(new Task(){

                public void run() {
                    MqttSession.this.session_state.received_message_ids.add(ExactlyOnceProducerAck.this.publish.messageId());
                    MqttSession.this.session_state.strategy.update(new Task(){

                        public void run() {
                            PUBREC response = new PUBREC();
                            response.messageId(ExactlyOnceProducerAck.this.publish.messageId());
                            MqttSession.this.send((MessageSupport.Message)response);
                        }
                    });
                }
            });
        }
    }

    class AtLeastOnceProducerAck
    extends UnitFn2<DeliveryResult, StoreUOW> {
        public final PUBLISH publish;

        AtLeastOnceProducerAck(PUBLISH publish) {
            this.publish = publish;
        }

        public void call(DeliveryResult r, StoreUOW uow) {
            MqttSession.this.queue.execute(new Task(){

                public void run() {
                    PUBACK response = new PUBACK();
                    response.messageId(AtLeastOnceProducerAck.this.publish.messageId());
                    MqttSession.this.send((MessageSupport.Message)response);
                }
            });
        }
    }

    class MqttProducerRoute
    extends DeliveryProducerRoute {
        public final SimpleAddress address;
        public final MqttProtocolHandler handler;
        boolean suspended;

        public MqttProducerRoute(SimpleAddress address, MqttProtocolHandler h) {
            super(MqttSession.this.host().router());
            this.suspended = false;
            this.address = address;
            this.handler = h;
            this.refiller_$eq(new Task(){

                public void run() {
                    if (MqttProducerRoute.this.suspended) {
                        MqttProducerRoute.this.suspended = false;
                        MqttProducerRoute.this.handler.resume_read();
                    }
                }
            });
        }

        public int send_buffer_size() {
            return this.handler.codec.getReadBufferSize();
        }

        public Option<BrokerConnection> connection() {
            return Scala2Java.some((Object)this.handler.connection());
        }

        public DispatchQueue dispatch_queue() {
            return MqttSession.this.queue;
        }
    }
}

