package io.smallrye.reactive.messaging.mqtt;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.mqtt.MqttClient;
import io.vertx.mutiny.mqtt.messages.MqttConnAckMessage;
import io.vertx.mutiny.mqtt.messages.MqttPublishMessage;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/* loaded from: input_file:io/smallrye/reactive/messaging/mqtt/Clients.class */
public class Clients {
    private static final Map<String, ClientHolder> clients = new ConcurrentHashMap();

    /* loaded from: input_file:io/smallrye/reactive/messaging/mqtt/Clients$ClientHolder.class */
    public static class ClientHolder {
        private final MqttClient client;
        private final Uni<MqttConnAckMessage> connection;
        private final BroadcastProcessor<MqttPublishMessage> messages = BroadcastProcessor.create();

        public ClientHolder(MqttClient mqttClient, String str, int i, String str2) {
            this.client = mqttClient;
            this.connection = mqttClient.connect(i, str, str2).cache();
            BroadcastProcessor<MqttPublishMessage> broadcastProcessor = this.messages;
            broadcastProcessor.getClass();
            mqttClient.publishHandler((v1) -> {
                r1.onNext(v1);
            });
            mqttClient.closeHandler(r3 -> {
                this.messages.onComplete();
            });
            BroadcastProcessor<MqttPublishMessage> broadcastProcessor2 = this.messages;
            broadcastProcessor2.getClass();
            mqttClient.exceptionHandler(broadcastProcessor2::onError);
        }

        public Uni<MqttClient> connect() {
            return this.connection.map(mqttConnAckMessage -> {
                return this.client;
            });
        }

        public void close() {
            if (this.client.isConnected()) {
                this.client.disconnectAndAwait();
            }
        }

        public Multi<MqttPublishMessage> stream() {
            return this.messages;
        }
    }

    private Clients() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Uni<MqttClient> getConnectedClient(Vertx vertx, String str, int i, String str2, MqttClientOptions mqttClientOptions) {
        return clients.computeIfAbsent(str + i + "<" + (str2 == null ? "" : str2) + ">-[" + (mqttClientOptions.getClientId() != null ? mqttClientOptions.getClientId() : "") + "]", str3 -> {
            return new ClientHolder(MqttClient.create(vertx, mqttClientOptions), str, i, str2);
        }).connect();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static ClientHolder getHolder(Vertx vertx, String str, int i, String str2, MqttClientOptions mqttClientOptions) {
        return clients.computeIfAbsent(str + i + "<" + (str2 == null ? "" : str2) + ">-[" + (mqttClientOptions.getClientId() != null ? mqttClientOptions.getClientId() : "") + "]", str3 -> {
            return new ClientHolder(MqttClient.create(vertx, mqttClientOptions), str, i, str2);
        });
    }

    public static void clear() {
        clients.forEach((str, clientHolder) -> {
            clientHolder.close();
        });
        clients.clear();
    }
}
