/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.ra;

import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkManager;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionConsumer;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.ra.ActiveMQEndpointActivationKey;
import org.apache.activemq.ra.MessageActivationSpec;
import org.apache.activemq.ra.MessageResourceAdapter;
import org.apache.activemq.ra.ServerSessionPoolImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Owns the broker connection and connection consumer that feed one endpoint
 * activation.
 *
 * <p>Connecting is done asynchronously by a {@link Work} item scheduled on the
 * resource adapter's {@link WorkManager}. The work item retries with an
 * exponentially growing delay (from {@link #INITIAL_RECONNECT_DELAY} up to
 * {@link #MAX_RECONNECT_DELAY}) until the consumer is created or the worker is
 * stopped. A connection-level {@link ExceptionListener} tears the connection
 * down and schedules a new connect attempt when the broker connection fails.
 *
 * <p>Locking: {@code connectWork} is used as the monitor guarding
 * connect/disconnect/start; {@code shutdownMutex} is used to park the retry
 * loop and to wake it from {@link #stop()}.
 */
public class ActiveMQEndpointWorker {
    /** Reflective handle on {@link MessageListener#onMessage(Message)}. */
    public static final Method ON_MESSAGE_METHOD;
    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQEndpointWorker.class);
    /** First delay, in milliseconds, between failed connection attempts. */
    private static final long INITIAL_RECONNECT_DELAY = 1000L;
    /** Upper bound, in milliseconds, for the delay between connection attempts. */
    private static final long MAX_RECONNECT_DELAY = 30000L;
    private static final ThreadLocal<Session> THREAD_LOCAL = new ThreadLocal<Session>();

    protected final ActiveMQEndpointActivationKey endpointActivationKey;
    protected final MessageEndpointFactory endpointFactory;
    protected final WorkManager workManager;
    /** Value reported by the endpoint factory for delivery of {@link #ON_MESSAGE_METHOD}. */
    protected final boolean transacted;

    private final ActiveMQDestination dest;
    private final Work connectWork;
    /** {@code true} while a connect attempt is scheduled or in progress. */
    private final AtomicBoolean connecting = new AtomicBoolean(false);
    /** Monitor used to park the retry loop; notified by {@link #stop()}. */
    private final Object shutdownMutex = new Object();
    private ActiveMQConnection connection;
    private ActiveMQConnectionConsumer consumer;
    private ServerSessionPoolImpl serverSessionPool;
    /*
     * volatile: written by start() under connectWork and by stop() under
     * shutdownMutex, but read by the retry loop without holding either lock.
     */
    private volatile boolean running;

    static {
        try {
            ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", Message.class);
        } catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * Creates a worker for the given activation.
     *
     * @param adapter resource adapter supplying the work manager, broker info and connections
     * @param key     activation key supplying the endpoint factory and activation spec
     * @throws ResourceException if the endpoint factory does not know the
     *         {@code onMessage} method, the JNDI lookup of the destination fails,
     *         or the configured destination type is neither queue nor topic
     */
    protected ActiveMQEndpointWorker(final MessageResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
        this.endpointActivationKey = key;
        this.endpointFactory = this.endpointActivationKey.getMessageEndpointFactory();
        this.workManager = adapter.getBootstrapContext().getWorkManager();
        try {
            this.transacted = this.endpointFactory.isDeliveryTransacted(ON_MESSAGE_METHOD);
        } catch (NoSuchMethodException e) {
            // Keep the cause so the failing endpoint can be diagnosed.
            throw new ResourceException("Endpoint does not implement the onMessage method.", e);
        }
        this.dest = resolveDestination(this.endpointActivationKey.getActivationSpec());

        this.connectWork = new Work() {
            long currentReconnectDelay = INITIAL_RECONNECT_DELAY;

            @Override
            public void release() {
                // Nothing to release; the loop in run() exits once the worker is stopped.
            }

            /**
             * Tries to connect and create the connection consumer, retrying
             * with back-off until it succeeds or the worker stops running.
             */
            @Override
            public void run() {
                currentReconnectDelay = INITIAL_RECONNECT_DELAY;
                MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
                if (LOG.isInfoEnabled()) {
                    LOG.info("Establishing connection to broker [" + adapter.getInfo().getServerUrl() + "]");
                }
                while (connecting.get() && running) {
                    try {
                        connection = adapter.makeConnection(activationSpec);
                        connection.setExceptionListener(new ExceptionListener() {
                            @Override
                            public void onException(JMSException error) {
                                if (serverSessionPool.isClosing()) {
                                    // Failures while the pool is closing are not acted upon.
                                    return;
                                }
                                LOG.error("Connection to broker failed: " + error.getMessage(), error);
                                if (!connecting.compareAndSet(false, true)) {
                                    LOG.info("Connection attempt already in progress, ignoring connection exception");
                                    return;
                                }
                                synchronized (connectWork) {
                                    disconnect();
                                    serverSessionPool.closeIdleSessions();
                                    connect();
                                }
                            }
                        });
                        connection.start();

                        if (activationSpec.isDurableSubscription()) {
                            consumer = (ActiveMQConnectionConsumer) connection.createDurableConnectionConsumer(
                                    (Topic) dest,
                                    activationSpec.getSubscriptionName(),
                                    emptyToNull(activationSpec.getMessageSelector()),
                                    (ServerSessionPool) serverSessionPool,
                                    connection.getPrefetchPolicy().getDurableTopicPrefetch(),
                                    activationSpec.getNoLocalBooleanValue());
                        } else {
                            consumer = (ActiveMQConnectionConsumer) connection.createConnectionConsumer(
                                    (Destination) dest,
                                    emptyToNull(activationSpec.getMessageSelector()),
                                    (ServerSessionPool) serverSessionPool,
                                    getPrefetch(activationSpec, connection, dest),
                                    activationSpec.getNoLocalBooleanValue());
                        }

                        // Clearing the flag ends the retry loop on the next condition check.
                        if (connecting.compareAndSet(true, false)) {
                            if (LOG.isInfoEnabled()) {
                                LOG.info("Successfully established connection to broker [" + adapter.getInfo().getServerUrl() + "]");
                            }
                        } else {
                            LOG.error("Could not release connection lock");
                        }

                        if (consumer.getConsumerInfo().getCurrentPrefetchSize() == 0) {
                            LOG.error("Endpoint " + endpointActivationKey.getActivationSpec() + " will not receive any messages due to broker 'zero prefetch' configuration for: " + dest);
                        }
                    } catch (JMSException error) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Failed to connect: " + error.getMessage(), error);
                        }
                        disconnect();
                        pause(error);
                    }
                }
            }

            /**
             * Picks the prefetch for a non-durable consumer: the connection's
             * topic or queue prefetch, otherwise max messages per session
             * multiplied by max sessions from the activation spec.
             */
            private int getPrefetch(MessageActivationSpec activationSpec, ActiveMQConnection connection, ActiveMQDestination destination) {
                if (destination.isTopic()) {
                    return connection.getPrefetchPolicy().getTopicPrefetch();
                }
                if (destination.isQueue()) {
                    return connection.getPrefetchPolicy().getQueuePrefetch();
                }
                return activationSpec.getMaxMessagesPerSessionsIntValue() * activationSpec.getMaxSessionsIntValue();
            }

            /**
             * Parks the retry loop for the current delay, then doubles the
             * delay up to {@link #MAX_RECONNECT_DELAY}. Failures are logged at
             * error level only once the delay has reached its maximum.
             */
            private void pause(JMSException error) {
                if (currentReconnectDelay == MAX_RECONNECT_DELAY) {
                    LOG.error("Failed to connect to broker [" + adapter.getInfo().getServerUrl() + "]: " + error.getMessage(), error);
                    LOG.error("Endpoint will try to reconnect to the JMS broker in " + (MAX_RECONNECT_DELAY / 1000L) + " seconds");
                }
                try {
                    synchronized (shutdownMutex) {
                        // Re-check under the mutex: stop() flips the flag and notifies while
                        // holding it, so a stop that already happened must not be slept through.
                        if (running) {
                            shutdownMutex.wait(currentReconnectDelay);
                        }
                    }
                } catch (InterruptedException e) {
                    // NOTE(review): the interrupt status is deliberately left cleared.
                    // Restoring it would make every later wait() return immediately and
                    // turn the back-off into a busy loop while the worker is running.
                    Thread.interrupted();
                }
                currentReconnectDelay = Math.min(currentReconnectDelay * 2L, MAX_RECONNECT_DELAY);
            }
        };
    }

    /**
     * Resolves the destination named by the activation spec, either through
     * JNDI or by constructing a queue/topic from the configured type.
     */
    private static ActiveMQDestination resolveDestination(MessageActivationSpec activationSpec) throws ResourceException {
        if (activationSpec.isUseJndi()) {
            try {
                InitialContext initialContext = new InitialContext();
                return (ActiveMQDestination) initialContext.lookup(activationSpec.getDestination());
            } catch (NamingException exc) {
                throw new ResourceException("JNDI lookup failed for " + activationSpec.getDestination(), exc);
            }
        }
        if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) {
            return new ActiveMQQueue(activationSpec.getDestination());
        }
        if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) {
            return new ActiveMQTopic(activationSpec.getDestination());
        }
        throw new ResourceException("Unknown destination type: " + activationSpec.getDestinationType());
    }

    /**
     * Closes the connection if it is non-null; a {@link JMSException} from
     * {@code close()} is logged at debug level and not propagated.
     *
     * @param c connection to close, may be {@code null}
     */
    public static void safeClose(Connection c) {
        if (c == null) {
            return;
        }
        try {
            LOG.debug("Closing connection to broker");
            c.close();
        } catch (JMSException e) {
            // Best-effort close: record the failure instead of dropping it silently.
            LOG.debug("Ignoring failure while closing connection to broker: " + e.getMessage(), e);
        }
    }

    /**
     * Closes the connection consumer if it is non-null; a {@link JMSException}
     * from {@code close()} is logged at debug level and not propagated.
     *
     * @param cc connection consumer to close, may be {@code null}
     */
    public static void safeClose(ConnectionConsumer cc) {
        if (cc == null) {
            return;
        }
        try {
            LOG.debug("Closing ConnectionConsumer");
            cc.close();
        } catch (JMSException e) {
            // Best-effort close: record the failure instead of dropping it silently.
            LOG.debug("Ignoring failure while closing ConnectionConsumer: " + e.getMessage(), e);
        }
    }

    /**
     * Marks the worker as running and, unless a connect attempt is already
     * pending, creates the server session pool and schedules the connect work.
     * Calling it on a worker that is already running does nothing.
     *
     * @throws ResourceException declared for callers; not thrown by the visible code
     */
    public void start() throws ResourceException {
        synchronized (connectWork) {
            if (running) {
                return;
            }
            running = true;
            if (connecting.compareAndSet(false, true)) {
                LOG.info("Starting");
                serverSessionPool = new ServerSessionPoolImpl(this, endpointActivationKey.getActivationSpec().getMaxSessionsIntValue());
                connect();
            } else {
                LOG.warn("Ignoring start command, EndpointWorker is already trying to connect");
            }
        }
    }

    /**
     * Marks the worker as stopped, wakes a retry loop parked on the shutdown
     * mutex, closes the server session pool and then closes the consumer and
     * connection. Calling it on a worker that is not running does nothing.
     *
     * @throws InterruptedException declared for callers; not thrown directly by the visible code
     */
    public void stop() throws InterruptedException {
        synchronized (shutdownMutex) {
            if (!running) {
                return;
            }
            running = false;
            LOG.info("Stopping");
            shutdownMutex.notifyAll();
            serverSessionPool.close();
        }
        disconnect();
    }

    private boolean isRunning() {
        return running;
    }

    /**
     * Schedules the connect work on the work manager if the worker is running.
     * If the work manager rejects the work, the worker is marked as not running.
     */
    private void connect() {
        synchronized (connectWork) {
            if (!running) {
                return;
            }
            try {
                workManager.scheduleWork(connectWork, WorkManager.INDEFINITE, null, null);
            } catch (WorkException e) {
                running = false;
                LOG.error("Work Manager did not accept work: ", e);
            }
        }
    }

    /** Closes and forgets the current consumer and connection, in that order. */
    private void disconnect() {
        synchronized (connectWork) {
            safeClose(consumer);
            consumer = null;
            safeClose(connection);
            connection = null;
        }
    }

    /**
     * Associates the session with the calling thread.
     *
     * @param session session to remember for the current thread
     */
    protected void registerThreadSession(Session session) {
        THREAD_LOCAL.set(session);
    }

    /**
     * Clears the calling thread's session association.
     *
     * @param session ignored; whatever is registered for the current thread is removed
     */
    protected void unregisterThreadSession(Session session) {
        // remove() rather than set(null) so no stale entry is left in the thread's map.
        THREAD_LOCAL.remove();
    }

    /**
     * @return the current broker connection, or {@code null} when disconnected
     */
    protected ActiveMQConnection getConnection() {
        synchronized (connectWork) {
            return connection;
        }
    }

    /** Maps {@code null} and the empty string to {@code null}; returns other values unchanged. */
    private String emptyToNull(String value) {
        return (value == null || value.length() == 0) ? null : value;
    }
}

