/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.amqp;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.amqp.AmqpConnectorCommonConfiguration;
import io.smallrye.reactive.messaging.amqp.AmqpConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.amqp.AmqpConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.amqp.AmqpMessage;
import io.smallrye.reactive.messaging.amqp.OutgoingAmqpMetadata;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.smallrye.reactive.messaging.connectors.ExecutionHolder;
import io.vertx.amqp.AmqpClientOptions;
import io.vertx.amqp.AmqpReceiverOptions;
import io.vertx.amqp.impl.AmqpMessageBuilderImpl;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.amqp.AmqpClient;
import io.vertx.mutiny.amqp.AmqpConnection;
import io.vertx.mutiny.amqp.AmqpMessageBuilder;
import io.vertx.mutiny.amqp.AmqpReceiver;
import io.vertx.mutiny.amqp.AmqpSender;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.buffer.Buffer;
import java.lang.annotation.Annotation;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.PostConstruct;
import javax.annotation.Priority;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.BeforeDestroyed;
import javax.enterprise.event.Observes;
import javax.enterprise.event.Reception;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.literal.NamedLiteral;
import javax.inject.Inject;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
@Connector(value="smallrye-amqp")
@ConnectorAttributes(value={@ConnectorAttribute(name="username", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The username used to authenticate to the broker", type="string", alias="amqp-username"), @ConnectorAttribute(name="password", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The password used to authenticate to the broker", type="string", alias="amqp-password"), @ConnectorAttribute(name="host", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The broker hostname", type="string", alias="amqp-host", defaultValue="localhost"), @ConnectorAttribute(name="port", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The broker port", type="int", alias="amqp-port", defaultValue="5672"), @ConnectorAttribute(name="use-ssl", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="Whether the AMQP connection uses SSL/TLS", type="boolean", alias="amqp-use-ssl", defaultValue="false"), @ConnectorAttribute(name="reconnect-attempts", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The number of reconnection attempts", type="int", alias="amqp-reconnect-attempts", defaultValue="100"), @ConnectorAttribute(name="reconnect-interval", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The interval in second between two reconnection attempts", type="int", alias="amqp-reconnect-interval", defaultValue="10"), @ConnectorAttribute(name="connect-timeout", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The connection timeout in milliseconds", type="int", alias="amqp-connect-timeout", defaultValue="1000"), @ConnectorAttribute(name="containerId", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The AMQP container id", type="string"), @ConnectorAttribute(name="address", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The AMQP address. 
If not set, the channel name is used", type="string"), @ConnectorAttribute(name="client-options-name", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The name of the AMQP Client Option bean used to customize the AMQP client configuration", type="string", alias="amqp-client-options-name"), @ConnectorAttribute(name="broadcast", direction=ConnectorAttribute.Direction.INCOMING, description="Whether the received AMQP messages must be dispatched to multiple _subscribers_", type="boolean", defaultValue="false"), @ConnectorAttribute(name="durable", direction=ConnectorAttribute.Direction.INCOMING, description="Whether AMQP subscription is durable", type="boolean", defaultValue="true"), @ConnectorAttribute(name="auto-acknowledgement", direction=ConnectorAttribute.Direction.INCOMING, description="Whether the received AMQP messages must be acknowledged when received", type="boolean", defaultValue="false"), @ConnectorAttribute(name="durable", direction=ConnectorAttribute.Direction.OUTGOING, description="Whether sent AMQP messages are marked durable", type="boolean", defaultValue="true"), @ConnectorAttribute(name="ttl", direction=ConnectorAttribute.Direction.OUTGOING, description="The time-to-live of the send AMQP messages. 0 to disable the TTL", type="long", defaultValue="0")})
public class AmqpConnector
implements IncomingConnectorFactory,
OutgoingConnectorFactory {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConnector.class);
    static final String CONNECTOR_NAME = "smallrye-amqp";
    private static final String JSON_CONTENT_TYPE = "application/json";
    @Inject
    private ExecutionHolder executionHolder;
    @Inject
    private Instance<AmqpClientOptions> clientOptions;
    private Vertx vertx = null;
    private final List<AmqpClient> clients = new CopyOnWriteArrayList<AmqpClient>();

    /**
     * Caches the Vert.x instance exposed by the {@link ExecutionHolder} so that
     * clients created later can reuse it.
     */
    @PostConstruct
    void init() {
        vertx = executionHolder.vertx();
    }

    /**
     * Replaces the {@link ExecutionHolder} used by this connector.
     * <p>
     * Only the holder reference is stored here; the cached {@code vertx} field is
     * assigned by {@link #init()}, not by this method.
     *
     * @param executionHolder the holder to use from now on
     */
    void setup(ExecutionHolder executionHolder) {
        this.executionHolder = executionHolder;
    }

    /**
     * Package-private no-argument constructor; it performs no initialization.
     */
    AmqpConnector() {
    }

    /**
     * Creates an AMQP client for the given channel configuration and remembers it in
     * {@code clients} so that it can be closed by {@code terminate}.
     * <p>
     * When the configuration names a client-options bean, the client is built from that
     * bean; otherwise it is built from the individual configuration attributes.
     *
     * @param config the common (incoming or outgoing) channel configuration
     * @return the newly created client
     */
    private AmqpClient createClient(AmqpConnectorCommonConfiguration config) {
        Optional<String> optionsBeanName = config.getClientOptionsName();
        AmqpClient created;
        if (optionsBeanName.isPresent()) {
            created = createClientFromClientOptionsBean(optionsBeanName.get());
        } else {
            created = getClient(config);
        }
        clients.add(created);
        return created;
    }

    /**
     * Builds an AMQP client from the {@link AmqpClientOptions} bean selected with a
     * {@code @Named} qualifier.
     *
     * @param optionsBeanName the name of the options bean to look up
     * @return a new client created with the resolved options
     * @throws IllegalStateException if no options bean with that name can be resolved
     */
    private AmqpClient createClientFromClientOptionsBean(String optionsBeanName) {
        Instance<AmqpClientOptions> namedOptions = clientOptions.select(NamedLiteral.of(optionsBeanName));
        if (namedOptions.isUnsatisfied()) {
            throw new IllegalStateException(
                    "Cannot find a " + AmqpClientOptions.class.getName() + " bean named " + optionsBeanName);
        }
        LOGGER.debug("Creating AMQP client from bean named '{}'", optionsBeanName);
        Vertx wrappedVertx = new Vertx(vertx.getDelegate());
        return AmqpClient.create(wrappedVertx, namedOptions.get());
    }

    /**
     * Builds an AMQP client from the individual attributes of the channel configuration
     * (credentials, host, port, SSL, reconnection settings, connect timeout, container id).
     *
     * @param config the common channel configuration to read the attributes from
     * @return a new client configured from {@code config}
     * @throws IllegalStateException if reading the configuration or creating the client fails;
     *         the original exception is kept as the cause
     */
    private synchronized AmqpClient getClient(AmqpConnectorCommonConfiguration config) {
        try {
            // Read every attribute first, then apply them, so that a configuration
            // problem surfaces before any option object is populated.
            String user = config.getUsername().orElse(null);
            String secret = config.getPassword().orElse(null);
            String brokerHost = config.getHost();
            int brokerPort = config.getPort();
            LOGGER.info("AMQP broker configured to {}:{} for channel {}", brokerHost, brokerPort, config.getChannel());
            boolean ssl = config.getUseSsl();
            int attempts = config.getReconnectAttempts();
            int interval = config.getReconnectInterval();
            int timeout = config.getConnectTimeout();
            String container = config.getContainerId().orElse(null);

            AmqpClientOptions options = new AmqpClientOptions();
            options.setUsername(user);
            options.setPassword(secret);
            options.setHost(brokerHost);
            options.setPort(brokerPort);
            options.setContainerId(container);
            options.setSsl(ssl);
            options.setReconnectAttempts(attempts);
            options.setReconnectInterval((long) interval);
            options.setConnectTimeout(timeout);

            Vertx wrappedVertx = new Vertx(vertx.getDelegate());
            return AmqpClient.create(wrappedVertx, options);
        } catch (Exception e) {
            LOGGER.error("Unable to create client", e);
            throw new IllegalStateException("Unable to create a client, probably a config error", e);
        }
    }

    /**
     * Exposes the messages of an AMQP receiver as a stream of connector messages.
     * <p>
     * Each item emitted by {@code receiver.toMulti()} is wrapped in an {@link AmqpMessage}.
     * The stream is built inside the supplier handed to {@code Multi.createFrom().deferred(...)},
     * so {@code toMulti()} is only called when that supplier is invoked.
     *
     * @param receiver the receiver to read from
     * @return the stream of wrapped messages
     */
    private Multi<? extends Message<?>> getStreamOfMessages(AmqpReceiver receiver) {
        return Multi.createFrom().deferred(() -> {
            return receiver.toMulti().map(AmqpMessage::new);
        });
    }

    /**
     * Creates the publisher that feeds an incoming channel from an AMQP address.
     * <p>
     * A client is created for the channel, connected, and a receiver is opened on the
     * configured address (the channel name is used when no address is configured). The
     * connection is subscribed to immediately through {@code subscribeAsCompletionStage()}.
     * When the {@code broadcast} attribute is set, the resulting stream is dispatched to
     * all subscribers.
     *
     * @param config the channel configuration
     * @return a publisher builder emitting the received messages
     */
    @Override
    public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
        AmqpConnectorIncomingConfiguration ic = new AmqpConnectorIncomingConfiguration(config);
        String address = ic.getAddress().orElseGet(ic::getChannel);
        boolean broadcast = ic.getBroadcast();
        boolean durable = ic.getDurable();
        boolean autoAck = ic.getAutoAcknowledgement();

        // Keep the element type: with a raw Uni/PublisherBuilder the method reference
        // below cannot be resolved against AmqpReceiver.
        Uni<AmqpReceiver> receiver = createClient(ic).connect()
                .flatMap(connection -> connection.createReceiver(address,
                        new AmqpReceiverOptions().setAutoAcknowledgement(autoAck).setDurable(durable)));

        PublisherBuilder<? extends Message<?>> builder = ReactiveStreams
                .fromCompletionStage(receiver.subscribeAsCompletionStage())
                .flatMapRsPublisher(this::getStreamOfMessages);

        if (broadcast) {
            return ReactiveStreams.fromPublisher(
                    Multi.createFrom().publisher(builder.buildRs()).broadcast().toAllSubscribers());
        }
        return builder;
    }

    /**
     * Creates the subscriber that forwards the messages of an outgoing channel to AMQP.
     * <p>
     * The client is created eagerly, but the connection and the anonymous sender are only
     * established when the first message arrives; the sender is then cached and reused for
     * the following messages. Messages are sent to the address resolved by
     * {@code getActualAddress}, falling back to the configured address (or the channel name
     * when no address is configured).
     * <p>
     * A send failure is logged whether it happens while establishing the sender or on a
     * later message, and is then propagated to the stream.
     *
     * @param config the channel configuration
     * @return a subscriber builder consuming the messages to send
     */
    @Override
    public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
        AmqpConnectorOutgoingConfiguration oc = new AmqpConnectorOutgoingConfiguration(config);
        String configuredAddress = oc.getAddress().orElseGet(oc::getChannel);
        boolean durable = oc.getDurable();
        long ttl = oc.getTtl();

        AtomicReference<AmqpSender> sender = new AtomicReference<>();
        AmqpClient client = createClient(oc);

        // The explicit type witness is required: without it the builder is inferred as
        // ProcessorBuilder<Object, Object>, which does not match the declared return type.
        return ReactiveStreams.<Message<?>>builder().flatMapCompletionStage(message -> {
            AmqpSender current = sender.get();
            Uni<Message<?>> sending;
            if (current != null) {
                sending = send(current, message, durable, ttl, configuredAddress);
            } else {
                // First message: connect, create the sender, cache it, then send.
                sending = client.connect()
                        .flatMap(AmqpConnection::createAnonymousSender)
                        .map(created -> {
                            sender.set(created);
                            return created;
                        })
                        .flatMap(created -> send(created, message, durable, ttl, configuredAddress));
            }
            return sending
                    .onFailure().invoke(failure -> {
                        // An empty client list means terminate() already closed everything.
                        if (clients.isEmpty()) {
                            LOGGER.error("The AMQP message has not been sent, the client is closed");
                        } else {
                            LOGGER.error("Unable to send the AMQP message", failure);
                        }
                    })
                    .subscribeAsCompletionStage();
        }).ignore();
    }

    /**
     * Resolves the address a message must be sent to.
     * <p>
     * Precedence: the address already set on the AMQP message, then the address carried by
     * the message's {@link OutgoingAmqpMetadata} (if any and non-null), then the configured
     * address.
     *
     * @param message the message being sent, used for its metadata
     * @param amqp the AMQP representation of the message
     * @param configuredAddress the fallback address
     * @return the address to use
     */
    private String getActualAddress(Message<?> message, io.vertx.mutiny.amqp.AmqpMessage amqp, String configuredAddress) {
        String explicitAddress = amqp.address();
        if (explicitAddress != null) {
            return explicitAddress;
        }
        // Optional.map drops a null address, which then falls through to the default.
        return message.getMetadata(OutgoingAmqpMetadata.class)
                .map(OutgoingAmqpMetadata::getAddress)
                .orElse(configuredAddress);
    }

    /**
     * Sends a message with the given sender and acknowledges it once the broker has
     * acknowledged the delivery.
     * <p>
     * The AMQP message is obtained, in this order, from: the message itself when it is an
     * {@link AmqpMessage}; its payload when the payload is a Mutiny or bare Vert.x AMQP
     * message; otherwise a conversion of the payload through {@code convertToAmqpMessage}.
     * If the resolved address differs from the one on the AMQP message, a copy carrying the
     * resolved address is sent instead.
     * <p>
     * When no client is registered any more, nothing is sent: an error is logged and the
     * message is returned as-is, without being acknowledged.
     *
     * @param sender the sender to use
     * @param msg the message to send
     * @param durable whether converted messages are forced durable
     * @param ttl the time-to-live applied to converted messages when positive
     * @param configuredAddress the fallback address
     * @return a {@code Uni} emitting {@code msg}
     */
    private Uni<Message<?>> send(AmqpSender sender, Message<?> msg, boolean durable, long ttl, String configuredAddress) {
        io.vertx.mutiny.amqp.AmqpMessage outgoing;
        if (msg instanceof AmqpMessage) {
            outgoing = ((AmqpMessage) msg).getAmqpMessage();
        } else if (msg.getPayload() instanceof io.vertx.mutiny.amqp.AmqpMessage) {
            outgoing = (io.vertx.mutiny.amqp.AmqpMessage) msg.getPayload();
        } else if (msg.getPayload() instanceof io.vertx.amqp.AmqpMessage) {
            outgoing = new io.vertx.mutiny.amqp.AmqpMessage((io.vertx.amqp.AmqpMessage) msg.getPayload());
        } else {
            outgoing = convertToAmqpMessage(msg, durable, ttl);
        }

        String targetAddress = getActualAddress(msg, outgoing, configuredAddress);
        if (clients.isEmpty()) {
            LOGGER.error("The AMQP message to address `{}` has not been sent, the client is closed", targetAddress);
            return Uni.createFrom().item(msg);
        }

        // Rebuild the message only when its own address is not already the target one.
        io.vertx.mutiny.amqp.AmqpMessage toSend = targetAddress.equals(outgoing.address())
                ? outgoing
                : new io.vertx.mutiny.amqp.AmqpMessage(
                        new AmqpMessageBuilderImpl(outgoing.getDelegate()).address(targetAddress).build());

        LOGGER.debug("Sending AMQP message to address `{}` ", targetAddress);
        return sender.sendWithAck(toSend)
                .onItem().produceCompletionStage(ignored -> msg.ack())
                .onItem().apply(ignored -> msg);
    }

    /**
     * Converts a message into an AMQP message, using its payload as the body and its
     * {@link OutgoingAmqpMetadata} (when present) for the message properties.
     * <p>
     * Durability: forced to {@code true} when {@code durable} is set, otherwise taken from
     * the metadata (default {@code false}). TTL: {@code ttl} when positive, otherwise the
     * metadata TTL when positive, otherwise not set.
     * <p>
     * Body: strings, boxed primitives, {@code Buffer}, {@code byte[]}, {@code Instant},
     * {@code UUID}, {@code JsonObject} and {@code JsonArray} use the dedicated builder
     * methods; any other payload is encoded with {@code Json.encode}. JSON bodies get the
     * {@code application/json} content type, which the metadata may override with its own
     * non-null content type.
     *
     * @param message the message to convert
     * @param durable whether the message is forced durable
     * @param ttl the time-to-live to apply when positive
     * @return the built AMQP message
     */
    private io.vertx.mutiny.amqp.AmqpMessage convertToAmqpMessage(Message<?> message, boolean durable, long ttl) {
        Object payload = message.getPayload();
        Optional<OutgoingAmqpMetadata> metadata = message.getMetadata(OutgoingAmqpMetadata.class);
        AmqpMessageBuilder builder = io.vertx.mutiny.amqp.AmqpMessage.create();

        if (durable) {
            builder.durable(true);
        } else {
            builder.durable(metadata.map(OutgoingAmqpMetadata::isDurable).orElse(false));
        }

        if (ttl > 0L) {
            builder.ttl(ttl);
        } else {
            long metadataTtl = metadata.map(OutgoingAmqpMetadata::getTtl).orElse(-1L);
            if (metadataTtl > 0L) {
                builder.ttl(metadataTtl);
            }
        }

        if (payload instanceof String) {
            builder.withBody((String) payload);
        } else if (payload instanceof Boolean) {
            builder.withBooleanAsBody((Boolean) payload);
        } else if (payload instanceof Buffer) {
            builder.withBufferAsBody((Buffer) payload);
        } else if (payload instanceof Byte) {
            builder.withByteAsBody((Byte) payload);
        } else if (payload instanceof Character) {
            builder.withCharAsBody((Character) payload);
        } else if (payload instanceof Double) {
            builder.withDoubleAsBody((Double) payload);
        } else if (payload instanceof Float) {
            builder.withFloatAsBody((Float) payload);
        } else if (payload instanceof Instant) {
            builder.withInstantAsBody((Instant) payload);
        } else if (payload instanceof Integer) {
            builder.withIntegerAsBody((Integer) payload);
        } else if (payload instanceof JsonArray) {
            builder.withJsonArrayAsBody((JsonArray) payload).contentType(JSON_CONTENT_TYPE);
        } else if (payload instanceof JsonObject) {
            builder.withJsonObjectAsBody((JsonObject) payload).contentType(JSON_CONTENT_TYPE);
        } else if (payload instanceof Long) {
            builder.withLongAsBody((Long) payload);
        } else if (payload instanceof Short) {
            builder.withShortAsBody((Short) payload);
        } else if (payload instanceof UUID) {
            builder.withUuidAsBody((UUID) payload);
        } else if (payload instanceof byte[]) {
            builder.withBufferAsBody(Buffer.buffer((byte[]) payload));
        } else {
            // Fallback: serialize arbitrary objects as JSON text.
            builder.withBody(Json.encode(payload)).contentType(JSON_CONTENT_TYPE);
        }

        builder.address(metadata.map(OutgoingAmqpMetadata::getAddress).orElse(null));
        builder.applicationProperties(metadata.map(OutgoingAmqpMetadata::getProperties).orElseGet(JsonObject::new));
        builder.contentEncoding(metadata.map(OutgoingAmqpMetadata::getContentEncoding).orElse(null));

        // Only override the content type when the metadata actually provides one;
        // setting it unconditionally would erase the JSON content type chosen above.
        String metadataContentType = metadata.map(OutgoingAmqpMetadata::getContentType).orElse(null);
        if (metadataContentType != null) {
            builder.contentType(metadataContentType);
        }

        builder.correlationId(metadata.map(OutgoingAmqpMetadata::getCorrelationId).orElse(null));
        builder.groupId(metadata.map(OutgoingAmqpMetadata::getGroupId).orElse(null));
        builder.id(metadata.map(OutgoingAmqpMetadata::getId).orElse(null));

        // A negative priority means "not specified" and leaves the builder default in place.
        int priority = metadata.map(OutgoingAmqpMetadata::getPriority).orElse(-1);
        if (priority >= 0) {
            builder.priority((short) priority);
        }

        builder.subject(metadata.map(OutgoingAmqpMetadata::getSubject).orElse(null));
        return builder.build();
    }

    /**
     * Closes every client created by this connector when the application scope is about
     * to be destroyed, then empties the client list.
     * <p>
     * Each close is triggered by subscribing to it; the resulting completion stage is not
     * awaited. Once the list is empty, {@code send} stops sending and logs an error instead.
     *
     * @param event the observed lifecycle event (unused)
     */
    public void terminate(@Observes(notifyObserver=Reception.IF_EXISTS) @Priority(value=50) @BeforeDestroyed(value=ApplicationScoped.class) Object event) {
        for (AmqpClient client : clients) {
            client.close().subscribeAsCompletionStage();
        }
        clients.clear();
    }
}

