/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.impl;

import io.smallrye.reactive.messaging.StreamRegistar;
import io.smallrye.reactive.messaging.StreamRegistry;
import io.smallrye.reactive.messaging.spi.PublisherFactory;
import io.smallrye.reactive.messaging.spi.SubscriberFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Creates the publishers ("sources") and subscribers ("sinks") declared in the MicroProfile
 * {@link Config} and registers them in the {@link StreamRegistry}.
 *
 * <p>Streams are declared with properties of the form
 * {@code smallrye.messaging.source.<name>.<attribute>=<value>} and
 * {@code smallrye.messaging.sink.<name>.<attribute>=<value>}. The {@code type} attribute selects the
 * {@link PublisherFactory} / {@link SubscriberFactory} by comparing it, ignoring case, with the
 * name of the class returned by the factory's {@code type()} method.
 *
 * <p>NOTE(review): {@code sources} and {@code sinks} are plain {@link HashMap}s written from
 * completion callbacks of the stages returned by the factories. If factories can complete on
 * different threads concurrently this needs a concurrent map - confirm against the factories.
 */
@ApplicationScoped
public class ConfiguredStreamFactory implements StreamRegistar {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConfiguredStreamFactory.class);
    private static final String SOURCE_CONFIG_PREFIX = "smallrye.messaging.source";
    private static final String SINK_CONFIG_PREFIX = "smallrye.messaging.sink";
    private final List<PublisherFactory> sourceFactories;
    private final List<SubscriberFactory> sinkFactories;
    private final Config config;
    private final StreamRegistry registry;
    private final Map<String, Publisher<? extends Message>> sources = new HashMap<>();
    private final Map<String, Subscriber<? extends Message>> sinks = new HashMap<>();

    /**
     * No-argument constructor leaving every collaborator {@code null}.
     * Presumably only present so the container can proxy this bean - TODO confirm.
     */
    ConfiguredStreamFactory() {
        this.sourceFactories = null;
        this.sinkFactories = null;
        this.config = null;
        this.registry = null;
    }

    /**
     * Injection constructor.
     *
     * <p>When no {@link Config} bean is available the factory lists are left empty and
     * {@link #initialize()} becomes a no-op.
     *
     * @param sourceFactories the available publisher factories
     * @param sinkFactories the available subscriber factories
     * @param config the MicroProfile Config, possibly unsatisfied
     * @param registry the registry receiving the created streams
     * @throws IllegalStateException if the config is reported as satisfied but cannot be retrieved
     */
    @Inject
    public ConfiguredStreamFactory(@Any Instance<PublisherFactory> sourceFactories, @Any Instance<SubscriberFactory> sinkFactories, Instance<Config> config, @Any Instance<StreamRegistry> registry) {
        this.registry = registry.get();
        if (config.isUnsatisfied()) {
            this.sourceFactories = Collections.emptyList();
            this.sinkFactories = Collections.emptyList();
            this.config = null;
        } else {
            this.sourceFactories = sourceFactories.stream().collect(Collectors.toList());
            this.sinkFactories = sinkFactories.stream().collect(Collectors.toList());
            // Log from the collected lists so the Instance handles are only resolved once.
            LOGGER.info("Found source types: {}", this.sourceFactories.stream().map(PublisherFactory::type).collect(Collectors.toList()));
            LOGGER.info("Found sink types: {}", this.sinkFactories.stream().map(SubscriberFactory::type).collect(Collectors.toList()));
            this.config = config.stream().findFirst().orElseThrow(() -> new IllegalStateException("Unable to retrieve the config"));
        }
    }

    /**
     * Groups the properties found under {@code prefix} by stream name.
     *
     * <p>A property {@code <prefix>.<name>.<attribute>} is stored as {@code attribute -> value} in
     * the map associated with {@code name}. A property {@code <prefix>.<name>} (no attribute part)
     * is stored under the attribute {@code "name"}. Only keys starting with {@code prefix + "."}
     * are considered, so a key equal to the prefix, or one that merely shares its leading
     * characters, is ignored.
     *
     * @param prefix the property prefix, without trailing dot
     * @param root the configuration to read from
     * @return a mutable map from stream name to that stream's attributes; empty if nothing matches
     */
    static Map<String, Map<String, String>> extractConfigurationFor(String prefix, Config root) {
        String qualifiedPrefix = prefix + ".";
        Map<String, Map<String, String>> configs = new HashMap<>();
        for (String key : root.getPropertyNames()) {
            if (!key.startsWith(qualifiedPrefix)) {
                continue;
            }
            String remainder = key.substring(qualifiedPrefix.length());
            int dot = remainder.indexOf('.');
            String name;
            String attribute;
            if (dot >= 0) {
                name = remainder.substring(0, dot);
                attribute = remainder.substring(dot + 1);
            } else {
                name = remainder;
                attribute = "name";
            }
            configs.computeIfAbsent(name, x -> new HashMap<>()).put(attribute, root.getValue(key, String.class));
        }
        return configs;
    }

    /**
     * Creates every configured source and sink and, once all of them are available, registers
     * them in the registry.
     *
     * @return a stage completed when all streams have been created and registered, or completed
     *         exceptionally (after logging the error) if any creation fails; an already completed
     *         stage when no config is available
     * @throws IllegalArgumentException synchronously, if a configured stream declares no
     *         {@code type} or a type no factory supports
     */
    @Override
    public CompletionStage<Void> initialize() {
        if (this.config == null) {
            LOGGER.info("No MicroProfile Config found, skipping");
            return CompletableFuture.completedFuture(null);
        }
        LOGGER.info("Stream manager initializing...");
        Map<String, Map<String, String>> sourceConfiguration = ConfiguredStreamFactory.extractConfigurationFor(SOURCE_CONFIG_PREFIX, this.config);
        Map<String, Map<String, String>> sinkConfiguration = ConfiguredStreamFactory.extractConfigurationFor(SINK_CONFIG_PREFIX, this.config);
        List<CompletionStage<Void>> tasks = new ArrayList<>();
        sourceConfiguration.forEach((name, conf) -> tasks.add(this.createSourceFromConfig(name, conf)));
        sinkConfiguration.forEach((name, conf) -> tasks.add(this.createSinkFromConfig(name, conf)));
        CompletableFuture<Void> all = CompletableFuture.allOf(tasks.stream().map(CompletionStage::toCompletableFuture).toArray(CompletableFuture[]::new));
        return all.whenComplete((x, err) -> {
            if (err == null) {
                LOGGER.info("Publishers created during initialization: {}", this.sources.keySet());
                LOGGER.info("Subscribers created during initialization: {}", this.sinks.keySet());
                this.sources.forEach(this.registry::register);
                this.sinks.forEach(this.registry::register);
            } else {
                LOGGER.error("Unable to create the publisher or subscriber during initialization", err);
            }
        });
    }

    /**
     * Creates the publisher described by {@code conf} and stores it under {@code name}.
     *
     * @param name the configured source name
     * @param conf the source attributes; must contain {@code type}
     * @return a stage completed once the publisher has been stored
     * @throws IllegalArgumentException if {@code type} is missing or matches no factory
     */
    private CompletionStage<Void> createSourceFromConfig(String name, Map<String, String> conf) {
        String type = conf.get("type");
        if (type == null) {
            throw new IllegalArgumentException("Invalid source, no type for " + name);
        }
        PublisherFactory mySourceFactory = this.sourceFactories.stream()
                .filter(factory -> factory.type().getName().equalsIgnoreCase(type))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Unknown source type for " + name + ", supported types are " + this.sourceFactories.stream().map(sf -> sf.type().getName()).collect(Collectors.toList())));
        // Cast retained from the original: the factory's declared element type is not visible in this file.
        return mySourceFactory.createPublisher(conf).thenAccept(p -> this.sources.put(name, (Publisher<? extends Message>)p));
    }

    /**
     * Creates the subscriber described by {@code conf} and stores it under {@code name}.
     *
     * @param name the configured sink name
     * @param conf the sink attributes; must contain {@code type}
     * @return a stage completed once the subscriber has been stored
     * @throws IllegalArgumentException if {@code type} is missing or matches no factory
     */
    private CompletionStage<Void> createSinkFromConfig(String name, Map<String, String> conf) {
        String type = conf.get("type");
        if (type == null) {
            throw new IllegalArgumentException("Invalid sink, no type for " + name);
        }
        SubscriberFactory mySinkFactory = this.sinkFactories.stream()
                .filter(factory -> factory.type().getName().equalsIgnoreCase(type))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Unknown sink type for " + name + ", supported types are " + this.sinkFactories.stream().map(sf -> sf.type().getName()).collect(Collectors.toList())));
        // Cast retained from the original: the factory's declared element type is not visible in this file.
        return mySinkFactory.createSubscriber(conf).thenAccept(p -> this.sinks.put(name, (Subscriber<? extends Message>)p));
    }
}

