package asia.stampy.common.netty;

import asia.stampy.common.StampyLibrary;
import asia.stampy.common.gateway.AbstractStampyMessageGateway;
import asia.stampy.common.gateway.DefaultUnparseableMessageHandler;
import asia.stampy.common.gateway.HostPort;
import asia.stampy.common.gateway.MessageListenerHaltException;
import asia.stampy.common.gateway.StampyHandlerHelper;
import asia.stampy.common.gateway.UnparseableMessageHandler;
import asia.stampy.common.heartbeat.StampyHeartbeatContainer;
import asia.stampy.common.message.StampyMessage;
import asia.stampy.common.parsing.StompMessageParser;
import asia.stampy.common.parsing.UnparseableException;
import java.lang.invoke.MethodHandles;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty upstream handler shared by the Stampy client and server gateways.
 *
 * <p>The handler keeps a map of currently connected channels keyed by the remote
 * {@link HostPort}, validates every received object, and hands valid non-heartbeat
 * messages to an {@link Executor} where they are parsed and passed to the gateway's
 * message listeners. It also offers operations to send to one peer, broadcast to all
 * peers and close a peer's channel.
 */
@StampyLibrary(libraryName = "stampy-NETTY-client-server-RI")
@ChannelHandler.Sharable
public abstract class StampyNettyChannelHandler extends SimpleChannelUpstreamHandler {
    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final String ILLEGAL_ACCESS_ATTEMPT = "Illegal access attempt";

    /** Connected channels keyed by remote address; mutated from Netty callbacks. */
    private final Map<HostPort, Channel> sessions = new ConcurrentHashMap<>();
    /** Collaborator that receives every parser/gateway/heartbeat/handler set on this class. */
    private final StampyHandlerHelper helper = new StampyHandlerHelper();

    private StampyHeartbeatContainer heartbeatContainer;
    private AbstractStampyMessageGateway gateway;
    private StompMessageParser parser = new StompMessageParser();
    private Executor executor = Executors.newSingleThreadExecutor();
    private UnparseableMessageHandler unparseableMessageHandler = new DefaultUnparseableMessageHandler();

    /**
     * Handles an inbound message: resets the peer's heartbeat, rejects objects the helper
     * does not consider valid (closing the connection), ignores heartbeats, and dispatches
     * everything else to the executor for {@link #asyncProcessing(HostPort, String)}.
     *
     * @param channelHandlerContext the context of the channel the message arrived on
     * @param messageEvent the event carrying the received object
     * @throws Exception declared by the overridden Netty callback
     */
    @Override
    public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception {
        final HostPort hostPort = createHostPort(channelHandlerContext);
        final Object received = messageEvent.getMessage();
        log.debug("Received raw message {} from {}", received, hostPort);
        this.helper.resetHeartbeat(hostPort);

        if (!this.helper.isValidObject(received)) {
            log.error("Object {} is not a valid STOMP message, closing connection {}", received, hostPort);
            illegalAccess(channelHandlerContext);
            return;
        }

        final String text = (String) received;
        if (this.helper.isHeartbeat(text)) {
            log.trace("Received heartbeat");
            return;
        }

        getExecutor().execute(new Runnable() {
            @Override
            public void run() {
                asyncProcessing(hostPort, text);
            }
        });
    }

    /**
     * Builds the {@link HostPort} key for the remote address of the context's channel.
     *
     * @param channelHandlerContext the context whose channel's remote address is used
     * @return a new {@code HostPort} for the remote peer
     */
    protected HostPort createHostPort(ChannelHandlerContext channelHandlerContext) {
        return new HostPort((InetSocketAddress) channelHandlerContext.getChannel().getRemoteAddress());
    }

    /**
     * Registers the newly connected channel under its remote {@link HostPort} and forwards
     * the event upstream.
     */
    @Override
    public void channelConnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        this.sessions.put(createHostPort(channelHandlerContext), channelHandlerContext.getChannel());
        channelHandlerContext.sendUpstream(channelStateEvent);
    }

    /**
     * Removes the disconnected channel's entry and forwards the event upstream.
     */
    @Override
    public void channelDisconnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        this.sessions.remove(createHostPort(channelHandlerContext));
        channelHandlerContext.sendUpstream(channelStateEvent);
    }

    /**
     * Returns the remote addresses of all registered channels.
     *
     * @return an unmodifiable view of the session keys (a view, not a snapshot)
     */
    public Set<HostPort> getConnectedHostPorts() {
        return Collections.unmodifiableSet(this.sessions.keySet());
    }

    /**
     * Tells whether a channel is registered for the given remote address.
     *
     * @param hostPort the remote address to look up
     * @return {@code true} if a session entry exists for {@code hostPort}
     */
    public boolean isConnected(HostPort hostPort) {
        return this.sessions.containsKey(hostPort);
    }

    /**
     * Writes the message to every registered channel.
     *
     * @param str the message to send
     */
    public void broadcastMessage(String str) {
        for (Channel channel : this.sessions.values()) {
            // The HostPort is derived from the channel inside sendMessage.
            sendMessage(str, null, channel);
        }
    }

    /**
     * Writes the message to the channel registered for the given remote address. If no
     * connected channel exists, an error is logged and nothing is sent.
     *
     * @param str the message to send
     * @param hostPort the remote address of the target peer
     */
    public void sendMessage(String str, HostPort hostPort) {
        sendMessage(str, hostPort, this.sessions.get(hostPort));
    }

    /**
     * Resets the peer's heartbeat and writes the message to the channel; logs an error and
     * returns if the channel is {@code null} or not connected.
     *
     * @param str the message to send
     * @param hostPort the peer's address, or {@code null} to derive it from the channel
     * @param channel the target channel, may be {@code null}
     */
    private synchronized void sendMessage(String str, HostPort hostPort, Channel channel) {
        if (channel == null || !channel.isConnected()) {
            log.error("Channel is not connected, cannot send message {}", str);
            return;
        }
        HostPort target = hostPort != null
                ? hostPort
                : new HostPort((InetSocketAddress) channel.getRemoteAddress());
        this.helper.resetHeartbeat(target);
        channel.write(str);
    }

    /**
     * Closes the channel registered for the given remote address and waits for the close
     * to complete. Logs a warning if no channel is registered.
     *
     * @param hostPort the remote address whose channel should be closed
     */
    public void close(HostPort hostPort) {
        // Single lookup: a separate containsKey/get pair could observe a concurrent
        // disconnect in between and dereference null.
        Channel channel = this.sessions.get(hostPort);
        if (channel == null) {
            log.warn("{} is already closed", hostPort);
            return;
        }
        channel.close().awaitUninterruptibly();
        log.info("Session for {} has been closed", hostPort);
    }

    /**
     * Parses the raw text and notifies the gateway's message listeners. Unparseable input
     * and unexpected errors are delegated to the helper.
     *
     * @param hostPort the peer the message came from
     * @param str the raw message text
     */
    protected void asyncProcessing(HostPort hostPort, String str) {
        StampyMessage message = null;
        try {
            message = getParser().parseMessage(str);
            getGateway().notifyMessageListeners(message, hostPort);
        } catch (MessageListenerHaltException ignored) {
            // Deliberately not propagated or reported; presumably a listener uses this
            // exception to stop further processing of the message - confirm against
            // the gateway.
        } catch (UnparseableException e) {
            this.helper.handleUnparseableMessage(hostPort, str, e);
        } catch (Exception e) {
            this.helper.handleUnexpectedError(hostPort, str, message, e);
        }
    }

    /**
     * Tells the peer its access was illegal and then closes the channel.
     *
     * <p>The close is chained to the write with a listener rather than by blocking on the
     * futures: this method is reached from {@link #messageReceived}, and Netty 3 rejects
     * {@code await} calls made from an I/O worker thread with an
     * {@link IllegalStateException}.
     *
     * @param channelHandlerContext the context of the offending channel
     */
    protected void illegalAccess(ChannelHandlerContext channelHandlerContext) {
        channelHandlerContext.getChannel()
                .write(ILLEGAL_ACCESS_ATTEMPT)
                .addListener(ChannelFutureListener.CLOSE);
    }

    /** @return the parser used to turn raw text into {@link StampyMessage}s */
    public StompMessageParser getParser() {
        return this.parser;
    }

    /**
     * Sets the parser on this handler and on its helper.
     *
     * @param stompMessageParser the parser to use
     */
    public void setParser(StompMessageParser stompMessageParser) {
        this.parser = stompMessageParser;
        this.helper.setParser(stompMessageParser);
    }

    /** @return the heartbeat container last set, or {@code null} if none was set */
    public StampyHeartbeatContainer getHeartbeatContainer() {
        return this.heartbeatContainer;
    }

    /**
     * Sets the heartbeat container on this handler and on its helper.
     *
     * @param stampyHeartbeatContainer the heartbeat container to use
     */
    public void setHeartbeatContainer(StampyHeartbeatContainer stampyHeartbeatContainer) {
        this.heartbeatContainer = stampyHeartbeatContainer;
        this.helper.setHeartbeatContainer(stampyHeartbeatContainer);
    }

    /** @return the gateway last set, or {@code null} if none was set */
    public AbstractStampyMessageGateway getGateway() {
        return this.gateway;
    }

    /**
     * Sets the gateway on this handler and on its helper.
     *
     * @param abstractStampyMessageGateway the gateway whose listeners are notified
     */
    public void setGateway(AbstractStampyMessageGateway abstractStampyMessageGateway) {
        this.gateway = abstractStampyMessageGateway;
        this.helper.setGateway(abstractStampyMessageGateway);
    }

    /** @return the handler for unparseable messages */
    public UnparseableMessageHandler getUnparseableMessageHandler() {
        return this.unparseableMessageHandler;
    }

    /**
     * Sets the unparseable-message handler on this handler and on its helper.
     *
     * @param unparseableMessageHandler the handler to use
     */
    public void setUnparseableMessageHandler(UnparseableMessageHandler unparseableMessageHandler) {
        this.unparseableMessageHandler = unparseableMessageHandler;
        this.helper.setUnparseableMessageHandler(unparseableMessageHandler);
    }

    /**
     * Logs the exception reported by Netty together with the peer's address.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) throws Exception {
        log.error("Unexpected Netty exception for " + createHostPort(channelHandlerContext), exceptionEvent.getCause());
    }

    /** @return the executor that runs {@link #asyncProcessing(HostPort, String)} */
    public Executor getExecutor() {
        return this.executor;
    }

    /**
     * Replaces the executor used for message processing. The previous executor is not
     * shut down by this method.
     *
     * @param executor the executor to use
     */
    public void setExecutor(Executor executor) {
        this.executor = executor;
    }
}
