/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.impl;

import io.smallrye.reactive.messaging.StreamFactory;
import io.smallrye.reactive.messaging.StreamRegistry;
import io.smallrye.reactive.messaging.spi.PublisherFactory;
import io.smallrye.reactive.messaging.spi.SubscriberFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

/**
 * {@link StreamFactory} implementation that delegates stream creation to the
 * {@link PublisherFactory} and {@link SubscriberFactory} beans injected at
 * construction time, and registers named streams in a {@link StreamRegistry}.
 *
 * <p>Factories are indexed once, in the constructor, under
 * {@code factory.type().getName()}; the lookup tables are never modified
 * afterwards. If two factories report the same key, the one iterated last wins.
 *
 * <p>All public creation methods are {@code synchronized} on this instance.
 */
@ApplicationScoped
public class StreamFactoryImpl implements StreamFactory {
    private static final String NAME_MUST_BE_SET = "'name' must be set";
    private static final String CONFIG_MUST_BE_SET = "'config' must be set";
    /** Configuration key holding the factory type to use. */
    private static final String TYPE_KEY = "type";

    private final StreamRegistry registry;
    private final Map<String, PublisherFactory> publisherFactories = new HashMap<>();
    private final Map<String, SubscriberFactory> subscriberFactories = new HashMap<>();

    /**
     * Indexes every available publisher and subscriber factory by type name.
     *
     * @param pubs     all {@link PublisherFactory} beans
     * @param subs     all {@link SubscriberFactory} beans
     * @param registry the registry in which named streams are registered
     */
    @Inject
    public StreamFactoryImpl(@Any Instance<PublisherFactory> pubs, @Any Instance<SubscriberFactory> subs, StreamRegistry registry) {
        this.registry = registry;
        for (PublisherFactory publisherFactory : pubs) {
            this.publisherFactories.put(publisherFactory.type().getName(), publisherFactory);
        }
        for (SubscriberFactory subscriberFactory : subs) {
            this.subscriberFactories.put(subscriberFactory.type().getName(), subscriberFactory);
        }
    }

    /**
     * Creates a publisher of the type given by the {@code "type"} entry of
     * {@code config} and, once it is available, registers it under {@code name}.
     *
     * @param name   the name to register the publisher under
     * @param config the publisher configuration; must contain a {@code "type"} entry
     * @return a stage completed with the value returned by the registry
     * @throws NullPointerException     if {@code name} or {@code config} is {@code null}
     * @throws IllegalArgumentException if {@code config} has no {@code "type"} entry,
     *                                  or the type is not a known publisher type
     */
    @Override
    public synchronized CompletionStage<Publisher<? extends Message>> createPublisherAndRegister(String name, Map<String, String> config) {
        Objects.requireNonNull(name, NAME_MUST_BE_SET);
        String type = requireType(config, "publisher", name);
        return this.createPublisher(type, config)
                .thenApply(publisher -> this.registry.register(name, publisher));
    }

    /**
     * Creates a subscriber of the type given by the {@code "type"} entry of
     * {@code config} and, once it is available, registers it under {@code name}.
     *
     * @param name   the name to register the subscriber under
     * @param config the subscriber configuration; must contain a {@code "type"} entry
     * @return a stage completed with the value returned by the registry
     * @throws NullPointerException     if {@code name} or {@code config} is {@code null}
     * @throws IllegalArgumentException if {@code config} has no {@code "type"} entry,
     *                                  or the type is not a known subscriber type
     */
    @Override
    public synchronized CompletionStage<Subscriber<? extends Message>> createSubscriberAndRegister(String name, Map<String, String> config) {
        Objects.requireNonNull(name, NAME_MUST_BE_SET);
        String type = requireType(config, "subscriber", name);
        return this.createSubscriber(type, config)
                .thenApply(subscriber -> this.registry.register(name, subscriber));
    }

    /**
     * Creates a publisher using the factory indexed under {@code type}.
     * The configuration is handed to the factory as-is.
     *
     * @param type   the publisher factory type name
     * @param config the configuration passed to the factory
     * @return the stage returned by the selected factory
     * @throws NullPointerException     if {@code type} is {@code null}
     * @throws IllegalArgumentException if no publisher factory is indexed under {@code type}
     */
    @Override
    public synchronized CompletionStage<Publisher<? extends Message>> createPublisher(String type, Map<String, String> config) {
        return lookupFactory(this.publisherFactories, type).createPublisher(config);
    }

    /**
     * Creates a subscriber using the factory indexed under {@code type}.
     * The configuration is handed to the factory as-is.
     *
     * @param type   the subscriber factory type name
     * @param config the configuration passed to the factory
     * @return the stage returned by the selected factory
     * @throws NullPointerException     if {@code type} is {@code null}
     * @throws IllegalArgumentException if no subscriber factory is indexed under {@code type}
     */
    @Override
    public synchronized CompletionStage<Subscriber<? extends Message>> createSubscriber(String type, Map<String, String> config) {
        // synchronized to match createPublisher and the *AndRegister methods.
        return lookupFactory(this.subscriberFactories, type).createSubscriber(config);
    }

    /** Reads the mandatory {@code "type"} entry from {@code config}, failing with a descriptive error if absent. */
    private static String requireType(Map<String, String> config, String kind, String name) {
        Objects.requireNonNull(config, CONFIG_MUST_BE_SET);
        String type = config.get(TYPE_KEY);
        if (type == null) {
            throw new IllegalArgumentException("Invalid " + kind + ", no type for " + name);
        }
        return type;
    }

    /** Returns the factory indexed under {@code type}, or fails listing the known types. */
    private static <F> F lookupFactory(Map<String, F> factories, String type) {
        // Supplier overload: the message (and the key-set rendering) is only built on failure.
        Objects.requireNonNull(type, () -> "'type' must be set, known types are: " + factories.keySet());
        F factory = factories.get(type);
        if (factory == null) {
            throw new IllegalArgumentException("Unknown type: " + type + ", known types are: " + factories.keySet());
        }
        return factory;
    }
}

