package io.smallrye.reactive.messaging.mqtt;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSession;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
import io.vertx.core.Future;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.mqtt.messages.MqttPublishMessage;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/* loaded from: input_file:io/smallrye/reactive/messaging/mqtt/Clients.class */
public class Clients {
    private static final Map<String, ClientHolder> clients = new ConcurrentHashMap();

    /* loaded from: input_file:io/smallrye/reactive/messaging/mqtt/Clients$ClientHolder.class */
    public static class ClientHolder {
        private final MqttClientSession client;
        private final BroadcastProcessor<MqttPublishMessage> messages = BroadcastProcessor.create();

        public ClientHolder(MqttClientSession mqttClientSession) {
            this.client = mqttClientSession;
            mqttClientSession.messageHandler(mqttPublishMessage -> {
                this.messages.onNext(MqttPublishMessage.newInstance(mqttPublishMessage));
            });
            BroadcastProcessor<MqttPublishMessage> broadcastProcessor = this.messages;
            broadcastProcessor.getClass();
            mqttClientSession.exceptionHandler(broadcastProcessor::onError);
        }

        public Future<Void> start() {
            return this.client.start();
        }

        public Future<Void> close() {
            return this.client.stop();
        }

        public Multi<MqttPublishMessage> stream() {
            return this.messages;
        }

        public MqttClientSession getClient() {
            return this.client;
        }
    }

    private Clients() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static ClientHolder getHolder(Vertx vertx, MqttClientSessionOptions mqttClientSessionOptions) {
        String hostname = mqttClientSessionOptions.getHostname();
        int port = mqttClientSessionOptions.getPort();
        String clientId = mqttClientSessionOptions.getClientId();
        String orElse = mqttClientSessionOptions.getServerName().orElse(null);
        return clients.computeIfAbsent(mqttClientSessionOptions.getUsername() + ":" + mqttClientSessionOptions.getPassword() + "@" + hostname + ":" + port + "<" + (orElse == null ? "" : orElse) + ">-[" + (clientId != null ? clientId : "") + "]", str -> {
            return new ClientHolder(MqttClientSession.create(vertx.getDelegate(), mqttClientSessionOptions));
        });
    }

    public static void clear() {
        clients.values().forEach((v0) -> {
            v0.close();
        });
        clients.clear();
    }
}
