package io.smallrye.reactive.messaging.mqtt;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.groups.MultiOnFailure;
import io.smallrye.reactive.messaging.mqtt.Clients;
import io.smallrye.reactive.messaging.mqtt.MqttFailureHandler;
import io.smallrye.reactive.messaging.mqtt.i18n.MqttExceptions;
import io.smallrye.reactive.messaging.mqtt.i18n.MqttLogging;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.mqtt.messages.MqttPublishMessage;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;

/* loaded from: input_file:io/smallrye/reactive/messaging/mqtt/MqttSource.class */
public class MqttSource {
    private final PublisherBuilder<MqttMessage<?>> source;
    private final AtomicBoolean subscribed = new AtomicBoolean();
    private final Pattern pattern;

    public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration mqttConnectorIncomingConfiguration) {
        MqttClientOptions createMqttClientOptions = MqttHelpers.createMqttClientOptions(mqttConnectorIncomingConfiguration);
        String host = mqttConnectorIncomingConfiguration.getHost();
        int intValue = mqttConnectorIncomingConfiguration.getPort().orElse(Integer.valueOf(createMqttClientOptions.isSsl() ? 8883 : 1883)).intValue();
        String orElse = mqttConnectorIncomingConfiguration.getServerName().orElse(null);
        Optional<String> topic = mqttConnectorIncomingConfiguration.getTopic();
        mqttConnectorIncomingConfiguration.getClass();
        String orElseGet = topic.orElseGet(mqttConnectorIncomingConfiguration::getChannel);
        int intValue2 = mqttConnectorIncomingConfiguration.getQos().intValue();
        boolean booleanValue = mqttConnectorIncomingConfiguration.getBroadcast().booleanValue();
        MqttFailureHandler createFailureHandler = createFailureHandler(MqttFailureHandler.Strategy.from(mqttConnectorIncomingConfiguration.getFailureStrategy()), mqttConnectorIncomingConfiguration.getChannel());
        if (orElseGet.contains("#") || orElseGet.contains("+")) {
            this.pattern = Pattern.compile(orElseGet.replace("+", "[^/]+").replace("#", ".+"));
        } else {
            this.pattern = null;
        }
        Clients.ClientHolder holder = Clients.getHolder(vertx, host, intValue, orElse, createMqttClientOptions);
        MultiOnFailure onFailure = ((Multi) holder.connect().onItem().transformToMulti(mqttClient -> {
            return mqttClient.subscribe(orElseGet, intValue2).onItem().transformToMulti(num -> {
                this.subscribed.set(true);
                return holder.stream().transform().byFilteringItemsWith(mqttPublishMessage -> {
                    return matches(orElseGet, mqttPublishMessage);
                }).onItem().transform(mqttPublishMessage2 -> {
                    return new ReceivingMqttMessage(mqttPublishMessage2, createFailureHandler);
                });
            });
        }).stage(multi -> {
            return booleanValue ? multi.broadcast().toAllSubscribers() : multi;
        })).onCancellation().invoke(() -> {
            this.subscribed.set(false);
        }).onFailure();
        MqttLogging mqttLogging = MqttLogging.log;
        mqttLogging.getClass();
        this.source = ReactiveStreams.fromPublisher(onFailure.invoke(mqttLogging::unableToConnectToBroker));
    }

    private boolean matches(String str, MqttPublishMessage mqttPublishMessage) {
        return this.pattern != null ? this.pattern.matcher(mqttPublishMessage.topicName()).matches() : mqttPublishMessage.topicName().equals(str);
    }

    private MqttFailureHandler createFailureHandler(MqttFailureHandler.Strategy strategy, String str) {
        switch (strategy) {
            case IGNORE:
                return new MqttIgnoreFailure(str);
            case FAIL:
                return new MqttFailStop(str);
            default:
                throw MqttExceptions.ex.illegalArgumentUnknownStrategy(strategy.toString());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PublisherBuilder<MqttMessage<?>> getSource() {
        return this.source;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isSubscribed() {
        return this.subscribed.get();
    }
}
