package io.smallrye.reactive.messaging.mqtt;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.groups.UniSubscribe;
import io.smallrye.mutiny.subscription.BackPressureStrategy;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.mqtt.MqttClient;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/smallrye/reactive/messaging/mqtt/MqttSource.class */
public class MqttSource {
    private static final Logger LOGGER = LoggerFactory.getLogger(MqttSource.class);
    private final PublisherBuilder<MqttMessage<?>> source;
    private final AtomicBoolean subscribed = new AtomicBoolean();

    public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration mqttConnectorIncomingConfiguration) {
        MqttClientOptions createMqttClientOptions = MqttHelpers.createMqttClientOptions(mqttConnectorIncomingConfiguration);
        String host = mqttConnectorIncomingConfiguration.getHost();
        int intValue = mqttConnectorIncomingConfiguration.getPort().orElse(Integer.valueOf(createMqttClientOptions.isSsl() ? 8883 : 1883)).intValue();
        String orElse = mqttConnectorIncomingConfiguration.getServerName().orElse(null);
        Optional<String> topic = mqttConnectorIncomingConfiguration.getTopic();
        mqttConnectorIncomingConfiguration.getClass();
        String orElseGet = topic.orElseGet(mqttConnectorIncomingConfiguration::getChannel);
        int intValue2 = mqttConnectorIncomingConfiguration.getQos().intValue();
        MqttClient create = MqttClient.create(vertx, createMqttClientOptions);
        boolean booleanValue = mqttConnectorIncomingConfiguration.getBroadcast().booleanValue();
        this.source = ReactiveStreams.fromPublisher(((Multi) create.connect(intValue, host, orElse).onItem().produceMulti(mqttConnAckMessage -> {
            return Multi.createFrom().emitter(multiEmitter -> {
                create.publishHandler(mqttPublishMessage -> {
                    multiEmitter.emit(new ReceivingMqttMessage(mqttPublishMessage));
                });
                UniSubscribe subscribe = create.subscribe(orElseGet, intValue2).subscribe();
                Consumer consumer = num -> {
                    this.subscribed.set(true);
                };
                multiEmitter.getClass();
                subscribe.with(consumer, multiEmitter::fail);
            }, BackPressureStrategy.BUFFER);
        }).then(multi -> {
            return booleanValue ? multi.broadcast().toAllSubscribers() : multi;
        })).on().cancellation(() -> {
            this.subscribed.set(false);
            create.disconnectAndForget();
        }).onFailure().invoke(th -> {
            LOGGER.error("Unable to establish a connection with the MQTT broker", th);
        }));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PublisherBuilder<MqttMessage<?>> getSource() {
        return this.source;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isSubscribed() {
        return this.subscribed.get();
    }
}
