package io.smallrye.reactive.messaging.amqp;

import io.reactivex.Flowable;
import io.reactivex.processors.MulticastProcessor;
import io.vertx.amqp.AmqpClientOptions;
import io.vertx.amqp.AmqpReceiverOptions;
import io.vertx.axle.amqp.AmqpClient;
import io.vertx.axle.amqp.AmqpMessageBuilder;
import io.vertx.axle.amqp.AmqpReceiver;
import io.vertx.axle.amqp.AmqpSender;
import io.vertx.axle.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.Vertx;
import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.BeforeDestroyed;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
@Connector(AmqpConnector.CONNECTOR_NAME)
/* loaded from: input_file:io/smallrye/reactive/messaging/amqp/AmqpConnector.class */
public class AmqpConnector implements IncomingConnectorFactory, OutgoingConnectorFactory {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConnector.class);
    static final String CONNECTOR_NAME = "smallrye-amqp";
    private AmqpClient client;

    @Inject
    private Instance<Vertx> instanceOfVertx;

    @Inject
    @ConfigProperty(name = "amqp-port", defaultValue = "5672")
    private Integer configuredPort;

    @Inject
    @ConfigProperty(name = "amqp-host", defaultValue = "localhost")
    private String configuredHost;

    @Inject
    @ConfigProperty(name = "amqp-username")
    private Optional<String> configuredUsername;

    @Inject
    @ConfigProperty(name = "amqp-password")
    private Optional<String> configuredPassword;

    @Inject
    @ConfigProperty(name = "amqp-use-ssl")
    private Optional<Boolean> configuredUseSsl;

    @Inject
    @ConfigProperty(name = "amqp-reconnect-attempts", defaultValue = "100")
    private Optional<Integer> configuredReconnectAttempts;

    @Inject
    @ConfigProperty(name = "amqp-reconnect-interval", defaultValue = "10")
    private Optional<Long> configuredReconnectInterval;

    @Inject
    @ConfigProperty(name = "amqp-connect-timeout", defaultValue = "1000")
    private Optional<Integer> configuredConnectTimeout;
    private boolean internalVertxInstance = false;
    private Vertx vertx = null;

    public void terminate(@Observes @BeforeDestroyed(ApplicationScoped.class) Object obj) {
        if (this.internalVertxInstance) {
            this.vertx.close();
        }
    }

    @PostConstruct
    void init() {
        if (this.instanceOfVertx != null && !this.instanceOfVertx.isUnsatisfied()) {
            this.vertx = (Vertx) this.instanceOfVertx.get();
        } else {
            this.internalVertxInstance = true;
            this.vertx = Vertx.vertx();
        }
    }

    AmqpConnector() {
    }

    private synchronized AmqpClient getClient(Config config) {
        if (this.client != null) {
            return this.client;
        }
        try {
            String str = (String) config.getOptionalValue("username", String.class).orElseGet(() -> {
                if (this.configuredUsername != null) {
                    return this.configuredUsername.orElse(null);
                }
                return null;
            });
            String str2 = (String) config.getOptionalValue("password", String.class).orElseGet(() -> {
                if (this.configuredPassword != null) {
                    return this.configuredPassword.orElse(null);
                }
                return null;
            });
            String str3 = (String) config.getOptionalValue("host", String.class).orElseGet(() -> {
                if (this.configuredHost != null) {
                    return this.configuredHost;
                }
                LOGGER.info("No AMQP host configured, using localhost");
                return "localhost";
            });
            int intValue = ((Integer) config.getOptionalValue("port", Integer.class).orElseGet(() -> {
                if (this.configuredPort == null) {
                    return 5672;
                }
                return this.configuredPort;
            })).intValue();
            boolean booleanValue = ((Boolean) config.getOptionalValue("use-ssl", Boolean.class).orElseGet(() -> {
                if (this.configuredUseSsl == null) {
                    return false;
                }
                return this.configuredUseSsl.orElse(Boolean.FALSE);
            })).booleanValue();
            int intValue2 = ((Integer) config.getOptionalValue("reconnect-attempts", Integer.class).orElseGet(() -> {
                if (this.configuredReconnectAttempts == null) {
                    return 100;
                }
                return this.configuredReconnectAttempts.get();
            })).intValue();
            long longValue = ((Long) config.getOptionalValue("reconnect-interval", Long.class).orElseGet(() -> {
                if (this.configuredReconnectInterval == null) {
                    return 10L;
                }
                return this.configuredReconnectInterval.get();
            })).longValue();
            this.client = AmqpClient.create(new io.vertx.axle.core.Vertx(this.vertx.getDelegate()), new AmqpClientOptions().setUsername(str).setPassword(str2).setHost(str3).setPort(intValue).setContainerId((String) config.getOptionalValue("containerId", String.class).orElse(null)).setSsl(booleanValue).setReconnectAttempts(intValue2).setReconnectInterval(longValue).setConnectTimeout(((Integer) config.getOptionalValue("connect-timeout", Integer.class).orElseGet(() -> {
                if (this.configuredConnectTimeout == null) {
                    return 1000;
                }
                return this.configuredConnectTimeout.get();
            })).intValue()));
            return this.client;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private Flowable<? extends Message<?>> getStreamOfMessages(AmqpReceiver amqpReceiver) {
        return Flowable.defer(() -> {
            return Flowable.fromPublisher(amqpReceiver.toPublisher());
        }).map(amqpMessage -> {
            return new AmqpMessage(amqpMessage);
        });
    }

    private String getAddressOrFail(Config config) {
        return (String) config.getOptionalValue("address", String.class).orElseGet(() -> {
            return (String) config.getOptionalValue("channel-name", String.class).orElseThrow(() -> {
                return new IllegalArgumentException("Address must be set");
            });
        });
    }

    public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
        String addressOrFail = getAddressOrFail(config);
        boolean booleanValue = ((Boolean) config.getOptionalValue("broadcast", Boolean.class).orElse(false)).booleanValue();
        boolean booleanValue2 = ((Boolean) config.getOptionalValue("durable", Boolean.class).orElse(true)).booleanValue();
        boolean booleanValue3 = ((Boolean) config.getOptionalValue("auto-acknowledgement", Boolean.class).orElse(false)).booleanValue();
        PublisherBuilder<? extends Message<?>> flatMapRsPublisher = ReactiveStreams.fromCompletionStage(getClient(config).connect().thenCompose(amqpConnection -> {
            return amqpConnection.createReceiver(addressOrFail, new AmqpReceiverOptions().setAutoAcknowledgement(booleanValue3).setDurable(booleanValue2));
        })).flatMapRsPublisher(this::getStreamOfMessages);
        return booleanValue ? flatMapRsPublisher.via(MulticastProcessor.create()) : flatMapRsPublisher;
    }

    public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
        String addressOrFail = getAddressOrFail(config);
        boolean booleanValue = ((Boolean) config.getOptionalValue("durable", Boolean.class).orElse(true)).booleanValue();
        long longValue = ((Long) config.getOptionalValue("ttl", Long.class).orElse(0L)).longValue();
        AtomicReference atomicReference = new AtomicReference();
        return ReactiveStreams.builder().flatMapCompletionStage(message -> {
            AmqpSender amqpSender = (AmqpSender) atomicReference.get();
            if (amqpSender != null) {
                return send(amqpSender, message, booleanValue, longValue);
            }
            try {
                this.client = getClient(config);
                return this.client.createSender(addressOrFail).thenApply(amqpSender2 -> {
                    atomicReference.set(amqpSender2);
                    return amqpSender2;
                }).thenCompose(amqpSender3 -> {
                    try {
                        return send(amqpSender3, message, booleanValue, longValue);
                    } catch (Exception e) {
                        LOGGER.error("Unable to send the message", e);
                        CompletableFuture completableFuture = new CompletableFuture();
                        completableFuture.completeExceptionally(e);
                        return completableFuture;
                    }
                }).whenComplete((obj, obj2) -> {
                    if (obj2 != null) {
                        LOGGER.error("Unable to send the AMQP message", obj2);
                    }
                });
            } catch (Exception e) {
                LOGGER.error("Unable to create client", e);
                throw new IllegalStateException("Unable to create a client, probably a config error", e);
            }
        }).ignore();
    }

    private CompletionStage send(AmqpSender amqpSender, Message message, boolean z, long j) {
        io.vertx.axle.amqp.AmqpMessage amqpMessage = message instanceof AmqpMessage ? ((AmqpMessage) message).getAmqpMessage() : message.getPayload() instanceof io.vertx.axle.amqp.AmqpMessage ? (io.vertx.axle.amqp.AmqpMessage) message.getPayload() : message.getPayload() instanceof io.vertx.amqp.AmqpMessage ? new io.vertx.axle.amqp.AmqpMessage((io.vertx.amqp.AmqpMessage) message.getPayload()) : convertToAmqpMessage(message.getPayload(), z, j);
        LOGGER.debug("Sending AMQP message to address `{}` ", amqpMessage.address() == null ? amqpSender.address() : amqpMessage.address());
        return amqpSender.sendWithAck(amqpMessage).thenCompose(r3 -> {
            return message.ack();
        }).thenApply(obj -> {
            return message;
        });
    }

    private io.vertx.axle.amqp.AmqpMessage convertToAmqpMessage(Object obj, boolean z, long j) {
        AmqpMessageBuilder create = io.vertx.axle.amqp.AmqpMessage.create();
        if (z) {
            create.durable(true);
        }
        if (j > 0) {
            create.ttl(j);
        }
        if (obj instanceof String) {
            create.withBody((String) obj);
        } else if (obj instanceof Boolean) {
            create.withBooleanAsBody(((Boolean) obj).booleanValue());
        } else if (obj instanceof Buffer) {
            create.withBufferAsBody((Buffer) obj);
        } else if (obj instanceof Byte) {
            create.withByteAsBody(((Byte) obj).byteValue());
        } else if (obj instanceof Character) {
            create.withCharAsBody(((Character) obj).charValue());
        } else if (obj instanceof Double) {
            create.withDoubleAsBody(((Double) obj).doubleValue());
        } else if (obj instanceof Float) {
            create.withFloatAsBody(((Float) obj).floatValue());
        } else if (obj instanceof Instant) {
            create.withInstantAsBody((Instant) obj);
        } else if (obj instanceof Integer) {
            create.withIntegerAsBody(((Integer) obj).intValue());
        } else if (obj instanceof JsonArray) {
            create.withJsonArrayAsBody((JsonArray) obj);
        } else if (obj instanceof JsonObject) {
            create.withJsonObjectAsBody((JsonObject) obj);
        } else if (obj instanceof Long) {
            create.withLongAsBody(((Long) obj).longValue());
        } else if (obj instanceof Short) {
            create.withShortAsBody(((Short) obj).shortValue());
        } else if (obj instanceof UUID) {
            create.withUuidAsBody((UUID) obj);
        } else {
            create.withBody(obj.toString());
        }
        return create.build();
    }

    @PreDestroy
    public synchronized void close() {
        if (this.client != null) {
            this.client.close();
        }
    }
}
