package io.smallrye.reactive.messaging.mqtt;

import io.vertx.reactivex.core.Vertx;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.BeforeDestroyed;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;

@ApplicationScoped
@Connector(MqttConnector.CONNECTOR_NAME)
/* loaded from: input_file:io/smallrye/reactive/messaging/mqtt/MqttConnector.class */
public class MqttConnector implements IncomingConnectorFactory, OutgoingConnectorFactory {
    static final String CONNECTOR_NAME = "smallrye-mqtt";

    @Inject
    private Instance<Vertx> instanceOfVertx;
    private Vertx vertx;
    private boolean internalVertxInstance = false;
    private List<MqttSource> sources = new CopyOnWriteArrayList();
    private List<MqttSink> sinks = new CopyOnWriteArrayList();

    public void terminate(@Observes @BeforeDestroyed(ApplicationScoped.class) Object obj) {
        if (this.internalVertxInstance) {
            this.vertx.close();
        }
    }

    @PostConstruct
    void init() {
        if (!this.instanceOfVertx.isUnsatisfied()) {
            this.vertx = (Vertx) this.instanceOfVertx.get();
        } else {
            this.internalVertxInstance = true;
            this.vertx = Vertx.vertx();
        }
    }

    public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
        MqttSource mqttSource = new MqttSource(this.vertx, config);
        this.sources.add(mqttSource);
        return mqttSource.getSource();
    }

    public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
        MqttSink mqttSink = new MqttSink(this.vertx, config);
        this.sinks.add(mqttSink);
        return mqttSink.getSink();
    }

    public boolean isReady() {
        boolean z = true;
        Iterator<MqttSource> it = this.sources.iterator();
        while (it.hasNext()) {
            z = z && it.next().isSubscribed();
        }
        Iterator<MqttSink> it2 = this.sinks.iterator();
        while (it2.hasNext()) {
            z = z && it2.next().isReady();
        }
        return z;
    }
}
