package io.smallrye.reactive.messaging.jms;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.jms.i18n.JmsExceptions;
import io.smallrye.reactive.messaging.jms.i18n.JmsLogging;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.literal.NamedLiteral;
import jakarta.inject.Inject;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSContext;
import java.lang.annotation.Annotation;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

@ApplicationScoped
@Connector(JmsConnector.CONNECTOR_NAME)
@ConnectorAttributes({@ConnectorAttribute(name = "connection-factory-name", description = "The name of the JMS connection factory  (`jakarta.jms.ConnectionFactory`) to be used. If not set, it uses any exposed JMS connection factory", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type = "String"), @ConnectorAttribute(name = "username", description = "The username to connect to to the JMS server", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type = "String"), @ConnectorAttribute(name = "password", description = "The password to connect to to the JMS server", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type = "String"), @ConnectorAttribute(name = "session-mode", description = "The session mode. Accepted values are AUTO_ACKNOWLEDGE, SESSION_TRANSACTED, CLIENT_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type = "String", defaultValue = "AUTO_ACKNOWLEDGE"), @ConnectorAttribute(name = "client-id", description = "The client id", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type = "String"), @ConnectorAttribute(name = "destination", description = "The name of the JMS destination. If not set the name of the channel is used", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type = "String"), @ConnectorAttribute(name = "selector", description = "The JMS selector", direction = ConnectorAttribute.Direction.INCOMING, type = "String"), @ConnectorAttribute(name = "no-local", description = "Enable or disable local delivery", direction = ConnectorAttribute.Direction.INCOMING, type = "boolean", defaultValue = "false"), @ConnectorAttribute(name = "broadcast", description = "Whether or not the JMS message should be dispatched to multiple consumers", direction = ConnectorAttribute.Direction.INCOMING, type = "boolean", defaultValue = "false"), @ConnectorAttribute(name = "durable", description = "Set to `true` to use a durable subscription", direction = ConnectorAttribute.Direction.INCOMING, type = "boolean", defaultValue = "false"), @ConnectorAttribute(name = "destination-type", description = "The type of destination. It can be either `queue` or `topic`", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type = "string", defaultValue = "queue"), @ConnectorAttribute(name = "disable-message-id", description = "Omit the message id in the outbound JMS message", direction = ConnectorAttribute.Direction.OUTGOING, type = "boolean"), @ConnectorAttribute(name = "disable-message-timestamp", description = "Omit the message timestamp in the outbound JMS message", direction = ConnectorAttribute.Direction.OUTGOING, type = "boolean"), @ConnectorAttribute(name = "delivery-mode", description = "The delivery mode. Either `persistent` or `non_persistent`", direction = ConnectorAttribute.Direction.OUTGOING, type = "string"), @ConnectorAttribute(name = "delivery-delay", description = "The delivery delay", direction = ConnectorAttribute.Direction.OUTGOING, type = "long"), @ConnectorAttribute(name = "ttl", description = "The JMS Message time-to-live", direction = ConnectorAttribute.Direction.OUTGOING, type = "long"), @ConnectorAttribute(name = "correlation-id", description = "The JMS Message correlation id", direction = ConnectorAttribute.Direction.OUTGOING, type = "string"), @ConnectorAttribute(name = "priority", description = "The JMS Message priority", direction = ConnectorAttribute.Direction.OUTGOING, type = "int"), @ConnectorAttribute(name = "reply-to", description = "The reply to destination if any", direction = ConnectorAttribute.Direction.OUTGOING, type = "string"), @ConnectorAttribute(name = "reply-to-destination-type", description = "The type of destination for the response. It can be either `queue` or `topic`", direction = ConnectorAttribute.Direction.OUTGOING, type = "string", defaultValue = "queue"), @ConnectorAttribute(name = "merge", direction = ConnectorAttribute.Direction.OUTGOING, description = "Whether the connector should allow multiple upstreams", type = "boolean", defaultValue = "false"), @ConnectorAttribute(name = "retry", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether to retry on terminal stream errors.", type = "boolean", defaultValue = "true"), @ConnectorAttribute(name = "retry.max-retries", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Maximum number of retries for terminal stream errors.", type = "int", defaultValue = "3"), @ConnectorAttribute(name = "retry.initial-delay", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The initial delay for the retry.", type = "string", defaultValue = "PT1S"), @ConnectorAttribute(name = "retry.max-delay", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The maximum delay", type = "string", defaultValue = "PT10S"), @ConnectorAttribute(name = "retry.jitter", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "How much the delay jitters as a multiplier between 0 and 1. The formula is current delay * jitter. For example, with a current delay of 2H, a jitter of 0.5 will result in an actual delay somewhere between 1H and 3H.", type = "double", defaultValue = "0.5")})
/* loaded from: input_file:io/smallrye/reactive/messaging/jms/JmsConnector.class */
public class JmsConnector implements InboundConnector, OutboundConnector {
    public static final String CONNECTOR_NAME = "smallrye-jms";
    static final String DEFAULT_MAX_POOL_SIZE = "10";
    static final String DEFAULT_THREAD_TTL = "60";

    @Inject
    @Any
    Instance<ConnectionFactory> factories;

    @Inject
    Instance<JsonMapping> jsonMapper;

    @Inject
    @ConfigProperty(name = "smallrye.jms.threads.max-pool-size", defaultValue = DEFAULT_MAX_POOL_SIZE)
    int maxPoolSize;

    @Inject
    @ConfigProperty(name = "smallrye.jms.threads.ttl", defaultValue = DEFAULT_THREAD_TTL)
    int ttl;
    private ExecutorService executor;
    private JsonMapping jsonMapping;
    private final List<JmsSource> sources = new CopyOnWriteArrayList();
    private final List<JmsResourceHolder<?>> contexts = new CopyOnWriteArrayList();

    @PostConstruct
    public void init() {
        this.executor = Executors.newFixedThreadPool(this.maxPoolSize);
        if (this.jsonMapper.isUnsatisfied()) {
            JmsLogging.log.warn("Please add one of the additional mapping modules (-jsonb or -jackson) to be able to (de)serialize JSON messages.");
        } else if (!this.jsonMapper.isAmbiguous()) {
            this.jsonMapping = (JsonMapping) this.jsonMapper.get();
        } else {
            JmsLogging.log.warn("Please select only one of the additional mapping modules (-jsonb or -jackson) to be able to (de)serialize JSON messages.");
            this.jsonMapping = (JsonMapping) this.jsonMapper.stream().findFirst().orElseThrow(() -> {
                return new RuntimeException("Unable to find JSON Mapper");
            });
        }
    }

    @PreDestroy
    public void cleanup() {
        this.sources.forEach((v0) -> {
            v0.close();
        });
        this.contexts.forEach((v0) -> {
            v0.close();
        });
        this.executor.shutdown();
    }

    public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
        JmsConnectorIncomingConfiguration jmsConnectorIncomingConfiguration = new JmsConnectorIncomingConfiguration(config);
        JmsResourceHolder<?> jmsResourceHolder = new JmsResourceHolder<>(jmsConnectorIncomingConfiguration.getChannel(), () -> {
            return createJmsContext(jmsConnectorIncomingConfiguration);
        });
        this.contexts.add(jmsResourceHolder);
        JmsSource jmsSource = new JmsSource(jmsResourceHolder, jmsConnectorIncomingConfiguration, this.jsonMapping, this.executor);
        this.sources.add(jmsSource);
        return jmsSource.getSource();
    }

    private JMSContext createJmsContext(JmsConnectorCommonConfiguration jmsConnectorCommonConfiguration) {
        JMSContext createContext = createContext(pickTheFactory(jmsConnectorCommonConfiguration.getConnectionFactoryName().orElse(null)), jmsConnectorCommonConfiguration.getUsername().orElse(null), jmsConnectorCommonConfiguration.getPassword().orElse(null), jmsConnectorCommonConfiguration.getSessionMode());
        Optional<String> clientId = jmsConnectorCommonConfiguration.getClientId();
        Objects.requireNonNull(createContext);
        clientId.ifPresent(createContext::setClientID);
        return createContext;
    }

    public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
        JmsConnectorOutgoingConfiguration jmsConnectorOutgoingConfiguration = new JmsConnectorOutgoingConfiguration(config);
        JmsResourceHolder<?> jmsResourceHolder = new JmsResourceHolder<>(jmsConnectorOutgoingConfiguration.getChannel(), () -> {
            return createJmsContext(jmsConnectorOutgoingConfiguration);
        });
        this.contexts.add(jmsResourceHolder);
        return new JmsSink(jmsResourceHolder, jmsConnectorOutgoingConfiguration, this.jsonMapping, this.executor).getSink();
    }

    private ConnectionFactory pickTheFactory(String str) {
        Iterator it;
        if (this.factories.isUnsatisfied()) {
            if (str == null) {
                throw JmsExceptions.ex.illegalStateCannotFindFactory();
            }
            throw JmsExceptions.ex.illegalStateCannotFindNamedFactory(str);
        }
        if (str == null) {
            it = this.factories.iterator();
        } else {
            Instance select = this.factories.select(new Annotation[]{Identifier.Literal.of(str)});
            if (select.isUnsatisfied()) {
                select = this.factories.select(new Annotation[]{NamedLiteral.of(str)});
                if (!select.isUnsatisfied()) {
                    ProviderLogging.log.deprecatedNamed();
                }
            }
            it = select.iterator();
        }
        if (it.hasNext()) {
            return (ConnectionFactory) it.next();
        }
        if (str == null) {
            throw JmsExceptions.ex.illegalStateCannotFindFactory();
        }
        throw JmsExceptions.ex.illegalStateCannotFindNamedFactory(str);
    }

    private JMSContext createContext(ConnectionFactory connectionFactory, String str, String str2, String str3) {
        int i;
        String upperCase = str3.toUpperCase();
        boolean z = -1;
        switch (upperCase.hashCode()) {
            case -1634114552:
                if (upperCase.equals("CLIENT_ACKNOWLEDGE")) {
                    z = 2;
                    break;
                }
                break;
            case -1006453276:
                if (upperCase.equals("DUPS_OK_ACKNOWLEDGE")) {
                    z = 3;
                    break;
                }
                break;
            case 249351282:
                if (upperCase.equals("SESSION_TRANSACTED")) {
                    z = true;
                    break;
                }
                break;
            case 1841863660:
                if (upperCase.equals("AUTO_ACKNOWLEDGE")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                i = 1;
                break;
            case true:
                i = 0;
                break;
            case true:
                i = 2;
                break;
            case true:
                i = 3;
                break;
            default:
                throw JmsExceptions.ex.illegalStateUnknowSessionMode(str3);
        }
        return str != null ? connectionFactory.createContext(str, str2, i) : connectionFactory.createContext(i);
    }
}
