/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.apollo.mqtt;

import java.util.HashMap;
import java.util.HashSet;
import org.apache.activemq.apollo.broker.BindAddress;
import org.apache.activemq.apollo.broker.SimpleAddress;
import org.apache.activemq.apollo.broker.SubscriptionAddress;
import org.apache.activemq.apollo.broker.VirtualHost;
import org.apache.activemq.apollo.broker.store.Store;
import org.apache.activemq.apollo.broker.store.StoreUOW;
import org.apache.activemq.apollo.mqtt.MqttProtocolHandler;
import org.apache.activemq.apollo.mqtt.MqttSession;
import org.apache.activemq.apollo.mqtt.SessionPB;
import org.apache.activemq.apollo.mqtt.TopicPB;
import org.apache.activemq.apollo.util.Fn0;
import org.apache.activemq.apollo.util.Fn1;
import org.apache.activemq.apollo.util.Scala2Java;
import org.apache.activemq.apollo.util.UnitFn0;
import org.apache.activemq.apollo.util.UnitFn1;
import org.fusesource.hawtbuf.AsciiBuffer;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.hawtbuf.proto.InvalidProtocolBufferException;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.hawtdispatch.DispatchQueue;
import org.fusesource.hawtdispatch.Task;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import scala.Tuple2;
import scala.collection.Seq;

public class MqttSessionManager {
    public static final Scala2Java.Logger log = MqttProtocolHandler.log;
    static DispatchQueue queue = Dispatch.createQueue((String)"session manager");

    /**
     * Attaches a protocol handler to the MQTT session identified by
     * {@code client_id} on the given virtual host.
     *
     * <p>The work is submitted to the session manager queue. Once the host's
     * {@link HostState} reports it is loaded, an already registered session is
     * simply (re)connected to the handler; otherwise a new {@link MqttSession}
     * is created, connected and registered.
     *
     * @param host      virtual host the client connected to
     * @param client_id MQTT client identifier used as the session key
     * @param handler   protocol handler of the connecting client; its
     *                  {@code connect_message} decides whether a clean session
     *                  was requested
     */
    public static void attach(final VirtualHost host, final UTF8Buffer client_id, final MqttProtocolHandler handler) {
        queue.execute(new Task(){

            @Override
            public void run() {
                // Factory handed to plugin_state; presumably only invoked when the
                // host has no HostState yet -- verify against VirtualHost.
                Fn0<HostState> factory = new Fn0<HostState>(){

                    @Override
                    public HostState apply() {
                        return new HostState(host);
                    }
                };
                final HostState host_state = (HostState)host.plugin_state(Scala2Java.toScala((Fn0)factory), HostState.class);
                host_state.on_load(new Task(){

                    @Override
                    public void run() {
                        MqttSession existing = host_state.sessions.get(client_id);
                        if (existing != null) {
                            existing.connect(handler);
                            return;
                        }

                        // A clean-session connect takes the remembered state out of the
                        // map; a regular connect looks it up and keeps it registered.
                        boolean clean = handler.connect_message.cleanSession();
                        SessionState state = clean
                                ? host_state.session_states.remove(client_id)
                                : host_state.session_states.get(client_id);
                        if (state == null) {
                            state = new SessionState();
                            if (!clean) {
                                host_state.session_states.put(client_id, state);
                            }
                        }

                        MqttSession created = new MqttSession(host_state, client_id, state);
                        created.connect(handler);
                        host_state.sessions.put(client_id, created);
                    }
                });
            }
        });
    }

    /**
     * Detaches a protocol handler from the session registered under
     * {@code client_id}, if there is one.
     *
     * <p>The lookup and the call to {@link MqttSession#disconnect} run on the
     * session manager queue. Nothing happens when no session is registered
     * for the client id.
     *
     * @param host_state per-host state holding the session registry
     * @param client_id  MQTT client identifier of the session
     * @param handler    protocol handler that is going away
     */
    public static void disconnect(final HostState host_state, final UTF8Buffer client_id, final MqttProtocolHandler handler) {
        queue.execute(new Task(){

            @Override
            public void run() {
                MqttSession session = host_state.sessions.get(client_id);
                if (session == null) {
                    return;
                }
                session.disconnect(handler);
            }
        });
    }

    /**
     * Unregisters the session stored under {@code client_id} from the host's
     * session registry.
     *
     * <p>The removal is performed on the session manager queue. Only the
     * {@code sessions} map is touched; {@code session_states} is left as is.
     *
     * @param host_state per-host state holding the session registry
     * @param client_id  MQTT client identifier of the session to drop
     */
    public static void remove(final HostState host_state, final UTF8Buffer client_id) {
        final Task eviction = new Task(){

            @Override
            public void run() {
                host_state.sessions.remove(client_id);
            }
        };
        queue.execute(eviction);
    }

    /**
     * MQTT session bookkeeping for a single virtual host.
     *
     * <p>Holds the registered sessions and the remembered session states, both
     * keyed by client id, and loads the persisted states from the host's store
     * the first time {@link #on_load(Task)} is called.
     */
    public static class HostState {
        /** Virtual host this state belongs to. */
        public final VirtualHost host;
        /** Remembered session state by client id (filled from the store on load). */
        public final HashMap<UTF8Buffer, SessionState> session_states = new HashMap<UTF8Buffer, SessionState>();
        /** Currently registered sessions by client id. */
        public final HashMap<UTF8Buffer, MqttSession> sessions = new HashMap<UTF8Buffer, MqttSession>();
        /** Set to true once the persisted session states have been loaded (or there is no store). */
        public boolean loaded = false;

        public HostState(VirtualHost host) {
            this.host = host;
        }

        /**
         * Maps a persisted QoS ordinal back to its {@link QoS} constant.
         *
         * @param ordinal value read from a stored subscription record
         * @return the matching QoS constant
         * @throws IllegalArgumentException if the ordinal does not denote a QoS
         *         constant, which means the stored record is corrupt
         */
        private static QoS qos_for(int ordinal) {
            QoS[] levels = QoS.values();
            if (ordinal < 0 || ordinal >= levels.length) {
                throw new IllegalArgumentException("Invalid QoS ordinal in stored MQTT subscription: " + ordinal);
            }
            return levels[ordinal];
        }

        /**
         * Runs {@code func} once the persisted session states are available.
         *
         * <ul>
         * <li>If already loaded, {@code func} runs immediately on the calling thread.</li>
         * <li>If the host has a store, the session manager queue is suspended and
         *     all store entries with the {@code "mqtt:"} prefix are requested. When
         *     they arrive the queue is resumed and a task is enqueued that rebuilds
         *     one {@link SessionState} per entry, marks this state as loaded and
         *     then runs {@code func}.</li>
         * <li>Without a store, the state is marked loaded and {@code func} runs
         *     immediately.</li>
         * </ul>
         *
         * <p>Entries that cannot be decoded (malformed protobuf or an out-of-range
         * QoS value) are logged and skipped so that one corrupt record cannot
         * prevent the load from completing.
         *
         * @param func task to run after loading has completed
         */
        public void on_load(final Task func) {
            if (this.loaded) {
                func.run();
            } else if (this.host.store() != null) {
                queue.suspend();
                this.host.store().get_prefixed_map_entries((Buffer)new AsciiBuffer("mqtt:"), Scala2Java.toScala((Fn1)new UnitFn1<Seq<Tuple2<Buffer, Buffer>>>(){

                    public void call(final Seq<Tuple2<Buffer, Buffer>> entries) {
                        // NOTE(review): the queue is resumed before the load task is
                        // enqueued, so tasks that were already waiting on the queue
                        // appear to run ahead of it while loaded is still false --
                        // confirm whether that can trigger a second load.
                        queue.resume();
                        queue.execute(new Task(){

                            public void run() {
                                for (Tuple2 entry : Scala2Java.toIterable((Seq)entries)) {
                                    try {
                                        Buffer value = (Buffer)entry._2();
                                        SessionPB.Buffer session_pb = SessionPB.FACTORY.parseUnframed(value);
                                        SessionState session_state = new SessionState();
                                        // Switches the fresh state to store-backed persistence
                                        // (the host store is non-null on this path).
                                        session_state.strategy.create(HostState.this.host.store(), session_pb.getClientId());
                                        if (session_pb.hasReceivedMessageIds()) {
                                            for (Integer i : session_pb.getReceivedMessageIdsList()) {
                                                session_state.received_message_ids.add(i.shortValue());
                                            }
                                        }
                                        if (session_pb.hasSubscriptions()) {
                                            for (TopicPB.Getter sub : session_pb.getSubscriptionsList()) {
                                                SimpleAddress address = SimpleAddress.apply((String)sub.getAddress().toString());
                                                // Range-checked: a bad ordinal must not abort the whole load.
                                                Topic topic = new Topic(sub.getName(), HostState.qos_for(sub.getQos()));
                                                session_state.subscriptions.put(sub.getName(), (Tuple2<Topic, BindAddress>)new Tuple2((Object)topic, (Object)address));
                                            }
                                        }
                                        // Only registered once the record was decoded completely.
                                        HostState.this.session_states.put(session_pb.getClientId(), session_state);
                                    }
                                    catch (InvalidProtocolBufferException e) {
                                        log.warn((Throwable)e, "Could not load a stored MQTT session", new Object[0]);
                                    }
                                    catch (IllegalArgumentException e) {
                                        // Corrupt field value in an otherwise parseable record: skip it.
                                        log.warn((Throwable)e, "Could not load a stored MQTT session", new Object[0]);
                                    }
                                }
                                HostState.this.loaded = true;
                                func.run();
                            }
                        });
                    }
                }));
            } else {
                this.loaded = true;
                func.run();
            }
        }
    }

    /**
     * Mutable state of one MQTT client session: its subscriptions, the ids of
     * messages already received, and the strategy used to persist that data.
     *
     * <p>A new instance starts with a {@link NoopStrategy}; calling
     * {@code strategy.create(store, client_id)} with a non-null store swaps in
     * a {@link StoreStrategy}.
     */
    static class SessionState {
        SubscriptionAddress durable_sub = null;
        /** Subscriptions keyed by topic name, each paired with its bind address. */
        HashMap<UTF8Buffer, Tuple2<Topic, BindAddress>> subscriptions = new HashMap<UTF8Buffer, Tuple2<Topic, BindAddress>>();
        /** Message ids that have already been received for this session. */
        HashSet<Short> received_message_ids = new HashSet<Short>();
        /** Current persistence strategy; replaced by the strategies themselves. */
        StorageStrategy strategy = new NoopStrategy();

        SessionState() {
        }

        /**
         * Persists the enclosing session state in a {@link Store} under the
         * key {@code "mqtt:" + client_id}.
         */
        class StoreStrategy
        implements StorageStrategy {
            public final Store store;
            public final UTF8Buffer client_id;
            public final UTF8Buffer session_key;

            public StoreStrategy(Store store, UTF8Buffer client_id) {
                this.store = store;
                this.client_id = client_id;
                this.session_key = new UTF8Buffer("mqtt:" + client_id);
            }

            /** Already store-backed: nothing to do. */
            @Override
            public void create(Store store, UTF8Buffer client_id) {
            }

            /**
             * Writes a snapshot of the session (client id, received message
             * ids, subscriptions) to the store and runs {@code cb} on the
             * calling dispatch queue once the unit of work completes.
             */
            @Override
            public void update(final Task cb) {
                StoreUOW uow = this.store.create_uow();

                SessionPB.Bean snapshot = new SessionPB.Bean();
                snapshot.setClientId(this.client_id);
                for (Short message_id : SessionState.this.received_message_ids) {
                    snapshot.addReceivedMessageIds(message_id.intValue());
                }
                for (Tuple2<Topic, BindAddress> subscription : SessionState.this.subscriptions.values()) {
                    Topic topic = subscription._1();
                    BindAddress address = subscription._2();
                    TopicPB.Bean record = new TopicPB.Bean();
                    record.setName(topic.name());
                    record.setQos(topic.qos().ordinal());
                    record.setAddress(new UTF8Buffer(address.toString()));
                    snapshot.addSubscriptions(record);
                }
                uow.put((Buffer)this.session_key, snapshot.freeze().toUnframedBuffer());

                this.finish(uow, new Task(){

                    @Override
                    public void run() {
                        cb.run();
                    }
                });
            }

            /**
             * Stores a null value under the session key and, once the unit of
             * work completes, switches the session back to a
             * {@link NoopStrategy} before running {@code cb} on the calling
             * dispatch queue.
             */
            @Override
            public void destroy(final Task cb) {
                StoreUOW uow = this.store.create_uow();
                uow.put((Buffer)this.session_key, null);

                this.finish(uow, new Task(){

                    @Override
                    public void run() {
                        SessionState.this.strategy = new NoopStrategy();
                        cb.run();
                    }
                });
            }

            /**
             * Registers a completion hook on {@code uow} that hands
             * {@code after} to the dispatch queue that was current when this
             * method was called, then releases the unit of work.
             */
            private void finish(StoreUOW uow, final Task after) {
                final DispatchQueue origin = Dispatch.getCurrentQueue();
                uow.on_complete(Scala2Java.toScala((Fn0)new UnitFn0(){

                    public void call() {
                        origin.execute(after);
                    }
                }));
                uow.release();
            }
        }

        /**
         * Strategy used while the session is not persisted: callbacks run
         * immediately and nothing is written anywhere.
         */
        class NoopStrategy
        implements StorageStrategy {
            NoopStrategy() {
            }

            /** Switches the session to a {@link StoreStrategy} when a store is supplied. */
            @Override
            public void create(Store store, UTF8Buffer client_id) {
                if (store == null) {
                    return;
                }
                SessionState.this.strategy = new StoreStrategy(store, client_id);
            }

            @Override
            public void update(Task cb) {
                cb.run();
            }

            @Override
            public void destroy(Task cb) {
                cb.run();
            }
        }
    }

    /**
     * Persistence hooks for a {@link SessionState}.
     *
     * <p>The implementations in this file are {@code SessionState.StoreStrategy},
     * which writes to a {@link Store}, and {@code SessionState.NoopStrategy},
     * which persists nothing and runs callbacks right away.
     */
    static interface StorageStrategy {
        /**
         * Persists the current session state.
         *
         * @param cb task to run when the update is done
         */
        public void update(Task cb);

        /**
         * Removes the persisted session state.
         *
         * @param cb task to run when the removal is done
         */
        public void destroy(Task cb);

        /**
         * Enables persistence for the session.
         *
         * @param store     store to persist into; the no-op implementation ignores a null store
         * @param client_id MQTT client identifier of the session
         */
        public void create(Store store, UTF8Buffer client_id);
    }
}

