/*
 * Decompiled with CFR 0.152.
 */
package org.richfaces.application.push.impl;

import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.richfaces.application.push.MessageException;
import org.richfaces.application.push.Session;
import org.richfaces.application.push.SessionSubscriptionEvent;
import org.richfaces.application.push.SessionUnsubscriptionEvent;
import org.richfaces.application.push.TopicEvent;
import org.richfaces.application.push.TopicKey;
import org.richfaces.application.push.impl.AbstractTopic;
import org.richfaces.application.push.impl.TopicsContextImpl;

public class TopicImpl
extends AbstractTopic {
    private ConcurrentMap<TopicKey, PublishingContext> sessions = new ConcurrentHashMap<TopicKey, PublishingContext>();
    private TopicsContextImpl topicsContext;

    public TopicImpl(TopicKey key, TopicsContextImpl topicsContext) {
        super(key);
        this.topicsContext = topicsContext;
    }

    @Override
    public void publish(Object messageData) throws MessageException {
        PublishingContext topicContext;
        String serializedData = this.getMessageDataSerializer().serialize(messageData);
        if (serializedData != null && (topicContext = this.getPublishingContext(this.getKey())) != null) {
            topicContext.addMessage(serializedData);
        }
    }

    @Override
    public void publishEvent(TopicEvent event) {
        super.publishEvent(event);
        if (event instanceof SessionSubscriptionEvent) {
            SessionSubscriptionEvent subscriptionEvent = (SessionSubscriptionEvent)event;
            this.getOrCreatePublishingContext(subscriptionEvent.getTopicKey()).addSession(subscriptionEvent.getSession());
        } else if (event instanceof SessionUnsubscriptionEvent) {
            SessionUnsubscriptionEvent unsubscriptionEvent = (SessionUnsubscriptionEvent)event;
            this.getPublishingContext(unsubscriptionEvent.getTopicKey()).removeSession(unsubscriptionEvent.getSession());
        }
    }

    private PublishingContext getPublishingContext(TopicKey key) {
        return (PublishingContext)this.sessions.get(key);
    }

    private PublishingContext getOrCreatePublishingContext(TopicKey key) {
        PublishingContext freshContext;
        PublishingContext result = (PublishingContext)this.sessions.get(key);
        if (result == null && (result = this.sessions.putIfAbsent(key, freshContext = new PublishingContext(key))) == null) {
            result = freshContext;
        }
        return result;
    }

    private static final class PublishTask
    implements Runnable {
        private final PublishingContext topicContext;

        public PublishTask(PublishingContext topicContext) {
            this.topicContext = topicContext;
        }

        @Override
        public void run() {
            this.topicContext.publishMessages();
        }
    }

    private final class PublishingContext {
        private final List<Session> sessions = new CopyOnWriteArrayList<Session>();
        private final Queue<String> serializedMessages = new ConcurrentLinkedQueue<String>();
        private final TopicKey key;
        private boolean submittedForPublishing;

        public PublishingContext(TopicKey key) {
            this.key = key;
        }

        public void addSession(Session session) {
            this.sessions.add(session);
        }

        public void removeSession(Session session) {
            this.sessions.remove(session);
        }

        public void addMessage(String serializedMessageData) {
            this.serializedMessages.add(serializedMessageData);
            this.submitForPublishing();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void publishMessages() {
            Iterator itr = this.serializedMessages.iterator();
            while (itr.hasNext()) {
                String message = (String)itr.next();
                for (Session session : this.sessions) {
                    session.push(this.key, message);
                }
                itr.remove();
            }
            PublishingContext publishingContext = this;
            synchronized (publishingContext) {
                this.submittedForPublishing = false;
                if (!this.serializedMessages.isEmpty()) {
                    this.submitForPublishing();
                }
            }
        }

        private synchronized void submitForPublishing() {
            if (!this.submittedForPublishing) {
                this.submittedForPublishing = true;
                TopicImpl.this.topicsContext.getPublisherService().submit(new PublishTask(this));
            }
        }
    }
}

