package io.smallrye.reactive.messaging.mqtt;

import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.mqtt.MqttClient;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/smallrye/reactive/messaging/mqtt/MqttSink.class */
public class MqttSink {
    private static final Logger LOGGER = LoggerFactory.getLogger(MqttSink.class);
    private final String host;
    private final int port;
    private final MqttClient client;
    private final String server;
    private final String topic;
    private final int qos;
    private final SubscriberBuilder<? extends Message<?>, Void> sink;
    private final AtomicBoolean connected = new AtomicBoolean();

    public MqttSink(Vertx vertx, Config config) {
        MqttClientOptions mqttClientOptions = new MqttClientOptions();
        mqttClientOptions.setClientId((String) config.getOptionalValue("client-id", String.class).orElse(null));
        mqttClientOptions.setAutoGeneratedClientId(((Boolean) config.getOptionalValue("auto-generated-client-id", Boolean.class).orElse(false)).booleanValue());
        mqttClientOptions.setAutoKeepAlive(((Boolean) config.getOptionalValue("auto-keep-alive", Boolean.class).orElse(true)).booleanValue());
        mqttClientOptions.setSsl(((Boolean) config.getOptionalValue("ssl", Boolean.class).orElse(false)).booleanValue());
        mqttClientOptions.setWillQoS(((Integer) config.getOptionalValue("will-qos", Integer.class).orElse(0)).intValue());
        mqttClientOptions.setKeepAliveTimeSeconds(((Integer) config.getOptionalValue("keep-alive-seconds", Integer.class).orElse(30)).intValue());
        mqttClientOptions.setMaxInflightQueue(((Integer) config.getOptionalValue("max-inflight-queue", Integer.class).orElse(10)).intValue());
        mqttClientOptions.setCleanSession(((Boolean) config.getOptionalValue("auto-clean-session", Boolean.class).orElse(true)).booleanValue());
        mqttClientOptions.setWillFlag(((Boolean) config.getOptionalValue("will-flag", Boolean.class).orElse(false)).booleanValue());
        mqttClientOptions.setWillRetain(((Boolean) config.getOptionalValue("will-retain", Boolean.class).orElse(false)).booleanValue());
        mqttClientOptions.setMaxMessageSize(((Integer) config.getOptionalValue("max-message-size", Integer.class).orElse(-1)).intValue());
        mqttClientOptions.setReconnectAttempts(((Integer) config.getOptionalValue("reconnect-attempts", Integer.class).orElse(5)).intValue());
        mqttClientOptions.setReconnectInterval(TimeUnit.SECONDS.toMillis(((Integer) config.getOptionalValue("reconnect-interval-seconds", Integer.class).orElse(1)).intValue()));
        mqttClientOptions.setUsername((String) config.getOptionalValue("username", String.class).orElse(null));
        mqttClientOptions.setPassword((String) config.getOptionalValue("password", String.class).orElse(null));
        mqttClientOptions.setConnectTimeout((int) TimeUnit.SECONDS.toMillis(((Integer) config.getOptionalValue("connect-timeout-seconds", Integer.class).orElse(60)).intValue()));
        mqttClientOptions.setTrustAll(((Boolean) config.getOptionalValue("trust-all", Boolean.class).orElse(false)).booleanValue());
        this.host = (String) config.getValue("host", String.class);
        this.port = ((Integer) config.getOptionalValue("port", Integer.class).orElse(Integer.valueOf(mqttClientOptions.isSsl() ? 8883 : 1883))).intValue();
        this.server = (String) config.getOptionalValue("server-name", String.class).orElse(null);
        this.topic = getTopicOrNull(config);
        this.client = MqttClient.create(vertx, mqttClientOptions);
        this.qos = ((Integer) config.getOptionalValue("qos", Integer.class).orElse(0)).intValue();
        ProcessorBuilder flatMapCompletionStage = ReactiveStreams.builder().flatMapCompletionStage(message -> {
            if (this.connected.get()) {
                return CompletableFuture.completedFuture(message);
            }
            CompletableFuture completableFuture = new CompletableFuture();
            this.client.connect(this.port, this.host, this.server, asyncResult -> {
                if (asyncResult.failed()) {
                    completableFuture.completeExceptionally(asyncResult.cause());
                } else {
                    this.connected.set(true);
                    completableFuture.complete(message);
                }
            });
            return completableFuture;
        }).flatMapCompletionStage(message2 -> {
            CompletableFuture completableFuture = new CompletableFuture();
            String str = this.topic;
            MqttQoS valueOf = MqttQoS.valueOf(this.qos);
            boolean z = false;
            if (message2 instanceof SendingMqttMessage) {
                SendingMqttMessage sendingMqttMessage = (SendingMqttMessage) message2;
                str = sendingMqttMessage.getTopic() == null ? this.topic : sendingMqttMessage.getTopic();
                valueOf = sendingMqttMessage.getQosLevel() == null ? valueOf : sendingMqttMessage.getQosLevel();
                z = sendingMqttMessage.isRetain();
            }
            if (str == null) {
                LOGGER.error("Ignoring message - no topic set");
                return CompletableFuture.completedFuture(message2);
            }
            this.client.publish(str, convert(message2.getPayload()), valueOf, false, z, asyncResult -> {
                if (asyncResult.failed()) {
                    completableFuture.completeExceptionally(asyncResult.cause());
                } else {
                    completableFuture.complete(asyncResult.result());
                }
            });
            return completableFuture;
        });
        MqttClient mqttClient = this.client;
        mqttClient.getClass();
        this.sink = flatMapCompletionStage.onComplete(mqttClient::disconnect).onError(th -> {
            LOGGER.error("An error has been caught while sending a MQTT message to the broker", th);
        }).ignore();
    }

    private Buffer convert(Object obj) {
        return obj instanceof JsonObject ? new Buffer(((JsonObject) obj).toBuffer()) : obj instanceof JsonArray ? new Buffer(((JsonArray) obj).toBuffer()) : ((obj instanceof String) || obj.getClass().isPrimitive()) ? new Buffer(io.vertx.core.buffer.Buffer.buffer(obj.toString())) : obj instanceof byte[] ? new Buffer(io.vertx.core.buffer.Buffer.buffer((byte[]) obj)) : obj instanceof Buffer ? (Buffer) obj : obj instanceof io.vertx.core.buffer.Buffer ? new Buffer((io.vertx.core.buffer.Buffer) obj) : new Buffer(Json.encodeToBuffer(obj));
    }

    public SubscriberBuilder<? extends Message<?>, Void> getSink() {
        return this.sink;
    }

    private String getTopicOrNull(Config config) {
        return (String) config.getOptionalValue("topic", String.class).orElseGet(() -> {
            return (String) config.getOptionalValue("channel-name", String.class).orElse(null);
        });
    }

    public boolean isReady() {
        return this.connected.get();
    }
}
