package io.smallrye.reactive.messaging.amqp;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.smallrye.reactive.messaging.connectors.ExecutionHolder;
import io.vertx.amqp.AmqpClientOptions;
import io.vertx.amqp.AmqpReceiverOptions;
import io.vertx.amqp.impl.AmqpMessageBuilderImpl;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.amqp.AmqpClient;
import io.vertx.mutiny.amqp.AmqpReceiver;
import io.vertx.mutiny.amqp.AmqpSender;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.buffer.Buffer;
import java.lang.annotation.Annotation;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.PostConstruct;
import javax.annotation.Priority;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.BeforeDestroyed;
import javax.enterprise.event.Observes;
import javax.enterprise.event.Reception;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.literal.NamedLiteral;
import javax.inject.Inject;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
@Connector(AmqpConnector.CONNECTOR_NAME)
@ConnectorAttributes({@ConnectorAttribute(name = "username", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The username used to authenticate to the broker", type = "string", alias = "amqp-username"), @ConnectorAttribute(name = "password", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The password used to authenticate to the broker", type = "string", alias = "amqp-password"), @ConnectorAttribute(name = "host", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The broker hostname", type = "string", alias = "amqp-host", defaultValue = "localhost"), @ConnectorAttribute(name = "port", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The broker port", type = "int", alias = "amqp-port", defaultValue = "5672"), @ConnectorAttribute(name = "use-ssl", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether the AMQP connection uses SSL/TLS", type = "boolean", alias = "amqp-use-ssl", defaultValue = "false"), @ConnectorAttribute(name = "reconnect-attempts", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The number of reconnection attempts", type = "int", alias = "amqp-reconnect-attempts", defaultValue = "100"), @ConnectorAttribute(name = "reconnect-interval", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The interval in second between two reconnection attempts", type = "int", alias = "amqp-reconnect-interval", defaultValue = "10"), @ConnectorAttribute(name = "connect-timeout", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The connection timeout in milliseconds", type = "int", alias = "amqp-connect-timeout", defaultValue = "1000"), @ConnectorAttribute(name = "containerId", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The AMQP container id", type = "string"), @ConnectorAttribute(name = "address", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The AMQP address. If not set, the channel name is used", type = "string"), @ConnectorAttribute(name = "client-options-name", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The name of the AMQP Client Option bean used to customize the AMQP client configuration", type = "string", alias = "amqp-client-options-name"), @ConnectorAttribute(name = "broadcast", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether the received AMQP messages must be dispatched to multiple _subscribers_", type = "boolean", defaultValue = "false"), @ConnectorAttribute(name = "durable", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether AMQP subscription is durable", type = "boolean", defaultValue = "true"), @ConnectorAttribute(name = "auto-acknowledgement", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether the received AMQP messages must be acknowledged when received", type = "boolean", defaultValue = "false"), @ConnectorAttribute(name = "durable", direction = ConnectorAttribute.Direction.OUTGOING, description = "Whether sent AMQP messages are marked durable", type = "boolean", defaultValue = "true"), @ConnectorAttribute(name = "ttl", direction = ConnectorAttribute.Direction.OUTGOING, description = "The time-to-live of the send AMQP messages. 0 to disable the TTL", type = "long", defaultValue = "0")})
/* loaded from: input_file:io/smallrye/reactive/messaging/amqp/AmqpConnector.class */
public class AmqpConnector implements IncomingConnectorFactory, OutgoingConnectorFactory {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConnector.class);
    static final String CONNECTOR_NAME = "smallrye-amqp";
    private static final String JSON_CONTENT_TYPE = "application/json";

    @Inject
    private ExecutionHolder executionHolder;

    @Inject
    private Instance<AmqpClientOptions> clientOptions;
    private final List<AmqpClient> clients = new CopyOnWriteArrayList();
    private Vertx vertx = null;

    @PostConstruct
    void init() {
        this.vertx = this.executionHolder.vertx();
    }

    void setup(ExecutionHolder executionHolder) {
        this.executionHolder = executionHolder;
    }

    AmqpConnector() {
    }

    private AmqpClient createClient(AmqpConnectorCommonConfiguration amqpConnectorCommonConfiguration) {
        Optional<String> clientOptionsName = amqpConnectorCommonConfiguration.getClientOptionsName();
        AmqpClient createClientFromClientOptionsBean = clientOptionsName.isPresent() ? createClientFromClientOptionsBean(clientOptionsName.get()) : getClient(amqpConnectorCommonConfiguration);
        this.clients.add(createClientFromClientOptionsBean);
        return createClientFromClientOptionsBean;
    }

    private AmqpClient createClientFromClientOptionsBean(String str) {
        Instance select = this.clientOptions.select(new Annotation[]{NamedLiteral.of(str)});
        if (select.isUnsatisfied()) {
            throw new IllegalStateException("Cannot find a " + AmqpClientOptions.class.getName() + " bean named " + str);
        }
        LOGGER.debug("Creating AMQP client from bean named '{}'", str);
        return AmqpClient.create(new Vertx(this.vertx.getDelegate()), (AmqpClientOptions) select.get());
    }

    private synchronized AmqpClient getClient(AmqpConnectorCommonConfiguration amqpConnectorCommonConfiguration) {
        try {
            String orElse = amqpConnectorCommonConfiguration.getUsername().orElse(null);
            String orElse2 = amqpConnectorCommonConfiguration.getPassword().orElse(null);
            String host = amqpConnectorCommonConfiguration.getHost();
            int intValue = amqpConnectorCommonConfiguration.getPort().intValue();
            LOGGER.info("AMQP broker configured to {}:{} for channel {}", new Object[]{host, Integer.valueOf(intValue), amqpConnectorCommonConfiguration.getChannel()});
            return AmqpClient.create(new Vertx(this.vertx.getDelegate()), new AmqpClientOptions().setUsername(orElse).setPassword(orElse2).setHost(host).setPort(intValue).setContainerId(amqpConnectorCommonConfiguration.getContainerId().orElse(null)).setSsl(amqpConnectorCommonConfiguration.getUseSsl().booleanValue()).setReconnectAttempts(amqpConnectorCommonConfiguration.getReconnectAttempts().intValue()).setReconnectInterval(amqpConnectorCommonConfiguration.getReconnectInterval().intValue()).setConnectTimeout(amqpConnectorCommonConfiguration.getConnectTimeout().intValue()));
        } catch (Exception e) {
            LOGGER.error("Unable to create client", e);
            throw new IllegalStateException("Unable to create a client, probably a config error", e);
        }
    }

    private Multi<? extends Message<?>> getStreamOfMessages(AmqpReceiver amqpReceiver) {
        return Multi.createFrom().deferred(() -> {
            return amqpReceiver.toMulti().map(AmqpMessage::new);
        });
    }

    public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
        AmqpConnectorIncomingConfiguration amqpConnectorIncomingConfiguration = new AmqpConnectorIncomingConfiguration(config);
        Optional<String> address = amqpConnectorIncomingConfiguration.getAddress();
        amqpConnectorIncomingConfiguration.getClass();
        String orElseGet = address.orElseGet(amqpConnectorIncomingConfiguration::getChannel);
        boolean booleanValue = amqpConnectorIncomingConfiguration.getBroadcast().booleanValue();
        boolean booleanValue2 = amqpConnectorIncomingConfiguration.getDurable().booleanValue();
        boolean booleanValue3 = amqpConnectorIncomingConfiguration.getAutoAcknowledgement().booleanValue();
        PublisherBuilder<? extends Message<?>> flatMapRsPublisher = ReactiveStreams.fromCompletionStage(createClient(amqpConnectorIncomingConfiguration).connect().flatMap(amqpConnection -> {
            return amqpConnection.createReceiver(orElseGet, new AmqpReceiverOptions().setAutoAcknowledgement(booleanValue3).setDurable(booleanValue2));
        }).subscribeAsCompletionStage()).flatMapRsPublisher(this::getStreamOfMessages);
        return booleanValue ? ReactiveStreams.fromPublisher(Multi.createFrom().publisher(flatMapRsPublisher.buildRs()).broadcast().toAllSubscribers()) : flatMapRsPublisher;
    }

    public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
        AmqpConnectorOutgoingConfiguration amqpConnectorOutgoingConfiguration = new AmqpConnectorOutgoingConfiguration(config);
        Optional<String> address = amqpConnectorOutgoingConfiguration.getAddress();
        amqpConnectorOutgoingConfiguration.getClass();
        String orElseGet = address.orElseGet(amqpConnectorOutgoingConfiguration::getChannel);
        boolean booleanValue = amqpConnectorOutgoingConfiguration.getDurable().booleanValue();
        long longValue = amqpConnectorOutgoingConfiguration.getTtl().longValue();
        AtomicReference atomicReference = new AtomicReference();
        AmqpClient createClient = createClient(amqpConnectorOutgoingConfiguration);
        return ReactiveStreams.builder().flatMapCompletionStage(message -> {
            AmqpSender amqpSender = (AmqpSender) atomicReference.get();
            return amqpSender == null ? createClient.connect().flatMap((v0) -> {
                return v0.createAnonymousSender();
            }).map(amqpSender2 -> {
                atomicReference.set(amqpSender2);
                return amqpSender2;
            }).flatMap(amqpSender3 -> {
                return send(amqpSender3, message, booleanValue, longValue, orElseGet);
            }).onFailure().invoke(th -> {
                if (this.clients.isEmpty()) {
                    LOGGER.error("The AMQP message has not been sent, the client is closed");
                } else {
                    LOGGER.error("Unable to send the AMQP message", th);
                }
            }).subscribeAsCompletionStage() : send(amqpSender, message, booleanValue, longValue, orElseGet).subscribeAsCompletionStage();
        }).ignore();
    }

    private String getActualAddress(Message<?> message, io.vertx.mutiny.amqp.AmqpMessage amqpMessage, String str) {
        return amqpMessage.address() != null ? amqpMessage.address() : (String) message.getMetadata(OutgoingAmqpMetadata.class).flatMap(outgoingAmqpMetadata -> {
            return Optional.ofNullable(outgoingAmqpMetadata.getAddress());
        }).orElse(str);
    }

    private Uni<Message<?>> send(AmqpSender amqpSender, Message<?> message, boolean z, long j, String str) {
        io.vertx.mutiny.amqp.AmqpMessage amqpMessage = message instanceof AmqpMessage ? ((AmqpMessage) message).getAmqpMessage() : message.getPayload() instanceof io.vertx.mutiny.amqp.AmqpMessage ? (io.vertx.mutiny.amqp.AmqpMessage) message.getPayload() : message.getPayload() instanceof io.vertx.amqp.AmqpMessage ? new io.vertx.mutiny.amqp.AmqpMessage((io.vertx.amqp.AmqpMessage) message.getPayload()) : convertToAmqpMessage(message, z, j);
        String actualAddress = getActualAddress(message, amqpMessage, str);
        if (this.clients.isEmpty()) {
            LOGGER.error("The AMQP message to address `{}` has not been sent, the client is closed", actualAddress);
            return Uni.createFrom().item(message);
        }
        if (!actualAddress.equals(amqpMessage.address())) {
            amqpMessage = new io.vertx.mutiny.amqp.AmqpMessage(new AmqpMessageBuilderImpl(amqpMessage.getDelegate()).address(actualAddress).build());
        }
        LOGGER.debug("Sending AMQP message to address `{}` ", actualAddress);
        return amqpSender.sendWithAck(amqpMessage).onItem().produceCompletionStage(r3 -> {
            return message.ack();
        }).onItem().apply(r32 -> {
            return message;
        });
    }

    private io.vertx.mutiny.amqp.AmqpMessage convertToAmqpMessage(Message<?> message, boolean z, long j) {
        Object payload = message.getPayload();
        Optional metadata = message.getMetadata(OutgoingAmqpMetadata.class);
        io.vertx.mutiny.amqp.AmqpMessageBuilder create = io.vertx.mutiny.amqp.AmqpMessage.create();
        if (z) {
            create.durable(true);
        } else {
            create.durable(((Boolean) metadata.map((v0) -> {
                return v0.isDurable();
            }).orElse(false)).booleanValue());
        }
        if (j > 0) {
            create.ttl(j);
        } else {
            long longValue = ((Long) metadata.map((v0) -> {
                return v0.getTtl();
            }).orElse(-1L)).longValue();
            if (longValue > 0) {
                create.ttl(longValue);
            }
        }
        if (payload instanceof String) {
            create.withBody((String) payload);
        } else if (payload instanceof Boolean) {
            create.withBooleanAsBody(((Boolean) payload).booleanValue());
        } else if (payload instanceof Buffer) {
            create.withBufferAsBody((Buffer) payload);
        } else if (payload instanceof Byte) {
            create.withByteAsBody(((Byte) payload).byteValue());
        } else if (payload instanceof Character) {
            create.withCharAsBody(((Character) payload).charValue());
        } else if (payload instanceof Double) {
            create.withDoubleAsBody(((Double) payload).doubleValue());
        } else if (payload instanceof Float) {
            create.withFloatAsBody(((Float) payload).floatValue());
        } else if (payload instanceof Instant) {
            create.withInstantAsBody((Instant) payload);
        } else if (payload instanceof Integer) {
            create.withIntegerAsBody(((Integer) payload).intValue());
        } else if (payload instanceof JsonArray) {
            create.withJsonArrayAsBody((JsonArray) payload).contentType(JSON_CONTENT_TYPE);
        } else if (payload instanceof JsonObject) {
            create.withJsonObjectAsBody((JsonObject) payload).contentType(JSON_CONTENT_TYPE);
        } else if (payload instanceof Long) {
            create.withLongAsBody(((Long) payload).longValue());
        } else if (payload instanceof Short) {
            create.withShortAsBody(((Short) payload).shortValue());
        } else if (payload instanceof UUID) {
            create.withUuidAsBody((UUID) payload);
        } else if (payload instanceof byte[]) {
            create.withBufferAsBody(Buffer.buffer((byte[]) payload));
        } else {
            create.withBody(Json.encode(payload)).contentType(JSON_CONTENT_TYPE);
        }
        create.address((String) metadata.map((v0) -> {
            return v0.getAddress();
        }).orElse(null));
        create.applicationProperties((JsonObject) metadata.map((v0) -> {
            return v0.getProperties();
        }).orElseGet(JsonObject::new));
        create.contentEncoding((String) metadata.map((v0) -> {
            return v0.getContentEncoding();
        }).orElse(null));
        create.contentType((String) metadata.map((v0) -> {
            return v0.getContentType();
        }).orElse(null));
        create.correlationId((String) metadata.map((v0) -> {
            return v0.getCorrelationId();
        }).orElse(null));
        create.groupId((String) metadata.map((v0) -> {
            return v0.getGroupId();
        }).orElse(null));
        create.id((String) metadata.map((v0) -> {
            return v0.getId();
        }).orElse(null));
        int intValue = ((Integer) metadata.map((v0) -> {
            return v0.getPriority();
        }).orElse(-1)).intValue();
        if (intValue >= 0) {
            create.priority((short) intValue);
        }
        create.subject((String) metadata.map((v0) -> {
            return v0.getSubject();
        }).orElse(null));
        return create.build();
    }

    public void terminate(@Observes(notifyObserver = Reception.IF_EXISTS) @BeforeDestroyed(ApplicationScoped.class) @Priority(50) Object obj) {
        this.clients.forEach(amqpClient -> {
            amqpClient.close().subscribeAsCompletionStage();
        });
        this.clients.clear();
    }
}
