package io.smallrye.reactive.messaging.mqtt;

import io.netty.handler.codec.mqtt.MqttQoS;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.AsyncResultUni;
import io.smallrye.reactive.messaging.mqtt.Clients;
import io.smallrye.reactive.messaging.mqtt.i18n.MqttLogging;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSession;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.buffer.Buffer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;

/* loaded from: input_file:io/smallrye/reactive/messaging/mqtt/MqttSink.class */
public class MqttSink {
    private final String topic;
    private final int qos;
    private final SubscriberBuilder<? extends Message<?>, Void> sink;
    private final AtomicBoolean ready = new AtomicBoolean();

    public MqttSink(Vertx vertx, MqttConnectorOutgoingConfiguration mqttConnectorOutgoingConfiguration) {
        MqttClientSessionOptions createMqttClientOptions = MqttHelpers.createMqttClientOptions(mqttConnectorOutgoingConfiguration);
        Optional<String> topic = mqttConnectorOutgoingConfiguration.getTopic();
        mqttConnectorOutgoingConfiguration.getClass();
        this.topic = topic.orElseGet(mqttConnectorOutgoingConfiguration::getChannel);
        this.qos = mqttConnectorOutgoingConfiguration.getQos().intValue();
        AtomicReference atomicReference = new AtomicReference();
        ProcessorBuilder onComplete = ReactiveStreams.builder().flatMapCompletionStage(message -> {
            Clients.ClientHolder clientHolder = (Clients.ClientHolder) atomicReference.get();
            if (clientHolder == null) {
                clientHolder = Clients.getHolder(vertx, createMqttClientOptions);
                atomicReference.set(clientHolder);
            }
            return clientHolder.start().onSuccess(r4 -> {
                this.ready.set(true);
            }).map(r3 -> {
                return message;
            }).toCompletionStage();
        }).flatMapCompletionStage(message2 -> {
            return send(atomicReference, message2);
        }).onComplete(() -> {
            Clients.ClientHolder clientHolder = (Clients.ClientHolder) atomicReference.getAndSet(null);
            if (clientHolder != null) {
                clientHolder.close().onComplete(asyncResult -> {
                    this.ready.set(false);
                });
            }
        });
        MqttLogging mqttLogging = MqttLogging.log;
        mqttLogging.getClass();
        this.sink = onComplete.onError(mqttLogging::errorWhileSendingMessageToBroker).ignore();
    }

    private CompletionStage<?> send(AtomicReference<Clients.ClientHolder> atomicReference, Message<?> message) {
        String str;
        boolean z;
        MqttQoS valueOf;
        MqttClientSession client = atomicReference.get().getClient();
        if (message instanceof SendingMqttMessage) {
            SendingMqttMessage sendingMqttMessage = (SendingMqttMessage) message;
            str = sendingMqttMessage.getTopic() == null ? this.topic : sendingMqttMessage.getTopic();
            valueOf = sendingMqttMessage.getQosLevel() == null ? MqttQoS.valueOf(this.qos) : sendingMqttMessage.getQosLevel();
            z = sendingMqttMessage.isRetain();
        } else {
            str = this.topic;
            z = false;
            valueOf = MqttQoS.valueOf(this.qos);
        }
        if (str == null) {
            MqttLogging.log.ignoringNoTopicSet();
            return CompletableFuture.completedFuture(message);
        }
        String str2 = str;
        MqttQoS mqttQoS = valueOf;
        boolean z2 = z;
        return AsyncResultUni.toUni(handler -> {
            client.publish(str2, convert(message.getPayload()).getDelegate(), mqttQoS, false, z2).onComplete(handler);
        }).onItemOrFailure().transformToUni((num, th) -> {
            return th != null ? Uni.createFrom().completionStage(message.nack(th).thenApply(r3 -> {
                return message;
            })) : Uni.createFrom().completionStage(message.ack().thenApply(r32 -> {
                return message;
            }));
        }).subscribeAsCompletionStage();
    }

    private Buffer convert(Object obj) {
        return obj instanceof JsonObject ? new Buffer(((JsonObject) obj).toBuffer()) : obj instanceof JsonArray ? new Buffer(((JsonArray) obj).toBuffer()) : ((obj instanceof String) || obj.getClass().isPrimitive()) ? new Buffer(io.vertx.core.buffer.Buffer.buffer(obj.toString())) : obj instanceof byte[] ? new Buffer(io.vertx.core.buffer.Buffer.buffer((byte[]) obj)) : obj instanceof Buffer ? (Buffer) obj : obj instanceof io.vertx.core.buffer.Buffer ? new Buffer((io.vertx.core.buffer.Buffer) obj) : new Buffer(Json.encodeToBuffer(obj));
    }

    public SubscriberBuilder<? extends Message<?>, Void> getSink() {
        return this.sink;
    }

    public boolean isReady() {
        return this.ready.get();
    }
}
