/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.apollo.mqtt;

import java.io.IOException;
import java.net.ProtocolException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.activemq.apollo.broker.AbstractSinkFilter;
import org.apache.activemq.apollo.broker.AbstractSinkMapper;
import org.apache.activemq.apollo.broker.Broker;
import org.apache.activemq.apollo.broker.Consumed$;
import org.apache.activemq.apollo.broker.DestinationParser;
import org.apache.activemq.apollo.broker.OverflowSink;
import org.apache.activemq.apollo.broker.Sink;
import org.apache.activemq.apollo.broker.SinkMux;
import org.apache.activemq.apollo.broker.VirtualHost;
import org.apache.activemq.apollo.broker.protocol.AbstractProtocolHandler;
import org.apache.activemq.apollo.broker.protocol.ProtocolFilter3;
import org.apache.activemq.apollo.broker.protocol.ProtocolFilter3$;
import org.apache.activemq.apollo.broker.protocol.ProtocolHandler;
import org.apache.activemq.apollo.broker.security.SecuredResource;
import org.apache.activemq.apollo.broker.security.SecurityContext;
import org.apache.activemq.apollo.dto.AcceptingConnectorDTO;
import org.apache.activemq.apollo.dto.ProtocolDTO;
import org.apache.activemq.apollo.mqtt.MqttProtocol;
import org.apache.activemq.apollo.mqtt.MqttSessionManager;
import org.apache.activemq.apollo.mqtt.Request;
import org.apache.activemq.apollo.mqtt.dto.MqttConnectionStatusDTO;
import org.apache.activemq.apollo.mqtt.dto.MqttDTO;
import org.apache.activemq.apollo.util.Fn0;
import org.apache.activemq.apollo.util.Fn1;
import org.apache.activemq.apollo.util.Log$;
import org.apache.activemq.apollo.util.LongCounter;
import org.apache.activemq.apollo.util.Scala2Java;
import org.apache.activemq.apollo.util.UnitFn1;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.hawtdispatch.DispatchQueue;
import org.fusesource.hawtdispatch.Task;
import org.fusesource.hawtdispatch.transport.HeartBeatMonitor;
import org.fusesource.mqtt.codec.CONNACK;
import org.fusesource.mqtt.codec.CONNECT;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.fusesource.mqtt.codec.MQTTProtocolCodec;
import org.fusesource.mqtt.codec.MessageSupport;

public class MqttProtocolHandler
extends AbstractProtocolHandler {
    public static final Scala2Java.Logger log = new Scala2Java.Logger(Log$.MODULE$.apply(MqttProtocolHandler.class));
    static Fn0<String> WAITING_ON_CLIENT_REQUEST = new Fn0<String>(){

        public String apply() {
            return "client request";
        }
    };
    Scala2Java.Logger connection_log = log;
    MqttDTO config = null;
    final ArrayList<ProtocolFilter3> protocol_filters = new ArrayList();
    final SecurityContext security_context = new SecurityContext();
    SinkMux<Request> sink_manager = null;
    Sink<Request> connection_sink = null;
    MQTTProtocolCodec codec = null;
    boolean closed = false;
    public static UnitFn1<Object> dead_handler = new UnitFn1<Object>(){

        public void call(Object v1) {
        }
    };
    Fn0<String> status = WAITING_ON_CLIENT_REQUEST;
    boolean dead = false;
    UnitFn1<Object> command_handler = this.connect_handler();
    CONNECT connect_message = null;
    HeartBeatMonitor heart_beat_monitor = new HeartBeatMonitor();
    VirtualHost host = null;
    LongCounter messages_sent = new LongCounter(0L);
    LongCounter messages_received = new LongCounter(0L);
    int subscription_count = 0;

    public static <T> T received(T value) {
        log.trace("received: %s", new Object[]{value});
        return value;
    }

    public String protocol() {
        return "mqtt";
    }

    public Broker broker() {
        return this.connection().connector().broker();
    }

    public DispatchQueue queue() {
        return this.connection().dispatch_queue();
    }

    public DestinationParser destination_parser() {
        DestinationParser destination_parser = MqttProtocol.destination_parser;
        if (this.config.queue_prefix != null || this.config.path_separator != null || this.config.any_child_wildcard != null || this.config.any_descendant_wildcard != null || this.config.regex_wildcard_start != null || this.config.regex_wildcard_end != null || this.config.part_pattern != null) {
            destination_parser = new DestinationParser().copy(destination_parser);
            if (this.config.queue_prefix != null) {
                destination_parser.queue_prefix_$eq(this.config.queue_prefix);
            }
            if (this.config.path_separator != null) {
                destination_parser.path_separator_$eq(this.config.path_separator);
            }
            if (this.config.any_child_wildcard != null) {
                destination_parser.any_child_wildcard_$eq(this.config.any_child_wildcard);
            }
            if (this.config.any_descendant_wildcard != null) {
                destination_parser.any_descendant_wildcard_$eq(this.config.any_descendant_wildcard);
            }
            if (this.config.regex_wildcard_start != null) {
                destination_parser.regex_wildcard_start_$eq(this.config.regex_wildcard_start);
            }
            if (this.config.regex_wildcard_end != null) {
                destination_parser.regex_wildcard_end_$eq(this.config.regex_wildcard_end);
            }
            if (this.config.part_pattern != null) {
                destination_parser.part_pattern_$eq(Pattern.compile(this.config.part_pattern));
            }
        }
        return destination_parser;
    }

    public String session_id() {
        return this.security_context.session_id();
    }

    private static MqttDTO find_config(AcceptingConnectorDTO connector_config) {
        for (ProtocolDTO protocol : connector_config.protocols) {
            if (!(protocol instanceof MqttDTO)) continue;
            return (MqttDTO)protocol;
        }
        return new MqttDTO();
    }

    public void on_transport_connected() {
        this.codec = (MQTTProtocolCodec)this.connection().transport().getProtocolCodec();
        AcceptingConnectorDTO connector_config = (AcceptingConnectorDTO)this.connection().connector().config();
        this.config = MqttProtocolHandler.find_config(connector_config);
        this.codec.setMaxMessageLength(Scala2Java.get((Integer)this.config.max_message_length, (int)this.codec.getMaxMessageLength()));
        this.protocol_filters.clear();
        this.protocol_filters.addAll(ProtocolFilter3$.MODULE$.create_filters(this.config.protocol_filters, (ProtocolHandler)this));
        this.security_context.local_address_$eq(this.connection().transport().getLocalAddress());
        this.security_context.remote_address_$eq(this.connection().transport().getRemoteAddress());
        this.security_context.connector_id_$eq(this.connection().connector().id());
        this.security_context.certificates_$eq(this.connection().certificates());
        this.connection_log = new Scala2Java.Logger(this.connection().connector().broker().connection_log());
        Object filtering_sink = new AbstractSinkMapper<Request, Object>(){

            public Sink<Object> downstream() {
                return MqttProtocolHandler.this.connection().transport_sink();
            }

            public MQTTFrame passing(Request request) {
                log.trace("sent: %s", new Object[]{request.message});
                request.delivered = true;
                if (request.id == 0 && request.ack != null) {
                    request.ack.apply((Object)Consumed$.MODULE$);
                }
                return request.frame;
            }
        };
        if (!this.protocol_filters.isEmpty()) {
            AbstractSinkMapper<Request, Object> downstream = filtering_sink;
            filtering_sink = new AbstractSinkFilter<Request, Request>((Sink)downstream){
                final /* synthetic */ Sink val$downstream;
                {
                    this.val$downstream = sink;
                }

                public Sink<Request> downstream() {
                    return this.val$downstream;
                }

                public Request filter(Request value) {
                    ProtocolFilter3 filter;
                    Request cur = value;
                    Iterator<ProtocolFilter3> i$ = MqttProtocolHandler.this.protocol_filters.iterator();
                    while (i$.hasNext() && (cur = (Request)(filter = i$.next()).filter_outbound((Object)cur)) != null) {
                    }
                    return cur;
                }
            };
        }
        this.sink_manager = new SinkMux((Sink)filtering_sink);
        this.connection_sink = new OverflowSink(this.sink_manager.open());
        this.resume_read();
    }

    public void on_transport_disconnected() {
        if (!this.closed) {
            this.closed = true;
            this.dead = true;
            this.command_handler = dead_handler;
            this.security_context.logout(Scala2Java.toScala((Fn1)new UnitFn1<Throwable>(){

                public void call(Throwable e) {
                    if (e != null) {
                        MqttProtocolHandler.this.connection_log.info(e, "MQTT connection '%s' log out error: %s", new Object[]{MqttProtocolHandler.this.security_context.remote_address(), e.toString()});
                    }
                }
            }));
            this.heart_beat_monitor.stop();
            if (!this.connection().stopped()) {
                this.connection().stop(Dispatch.NOOP);
            }
            log.trace("mqtt protocol resources released", new Object[0]);
        }
    }

    public void on_transport_failure(IOException error) {
        if (!this.dead) {
            this.command_handler.apply((Object)"failure");
            this.dead = true;
            this.command_handler = dead_handler;
            if (!this.connection().stopped()) {
                this.connection_log.info((Throwable)error, "Shutting connection '%s'  down due to: %s", new Object[]{this.security_context.remote_address(), error});
                super.on_transport_failure(error);
            }
        }
    }

    public void _suspend_read(final String reason) {
        this.suspend_read(new Fn0<String>(){

            public String apply() {
                return reason;
            }
        });
    }

    public void suspend_read(Fn0<String> reason) {
        this.status = reason;
        this.connection().transport().suspendRead();
        this.heart_beat_monitor.suspendRead();
    }

    public void resume_read() {
        this.status = WAITING_ON_CLIENT_REQUEST;
        this.connection().transport().resumeRead();
        this.heart_beat_monitor.resumeRead();
    }

    public long die_delay() {
        return Scala2Java.get((Long)this.config.die_delay, (long)5000L);
    }

    public void async_die(String msg) {
        this.async_die(msg, null);
    }

    public void async_die(String msg, Throwable e) {
        try {
            this.die(msg, e);
        }
        catch (Break break_) {
            // empty catch block
        }
    }

    public void async_die(MessageSupport.Message response, String msg) {
        try {
            this.die(response, msg, null);
        }
        catch (Break break_) {
            // empty catch block
        }
    }

    public <T> T die(String msg) {
        return this.die(null, msg, null);
    }

    public <T> T die(String msg, Throwable e) {
        return this.die(null, msg, e);
    }

    public <T> T die(MessageSupport.Message response, String msg) {
        return this.die(response, msg, null);
    }

    public <T> T die(MessageSupport.Message response, String msg, Throwable e) {
        if (e != null) {
            this.connection_log.info(e, "MQTT connection '%s' error: %s", new Object[]{this.security_context.remote_address(), msg, e});
        } else {
            this.connection_log.info("MQTT connection '%s' error: %s", new Object[]{this.security_context.remote_address(), msg});
        }
        return this.die(response);
    }

    public <T> T die(MessageSupport.Message response) {
        if (!this.dead) {
            this.command_handler.apply((Object)"failure");
            this.dead = true;
            this.command_handler = dead_handler;
            this.status = new Fn0<String>(){

                public String apply() {
                    return "shuting down";
                }
            };
            if (response != null) {
                this.connection().transport().resumeRead();
                this.connection_sink.offer((Object)new Request(0, response, null));
                this.queue().executeAfter(this.die_delay(), TimeUnit.MILLISECONDS, new Task(){

                    public void run() {
                        MqttProtocolHandler.this.connection().stop(Dispatch.NOOP);
                    }
                });
            } else {
                this.connection().stop(Dispatch.NOOP);
            }
        }
        throw new Break();
    }

    public void on_transport_command(Object command) {
        try {
            if (!this.protocol_filters.isEmpty()) {
                for (ProtocolFilter3 filter : this.protocol_filters) {
                    command = filter.filter_inbound(command);
                    if (command != null) continue;
                    return;
                }
            }
            this.command_handler.apply(command);
        }
        catch (Break e) {
        }
        catch (Exception e) {
            String msg = "Internal Server Error: " + e;
            if (this.connection_log != log) {
                log.warn((Throwable)e, msg, new Object[0]);
            }
            this.async_die(msg, e);
        }
    }

    public UnitFn1<Object> connect_handler() {
        return new UnitFn1<Object>(){

            public void call(Object o) {
                block6: {
                    if (o instanceof MQTTFrame) {
                        MQTTFrame command = (MQTTFrame)o;
                        try {
                            if (command.messageType() == 1) {
                                MqttProtocolHandler.this.connect_message = MqttProtocolHandler.received(new CONNECT().decode(command));
                                MqttProtocolHandler.this.on_mqtt_connect();
                                break block6;
                            }
                            MqttProtocolHandler.this.die("Expecting an MQTT CONNECT message, but got: " + command.getClass());
                        }
                        catch (ProtocolException e) {
                            MqttProtocolHandler.this.die("Internal Server Error: bad mqtt command: " + command);
                        }
                    } else if (!"failure".equals(o)) {
                        MqttProtocolHandler.this.die("Internal Server Error: unexpected mqtt command: " + o.getClass());
                    }
                }
            }
        };
    }

    public void on_mqtt_connect() {
        final CONNACK connack = new CONNACK();
        switch (this.connect_message.version()) {
            case 3: 
            case 4: {
                break;
            }
            default: {
                connack.code(CONNACK.Code.CONNECTION_REFUSED_UNACCEPTED_PROTOCOL_VERSION);
                this.die((MessageSupport.Message)connack, "Unsupported protocol version: " + this.connect_message.version());
            }
        }
        if (!(this.connect_message.clientId() != null && this.connect_message.clientId().length != 0 || this.connect_message.cleanSession())) {
            this.die((MessageSupport.Message)connack, "A clean session must be requested when no client id is provided.");
        }
        UTF8Buffer client_id = this.connect_message.clientId();
        this.security_context.user_$eq(Scala2Java.toString((Object)this.connect_message.userName()));
        this.security_context.password_$eq(Scala2Java.toString((Object)this.connect_message.password()));
        this.security_context.session_id_$eq(client_id.toString());
        final short keep_alive = this.connect_message.keepAlive();
        if (keep_alive > 0) {
            this.heart_beat_monitor.setReadInterval((long)((double)keep_alive * 1.5) * 1000L);
            this.heart_beat_monitor.setOnDead(new Task(){

                public void run() {
                    MqttProtocolHandler.this.async_die("Missed keep alive set to " + keep_alive + " seconds");
                }
            });
        }
        this.heart_beat_monitor.suspendRead();
        this.heart_beat_monitor.setTransport(this.connection().transport());
        this.heart_beat_monitor.start();
        this._suspend_read("virtual host lookup");
        this.broker().dispatch_queue().execute(new Task(){

            public void run() {
                MqttProtocolHandler.this.host = MqttProtocolHandler.this.connection().connector().broker().get_default_virtual_host();
                MqttProtocolHandler.this.queue().execute(new Task(){

                    public void run() {
                        MqttProtocolHandler.this.resume_read();
                        if (MqttProtocolHandler.this.host == null) {
                            connack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
                            MqttProtocolHandler.this.async_die((MessageSupport.Message)connack, "Default virtual host not found.");
                        } else if (!MqttProtocolHandler.this.host.service_state().is_started()) {
                            connack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
                            MqttProtocolHandler.this.async_die((MessageSupport.Message)connack, "Default virtual host stopped.");
                        } else {
                            MqttProtocolHandler.this.connection_log = new Scala2Java.Logger(MqttProtocolHandler.this.host.connection_log());
                            if (MqttProtocolHandler.this.host.authenticator() != null && MqttProtocolHandler.this.host.authorizer() != null) {
                                MqttProtocolHandler.this._suspend_read("authenticating and authorizing connect");
                                MqttProtocolHandler.this.host.authenticator().authenticate(MqttProtocolHandler.this.security_context, Scala2Java.toScala((Fn1)new UnitFn1<String>(){

                                    public void call(final String auth_err) {
                                        MqttProtocolHandler.this.queue().execute(new Task(){

                                            public void run() {
                                                if (auth_err != null) {
                                                    connack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
                                                    MqttProtocolHandler.this.async_die((MessageSupport.Message)connack, auth_err + ". Credentials=" + MqttProtocolHandler.this.security_context.credential_dump());
                                                } else if (!MqttProtocolHandler.this.host.authorizer().can(MqttProtocolHandler.this.security_context, "connect", (SecuredResource)MqttProtocolHandler.this.connection().connector()).booleanValue()) {
                                                    connack.code(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED);
                                                    MqttProtocolHandler.this.async_die((MessageSupport.Message)connack, String.format("Not authorized to connect to connector '%s'. Principals=", MqttProtocolHandler.this.connection().connector().id(), MqttProtocolHandler.this.security_context.principal_dump()));
                                                } else if (!MqttProtocolHandler.this.host.authorizer().can(MqttProtocolHandler.this.security_context, "connect", (SecuredResource)MqttProtocolHandler.this.host).booleanValue()) {
                                                    connack.code(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED);
                                                    MqttProtocolHandler.this.async_die((MessageSupport.Message)connack, String.format("Not authorized to connect to virtual host '%s'. Principals=", MqttProtocolHandler.this.host.id(), MqttProtocolHandler.this.security_context.principal_dump()));
                                                } else {
                                                    MqttProtocolHandler.this.resume_read();
                                                    MqttProtocolHandler.this.on_host_connected(MqttProtocolHandler.this.host);
                                                }
                                            }
                                        });
                                    }
                                }));
                            } else {
                                MqttProtocolHandler.this.on_host_connected(MqttProtocolHandler.this.host);
                            }
                        }
                    }
                });
            }
        });
    }

    public void on_host_connected(VirtualHost host) {
        MqttSessionManager.attach(host, this.connect_message.clientId(), this);
    }

    public MqttConnectionStatusDTO create_connection_status(boolean debug) {
        MqttConnectionStatusDTO rc = new MqttConnectionStatusDTO();
        rc.protocol_version = "3.1";
        rc.messages_sent = this.messages_sent.get();
        rc.messages_received = this.messages_received.get();
        rc.subscription_count = this.subscription_count;
        rc.waiting_on = (String)this.status.apply();
        return rc;
    }

    class Break
    extends RuntimeException {
        Break() {
        }
    }
}

