package io.smallrye.reactive.messaging.mqtt;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Observable;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.mqtt.MqttClient;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/smallrye/reactive/messaging/mqtt/MqttSource.class */
/**
 * Inbound side of the MQTT connector: connects a Vert.x {@link MqttClient} to a broker,
 * subscribes to a single topic and exposes the received messages as a
 * {@link PublisherBuilder} of {@link MqttMessage}.
 *
 * <p>All settings are read from the {@link Config} passed to the constructor. The connection
 * itself is part of the reactive pipeline built in the constructor (it starts from
 * {@code rxConnect}), it is not performed by the constructor body directly.
 */
public class MqttSource {
    private static final Logger LOGGER = LoggerFactory.getLogger(MqttSource.class);

    private final PublisherBuilder<MqttMessage<?>> source;

    /** Result of the last topic subscription attempt; reset to {@code false} on cancellation. */
    private final AtomicBoolean subscribed = new AtomicBoolean();

    /**
     * Builds the message stream from the given configuration.
     *
     * <p>Required keys: {@code host}, and either {@code topic} or {@code channel-name}.
     * Optional keys read here: {@code port} (defaults to 8883 when {@code ssl} is enabled,
     * 1883 otherwise), {@code server-name}, {@code qos} (default 0) and {@code broadcast}
     * (default {@code false}), plus the client options listed in
     * {@link #createClientOptions(Config)}.
     *
     * @param vertx  the Vert.x instance used to create the MQTT client
     * @param config the configuration to read the settings from
     * @throws NoSuchElementException   if {@code host} is not configured
     * @throws IllegalArgumentException if neither {@code topic} nor {@code channel-name} is configured
     */
    public MqttSource(Vertx vertx, Config config) {
        MqttClientOptions options = createClientOptions(config);

        String host = config.getOptionalValue("host", String.class).orElseThrow(
                () -> new NoSuchElementException(
                        "Invalid configuration - expected key `host` to be present in " + config.getPropertyNames()));
        // Default port depends on whether TLS was requested in the client options.
        int port = config.getOptionalValue("port", Integer.class).orElse(options.isSsl() ? 8883 : 1883);
        String serverName = config.getOptionalValue("server-name", String.class).orElse(null);
        String topic = getTopicOrFail(config);

        MqttClient client = MqttClient.create(vertx, options);
        int qos = config.getOptionalValue("qos", Integer.class).orElse(0);
        boolean broadcast = config.getOptionalValue("broadcast", Boolean.class).orElse(false);

        this.source = ReactiveStreams.fromPublisher(
                client.rxConnect(port, host, serverName)
                        // The explicit type argument keeps the element type aligned with the
                        // `source` field; it assumes ReceivingMqttMessage is an MqttMessage.
                        .flatMapObservable(connAck -> Observable.<MqttMessage<?>>create(emitter -> {
                            // Register the handler before subscribing so that the handler is
                            // already in place when the subscription is requested.
                            client.publishHandler(message -> emitter.onNext(new ReceivingMqttMessage(message)));
                            client.subscribe(topic, qos, done -> {
                                LOGGER.debug("Subscription to MQTT topic `{}` completed (succeeded: {})",
                                        topic, done.succeeded());
                                // Update the flag first so that anyone reacting to the error
                                // below already observes the subscription as not established.
                                this.subscribed.set(done.succeeded());
                                if (done.failed()) {
                                    emitter.onError(done.cause());
                                }
                            });
                        }))
                        .toFlowable(BackpressureStrategy.BUFFER)
                        // In broadcast mode the upstream is shared between subscribers.
                        .compose(flowable -> broadcast ? flowable.publish().autoConnect() : flowable)
                        .doOnCancel(() -> {
                            this.subscribed.set(false);
                            client.disconnect();
                        })
                        .doOnError(failure ->
                                LOGGER.error("Unable to establish a connection with the MQTT broker", failure)));
    }

    /**
     * Returns the stream of messages received from the configured topic.
     *
     * @return the publisher builder created by the constructor
     */
    public PublisherBuilder<MqttMessage<?>> getSource() {
        return this.source;
    }

    /**
     * Tells whether the last topic subscription attempt succeeded and the stream has not been
     * cancelled since.
     *
     * @return the current value of the subscription flag
     */
    public boolean isSubscribed() {
        return this.subscribed.get();
    }

    /**
     * Creates the MQTT client options from the configuration.
     *
     * <p>Keys read (with defaults): {@code client-id} (none), {@code auto-generated-client-id}
     * (false), {@code auto-keep-alive} (true), {@code ssl} (false), {@code will-qos} (0),
     * {@code keep-alive-seconds} (30), {@code max-inflight-queue} (10),
     * {@code auto-clean-session} (true), {@code will-flag} (false), {@code will-retain} (false),
     * {@code max-message-size} (-1), {@code reconnect-attempts} (5),
     * {@code reconnect-interval-seconds} (1), {@code username} (none), {@code password} (none),
     * {@code connect-timeout-seconds} (60) and {@code trust-all} (false).
     *
     * @param config the configuration to read from
     * @return the populated client options
     */
    private static MqttClientOptions createClientOptions(Config config) {
        MqttClientOptions options = new MqttClientOptions();
        options.setClientId(config.getOptionalValue("client-id", String.class).orElse(null));
        options.setAutoGeneratedClientId(config.getOptionalValue("auto-generated-client-id", Boolean.class).orElse(false));
        options.setAutoKeepAlive(config.getOptionalValue("auto-keep-alive", Boolean.class).orElse(true));
        options.setSsl(config.getOptionalValue("ssl", Boolean.class).orElse(false));
        options.setWillQoS(config.getOptionalValue("will-qos", Integer.class).orElse(0));
        options.setKeepAliveTimeSeconds(config.getOptionalValue("keep-alive-seconds", Integer.class).orElse(30));
        options.setMaxInflightQueue(config.getOptionalValue("max-inflight-queue", Integer.class).orElse(10));
        options.setCleanSession(config.getOptionalValue("auto-clean-session", Boolean.class).orElse(true));
        options.setWillFlag(config.getOptionalValue("will-flag", Boolean.class).orElse(false));
        options.setWillRetain(config.getOptionalValue("will-retain", Boolean.class).orElse(false));
        options.setMaxMessageSize(config.getOptionalValue("max-message-size", Integer.class).orElse(-1));
        options.setReconnectAttempts(config.getOptionalValue("reconnect-attempts", Integer.class).orElse(5));
        // The two durations below are configured in seconds and converted to milliseconds.
        options.setReconnectInterval(
                TimeUnit.SECONDS.toMillis(config.getOptionalValue("reconnect-interval-seconds", Integer.class).orElse(1)));
        options.setUsername(config.getOptionalValue("username", String.class).orElse(null));
        options.setPassword(config.getOptionalValue("password", String.class).orElse(null));
        options.setConnectTimeout(
                (int) TimeUnit.SECONDS.toMillis(config.getOptionalValue("connect-timeout-seconds", Integer.class).orElse(60)));
        options.setTrustAll(config.getOptionalValue("trust-all", Boolean.class).orElse(false));
        return options;
    }

    /**
     * Resolves the topic to subscribe to: the {@code topic} key if present, otherwise the
     * {@code channel-name} key.
     *
     * @param config the configuration to read from
     * @return the topic name
     * @throws IllegalArgumentException if neither key is configured
     */
    private static String getTopicOrFail(Config config) {
        return config.getOptionalValue("topic", String.class).orElseGet(
                () -> config.getOptionalValue("channel-name", String.class).orElseThrow(
                        () -> new IllegalArgumentException("Topic attribute must be set")));
    }
}
