/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.ra.inflow;

import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.Queue;
import jakarta.jms.Topic;
import jakarta.resource.ResourceException;
import jakarta.resource.spi.endpoint.MessageEndpointFactory;
import jakarta.resource.spi.work.Work;
import jakarta.resource.spi.work.WorkException;
import jakarta.resource.spi.work.WorkManager;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;
import java.security.AccessController;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.naming.InitialContext;
import javax.transaction.xa.XAResource;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
import org.apache.activemq.artemis.ra.ActiveMQRABundle;
import org.apache.activemq.artemis.ra.ActiveMQRALogger;
import org.apache.activemq.artemis.ra.ActiveMQRaUtils;
import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec;
import org.apache.activemq.artemis.ra.inflow.ActiveMQMessageHandler;
import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.PasswordMaskingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ActiveMQActivation {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    public static final Method ONMESSAGE;
    private final ActiveMQResourceAdapter ra;
    private final ActiveMQActivationSpec spec;
    private final MessageEndpointFactory endpointFactory;
    private final AtomicBoolean deliveryActive = new AtomicBoolean(false);
    private boolean isTopic = false;
    private boolean isDeliveryTransacted;
    private ActiveMQDestination destination;
    private SimpleString topicTemporaryQueue;
    private final List<ActiveMQMessageHandler> handlers = new ArrayList<ActiveMQMessageHandler>();
    private ActiveMQConnectionFactory factory;
    private final List<String> nodes = Collections.synchronizedList(new ArrayList());
    private final Map<String, Long> removedNodes = new ConcurrentHashMap<String, Long>();
    private boolean lastReceived = false;
    private final Object teardownLock = new Object();
    private final AtomicBoolean inReconnect = new AtomicBoolean(false);
    private XARecoveryConfig resourceRecovery;

    public ActiveMQActivation(ActiveMQResourceAdapter ra, MessageEndpointFactory endpointFactory, ActiveMQActivationSpec spec) throws ResourceException {
        String pass;
        spec.validate();
        if (logger.isTraceEnabled()) {
            logger.trace("constructor({}, {}, {})", new Object[]{ra, endpointFactory, spec});
        }
        if ((pass = spec.getOwnPassword()) != null) {
            try {
                spec.setPassword(PasswordMaskingUtil.resolveMask((Boolean)ra.isUseMaskedPassword(), (String)pass, (String)ra.getCodec()));
            }
            catch (Exception e) {
                throw new ResourceException((Throwable)e);
            }
        }
        this.ra = ra;
        this.endpointFactory = endpointFactory;
        this.spec = spec;
        try {
            this.isDeliveryTransacted = endpointFactory.isDeliveryTransacted(ONMESSAGE);
        }
        catch (Exception e) {
            throw new ResourceException((Throwable)e);
        }
    }

    public ActiveMQActivationSpec getActivationSpec() {
        logger.trace("getActivationSpec()");
        return this.spec;
    }

    public MessageEndpointFactory getMessageEndpointFactory() {
        logger.trace("getMessageEndpointFactory()");
        return this.endpointFactory;
    }

    public boolean isDeliveryTransacted() {
        logger.trace("isDeliveryTransacted()");
        return this.isDeliveryTransacted;
    }

    public WorkManager getWorkManager() {
        logger.trace("getWorkManager()");
        return this.ra.getWorkManager();
    }

    public boolean isTopic() {
        logger.trace("isTopic()");
        return this.isTopic;
    }

    public void start() throws ResourceException {
        logger.trace("start()");
        this.deliveryActive.set(true);
        this.scheduleWork(new SetupActivation());
    }

    public SimpleString getTopicTemporaryQueue() {
        return this.topicTemporaryQueue;
    }

    public void setTopicTemporaryQueue(SimpleString topicTemporaryQueue) {
        this.topicTemporaryQueue = topicTemporaryQueue;
    }

    public List<XAResource> getXAResources() {
        ArrayList<XAResource> xaresources = new ArrayList<XAResource>();
        for (ActiveMQMessageHandler handler : this.handlers) {
            XAResource xares = handler.getXAResource();
            if (xares == null) continue;
            xaresources.add(xares);
        }
        return xaresources;
    }

    public void stop() {
        logger.trace("stop()");
        this.deliveryActive.set(false);
        this.teardown(true);
    }

    protected synchronized void setup() throws Exception {
        logger.debug("Setting up {}", (Object)this.spec);
        this.setupCF();
        this.setupDestination();
        Exception firstException = null;
        ClientSessionFactory cf = null;
        for (int i = 0; i < this.spec.getMaxSession(); ++i) {
            if (!this.spec.isSingleConnection().booleanValue()) {
                cf = null;
            }
            ClientSession session = null;
            try {
                if (cf == null) {
                    cf = this.factory.getServerLocator().createSessionFactory();
                }
                session = this.setupSession(cf);
                ActiveMQMessageHandler handler = new ActiveMQMessageHandler((ConnectionFactoryOptions)this.factory, this, this.ra.getTM(), (ClientSessionInternal)session, cf, i);
                handler.setup();
                this.handlers.add(handler);
                continue;
            }
            catch (Exception e) {
                if (cf != null && !this.spec.isSingleConnection().booleanValue()) {
                    cf.close();
                }
                if (session != null) {
                    session.close();
                }
                if (firstException != null) continue;
                firstException = e;
            }
        }
        if (firstException != null) {
            for (ActiveMQMessageHandler handler : this.handlers) {
                handler.teardown();
            }
            throw firstException;
        }
        for (ActiveMQMessageHandler handler : this.handlers) {
            handler.start();
        }
        HashMap<String, String> recoveryConfProps = new HashMap<String, String>();
        recoveryConfProps.put("JNDI_NAME", this.ra.getJndiName());
        this.resourceRecovery = this.ra.getRecoveryManager().register(this.factory, this.spec.getUser(), this.spec.getPassword(), recoveryConfProps);
        if (this.spec.isRebalanceConnections().booleanValue()) {
            this.factory.getServerLocator().addClusterTopologyListener((ClusterTopologyListener)new RebalancingListener());
        }
        logger.debug("Setup complete {}", (Object)this);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void teardown(boolean useInterrupt) {
        Object object = this.teardownLock;
        synchronized (object) {
            boolean stuckThreads;
            long timeout;
            logger.debug("Tearing down {}", (Object)this.spec);
            long l = timeout = this.factory == null ? 30000L : this.factory.getCallTimeout();
            if (this.resourceRecovery != null) {
                this.ra.getRecoveryManager().unRegister(this.resourceRecovery);
            }
            final ActiveMQMessageHandler[] handlersCopy = new ActiveMQMessageHandler[this.handlers.size()];
            for (int i = 0; i < this.handlers.size(); ++i) {
                handlersCopy[i] = this.handlers.get(this.handlers.size() - i - 1);
            }
            this.handlers.clear();
            FutureLatch future = new FutureLatch(handlersCopy.length);
            for (ActiveMQMessageHandler handler : handlersCopy) {
                handler.interruptConsumer(future);
            }
            boolean bl = stuckThreads = !future.await(timeout);
            if (stuckThreads && useInterrupt) {
                for (ActiveMQMessageHandler handler : handlersCopy) {
                    Thread interruptThread = handler.getCurrentThread();
                    if (interruptThread == null) continue;
                    try {
                        logger.trace("Interrupting thread {}", (Object)interruptThread.getName());
                    }
                    catch (Throwable justLog) {
                        logger.warn(justLog.getMessage(), justLog);
                    }
                    try {
                        interruptThread.interrupt();
                    }
                    catch (Throwable throwable) {
                        // empty catch block
                    }
                }
            }
            Runnable runTearDown = new Runnable(){

                @Override
                public void run() {
                    for (ActiveMQMessageHandler handler : handlersCopy) {
                        handler.teardown();
                    }
                }
            };
            Thread threadTearDown = ActiveMQActivation.startThread("TearDown/HornetQActivation", runTearDown);
            try {
                threadTearDown.join(timeout);
            }
            catch (InterruptedException handler) {
                // empty catch block
            }
            if (this.factory != null) {
                try {
                    this.factory.close();
                }
                catch (Throwable e) {
                    ActiveMQRALogger.LOGGER.unableToCloseFactory(e);
                }
                this.factory = null;
            }
            if (threadTearDown.isAlive()) {
                threadTearDown.interrupt();
                try {
                    threadTearDown.join(5000L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                if (threadTearDown.isAlive()) {
                    ActiveMQRALogger.LOGGER.threadCouldNotFinish(threadTearDown.toString());
                }
            }
            this.nodes.clear();
            this.lastReceived = false;
            logger.debug("Tearing down complete {}", (Object)this);
        }
    }

    protected void setupCF() throws Exception {
        if (this.spec.getConnectionFactoryLookup() != null) {
            InitialContext ctx = this.spec.getParsedJndiParams() == null ? new InitialContext() : new InitialContext(this.spec.getParsedJndiParams());
            Object fac = ctx.lookup(this.spec.getConnectionFactoryLookup());
            if (fac instanceof ActiveMQConnectionFactory) {
                this.factory = ActiveMQJMSClient.createConnectionFactory((String)((ActiveMQConnectionFactory)fac).toURI().toString(), (String)"internalConnection");
                this.factory.setEnableSharedClientID(true);
                this.factory.setEnable1xPrefixes(((ActiveMQConnectionFactory)fac).isEnable1xPrefixes());
            } else {
                this.factory = this.ra.newConnectionFactory(this.spec);
            }
        } else {
            this.factory = this.ra.newConnectionFactory(this.spec);
        }
    }

    protected ClientSession setupSession(ClientSessionFactory cf) throws Exception {
        ClientSession result = null;
        try {
            String clientID;
            result = this.ra.createSession(cf, this.spec.getAcknowledgeModeInt(), this.spec.getUser(), this.spec.getPassword(), this.ra.getPreAcknowledge(), this.ra.getDupsOKBatchSize(), this.ra.getTransactionBatchSize(), this.isDeliveryTransacted, this.spec.isUseLocalTx(), this.spec.getTransactionTimeout());
            result.addMetaData("resource-adapter", "inbound");
            result.addMetaData("jms-session", "");
            String string = clientID = this.ra.getClientID() == null ? this.spec.getClientID() : this.ra.getClientID();
            if (clientID != null) {
                result.addMetaData("jms-client-id", clientID);
            }
            logger.debug("Using queue connection {}", (Object)result);
            return result;
        }
        catch (Throwable t) {
            try {
                if (result != null) {
                    result.close();
                }
            }
            catch (Exception e) {
                logger.trace("Ignored error closing connection", (Throwable)e);
            }
            if (t instanceof Exception) {
                throw (Exception)t;
            }
            throw new RuntimeException("Error configuring connection", t);
        }
    }

    public SimpleString getAddress() {
        return this.destination.getSimpleAddress();
    }

    protected void setupDestination() throws Exception {
        block16: {
            String destinationName = this.spec.getDestination();
            if (this.spec.isUseJNDI().booleanValue()) {
                InitialContext ctx = this.spec.getParsedJndiParams() == null ? new InitialContext() : new InitialContext(this.spec.getParsedJndiParams());
                logger.debug("Using context {} for {}", ctx.getEnvironment(), (Object)this.spec);
                logger.trace("setupDestination({})", (Object)ctx);
                String destinationTypeString = this.spec.getDestinationType();
                if (destinationTypeString != null && !destinationTypeString.trim().equals("")) {
                    Class<Topic> destinationType;
                    logger.debug("Destination type defined as {}", (Object)destinationTypeString);
                    if (Topic.class.getName().equals(destinationTypeString)) {
                        destinationType = Topic.class;
                        this.isTopic = true;
                    } else {
                        destinationType = Queue.class;
                    }
                    logger.debug("Retrieving {} \"{}\" from JNDI", (Object)destinationType.getName(), (Object)destinationName);
                    try {
                        this.destination = (ActiveMQDestination)ActiveMQRaUtils.lookup(ctx, destinationName, destinationType);
                    }
                    catch (Exception e) {
                        if (destinationName == null) {
                            throw ActiveMQRABundle.BUNDLE.noDestinationName();
                        }
                        String calculatedDestinationName = destinationName.substring(destinationName.lastIndexOf(47) + 1);
                        if (this.isTopic) {
                            calculatedDestinationName = this.getTopicWithPrefix(calculatedDestinationName);
                        } else if (!this.isTopic) {
                            calculatedDestinationName = this.getQueueWithPrefix(calculatedDestinationName);
                        }
                        ActiveMQRALogger.LOGGER.unableToRetrieveDestinationName(destinationName, destinationType.getName(), calculatedDestinationName);
                        if (this.isTopic) {
                            this.destination = ActiveMQDestination.createTopic((String)calculatedDestinationName);
                            break block16;
                        }
                        this.destination = ActiveMQDestination.createQueue((String)calculatedDestinationName);
                    }
                } else {
                    logger.debug("Destination type not defined in MDB activation configuration.");
                    logger.debug("Retrieving {} \"{}\" from JNDI", (Object)Destination.class.getName(), (Object)destinationName);
                    this.destination = (ActiveMQDestination)ActiveMQRaUtils.lookup(ctx, destinationName, Destination.class);
                    if (this.destination instanceof Topic) {
                        this.isTopic = true;
                    }
                }
            } else {
                ActiveMQRALogger.LOGGER.instantiatingDestination(this.spec.getDestinationType(), this.spec.getDestination());
                if (Topic.class.getName().equals(this.spec.getDestinationType())) {
                    this.destination = ActiveMQDestination.createTopic((String)this.getTopicWithPrefix(this.spec.getDestination()), (String)this.spec.getDestination());
                    this.isTopic = true;
                } else {
                    this.destination = ActiveMQDestination.createQueue((String)this.getQueueWithPrefix(this.spec.getDestination()), (String)this.spec.getDestination());
                }
            }
        }
    }

    private String getTopicWithPrefix(String topic) {
        if (this.spec.getTopicPrefix() == null) {
            if (this.spec.isEnable1xPrefixes() == null) {
                if (this.ra.isEnable1xPrefixes() != null && this.ra.isEnable1xPrefixes().booleanValue() && !topic.startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString())) {
                    return PacketImpl.OLD_TOPIC_PREFIX.toString() + topic;
                }
                return topic;
            }
            if (this.spec.isEnable1xPrefixes().booleanValue() && !topic.startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString())) {
                return PacketImpl.OLD_TOPIC_PREFIX.toString() + topic;
            }
            return topic;
        }
        return this.spec.getTopicPrefix() + topic;
    }

    private String getQueueWithPrefix(String queue) {
        if (this.spec.getQueuePrefix() == null) {
            if (this.spec.isEnable1xPrefixes() == null) {
                if (this.ra.isEnable1xPrefixes() != null && this.ra.isEnable1xPrefixes().booleanValue() && !queue.startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString())) {
                    return PacketImpl.OLD_QUEUE_PREFIX.toString() + queue;
                }
                return queue;
            }
            if (this.spec.isEnable1xPrefixes().booleanValue() && !queue.startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString())) {
                return PacketImpl.OLD_QUEUE_PREFIX.toString() + queue;
            }
            return queue;
        }
        return this.spec.getQueuePrefix() + queue;
    }

    public String toString() {
        StringBuilder buffer = new StringBuilder();
        buffer.append(ActiveMQActivation.class.getName()).append('(');
        buffer.append("spec=").append(this.spec.getClass().getName());
        buffer.append(" mepf=").append(this.endpointFactory.getClass().getName());
        buffer.append(" active=").append(this.deliveryActive.get());
        if (this.spec.getDestination() != null) {
            buffer.append(" destination=").append(this.spec.getDestination());
        }
        buffer.append(" transacted=").append(this.isDeliveryTransacted);
        buffer.append(')');
        return buffer.toString();
    }

    public void startReconnectThread(String cause) {
        logger.trace("Starting reconnect Thread {} on MDB activation {}", (Object)cause, (Object)this);
        try {
            this.scheduleWork(new ReconnectWork(cause));
        }
        catch (Exception e) {
            logger.warn("Could not reconnect because worker is down", (Throwable)e);
        }
    }

    private static Thread startThread(String name, Runnable run) {
        ClassLoader tccl;
        try {
            tccl = AccessController.doPrivileged(new PrivilegedExceptionAction<ClassLoader>(){

                @Override
                public ClassLoader run() {
                    return ActiveMQActivation.class.getClassLoader();
                }
            });
        }
        catch (Throwable e) {
            logger.warn(e.getMessage(), e);
            tccl = null;
        }
        ActiveMQThreadFactory factory = new ActiveMQThreadFactory(name, true, tccl);
        Thread t = factory.newThread(run);
        t.start();
        return t;
    }

    private void scheduleWork(Work run) throws WorkException {
        this.ra.getWorkManager().scheduleWork(run);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void reconnect(Throwable failure, boolean useInterrupt) {
        logger.trace("reconnecting activation {}", (Object)this);
        if (failure != null) {
            if (failure instanceof ActiveMQException && ((ActiveMQException)failure).getType() == ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST) {
                ActiveMQRALogger.LOGGER.awaitingTopicQueueCreation(this.getActivationSpec().getDestination());
            } else if (failure instanceof ActiveMQException && ((ActiveMQException)failure).getType() == ActiveMQExceptionType.NOT_CONNECTED) {
                ActiveMQRALogger.LOGGER.awaitingJMSServerCreation();
            } else {
                ActiveMQRALogger.LOGGER.failureInActivation(this.spec, failure);
            }
        }
        int setupAttempts = this.spec.getSetupAttempts();
        long setupInterval = this.spec.getSetupInterval();
        if (this.inReconnect.getAndSet(true)) {
            return;
        }
        try {
            Throwable lastException = failure;
            for (int reconnectCount = 0; this.deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts); ++reconnectCount) {
                this.teardown(useInterrupt);
                try {
                    Thread.sleep(setupInterval);
                }
                catch (InterruptedException e) {
                    logger.debug("Interrupted trying to reconnect {}", (Object)this.spec, (Object)e);
                    break;
                }
                if (reconnectCount < 1) {
                    ActiveMQRALogger.LOGGER.attemptingReconnect(this.spec);
                }
                try {
                    this.setup();
                    ActiveMQRALogger.LOGGER.reconnected();
                    break;
                }
                catch (Throwable t) {
                    if (failure instanceof ActiveMQException && ((ActiveMQException)failure).getType() == ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST) {
                        if (lastException != null && t instanceof ActiveMQNonExistentQueueException) continue;
                        lastException = t;
                        ActiveMQRALogger.LOGGER.awaitingTopicQueueCreation(this.getActivationSpec().getDestination());
                        continue;
                    }
                    if (failure instanceof ActiveMQException && ((ActiveMQException)failure).getType() == ActiveMQExceptionType.NOT_CONNECTED) {
                        if (lastException != null && t instanceof ActiveMQNotConnectedException) continue;
                        lastException = t;
                        ActiveMQRALogger.LOGGER.awaitingJMSServerCreation();
                        continue;
                    }
                    ActiveMQRALogger.LOGGER.errorReconnecting(this.spec, t);
                    continue;
                }
            }
        }
        finally {
            this.inReconnect.set(false);
        }
    }

    public ActiveMQConnectionFactory getConnectionFactory() {
        return this.factory;
    }

    static {
        try {
            ONMESSAGE = MessageListener.class.getMethod("onMessage", Message.class);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private class RebalancingListener
    implements ClusterTopologyListener {
        private RebalancingListener() {
        }

        public void nodeUP(TopologyMember member, boolean last) {
            boolean newNode = false;
            String id = member.getNodeId();
            if (!ActiveMQActivation.this.nodes.contains(id) && (ActiveMQActivation.this.removedNodes.get(id) == null || ActiveMQActivation.this.removedNodes.get(id) != null && ActiveMQActivation.this.removedNodes.get(id) < member.getUniqueEventID())) {
                ActiveMQActivation.this.nodes.add(id);
                newNode = true;
            }
            if (ActiveMQActivation.this.lastReceived && newNode) {
                ActiveMQRALogger.LOGGER.rebalancingConnections("nodeUp " + member.toString());
                ActiveMQActivation.this.startReconnectThread("NodeUP Connection Rebalancer");
            } else if (last) {
                ActiveMQActivation.this.lastReceived = true;
            }
        }

        public void nodeDown(long eventUID, String nodeID) {
            if (ActiveMQActivation.this.nodes.remove(nodeID)) {
                ActiveMQActivation.this.removedNodes.put(nodeID, eventUID);
                ActiveMQRALogger.LOGGER.rebalancingConnections("nodeDown " + nodeID);
                ActiveMQActivation.this.startReconnectThread("NodeDOWN Connection Rebalancer");
            }
        }
    }

    private class ReconnectWork
    implements Work {
        final String cause;

        ReconnectWork(String cause) {
            this.cause = cause;
        }

        public void release() {
        }

        public void run() {
            logger.trace("Starting reconnect for {}", (Object)this.cause);
            ActiveMQActivation.this.reconnect(null, false);
        }
    }

    private class SetupActivation
    implements Work {
        private SetupActivation() {
        }

        public void run() {
            try {
                ActiveMQActivation.this.setup();
            }
            catch (Throwable t) {
                ActiveMQActivation.this.reconnect(t, false);
            }
        }

        public void release() {
        }
    }
}

