/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.controlcenter.kafka;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AsyncCallable;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import io.confluent.controlcenter.data.ClusterMetadataDao;
import io.confluent.controlcenter.kafka.AdminClientFactory;
import io.confluent.controlcenter.kafka.AdminClientSupplier;
import io.confluent.controlcenter.kafka.ClusterView;
import io.confluent.controlcenter.rest.res.KafkaCluster;
import io.confluent.controlcenter.util.ConfigUtils;
import io.confluent.controlcenter.util.RetryUtils;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.KafkaFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClusterManager
implements AdminClientSupplier<String>,
ClusterView {
    private static final Logger log = LoggerFactory.getLogger(ClusterManager.class);
    private static final int CONFIG_RETRY_BACKOFF = 1000;
    private final AdminClientFactory clientFactory;
    private final ClusterMetadataDao clusterMetadataDao;
    private final ListeningScheduledExecutorService exec = MoreExecutors.listeningDecorator((ScheduledExecutorService)Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setThreadFactory(Executors.defaultThreadFactory()).setNameFormat("cluster-manager-%d").setDaemon(true).build()));
    private final Collection<ClusterView.ClusterCallback> callbacks = Collections.synchronizedList(Lists.newLinkedList());
    private final ConcurrentMap<String, String> configKey = Maps.newConcurrentMap();
    private final ConcurrentMap<String, Map<String, Object>> configurations = Maps.newConcurrentMap();

    @Inject
    public ClusterManager(AdminClientFactory clientFactory, ClusterMetadataDao clusterMetadataDao) {
        this.clientFactory = clientFactory;
        this.clusterMetadataDao = clusterMetadataDao;
    }

    private void addConfig(String key, Map<String, Object> config) {
        Preconditions.checkNotNull((Object)key, (Object)"Configuration name must not be null");
        Preconditions.checkNotNull(config, (Object)"Configuration must not be null");
        Map<String, Object> previousEntry = this.configurations.putIfAbsent(key, (Map<String, Object>)ImmutableMap.copyOf(config));
        Preconditions.checkArgument((previousEntry == null ? 1 : 0) != 0, (String)"Cluster configuration %s already exists", (Object)key);
    }

    public String register(String key, Map<String, Object> config) {
        this.addConfig(key, config);
        try {
            return (String)this.resolveAndRegister(key).get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("interrupted registering cluster {}", (Object)key);
            throw new RuntimeException("cluster registration interrupted", e);
        }
        catch (ExecutionException ex) {
            throw Throwables.propagate((Throwable)ex.getCause());
        }
    }

    public ListenableFuture<String> addConfiguration(String key, Map<String, Object> config) {
        this.addConfig(key, config);
        return this.markForRegistration(key);
    }

    private ListenableFuture<String> markForRegistration(final String key) {
        log.debug("marked cluster configuration {} for registration", (Object)key);
        AsyncCallable<String> asyncCallable = new AsyncCallable<String>(){

            public ListenableFuture<String> call() {
                try {
                    ListenableFuture result = ClusterManager.this.resolveAndRegister(key);
                    Futures.addCallback((ListenableFuture)result, (FutureCallback)new FutureCallback<String>(){

                        public void onSuccess(@Nullable String result) {
                        }

                        public void onFailure(Throwable t) {
                            log.warn("attempt to register cluster configuration {} failed, will retry", (Object)key, (Object)t);
                        }
                    });
                    return result;
                }
                catch (Exception e) {
                    log.warn("failed to register cluster configuration {}, please validate the configuration", (Object)key, (Object)e);
                    throw e;
                }
            }
        };
        return RetryUtils.retryWithJitter(this.exec, asyncCallable, 1000);
    }

    private ListenableFuture<String> resolveAndRegister(final String key) {
        final Map config = (Map)this.configurations.get(key);
        Preconditions.checkNotNull((Object)config, (Object)"Configuration {} does not exist");
        ListenableFuture<String> lookup = this.lookupClusterId(config);
        return Futures.transform(lookup, (Function)new Function<String, String>(){

            public String apply(String clusterId) {
                ClusterManager.this.registerAndUpdateMetadata(key, config, clusterId);
                return clusterId;
            }
        }, (Executor)this.exec);
    }

    private ListenableFuture<String> lookupClusterId(Map<String, Object> config) {
        final AdminClient client = this.clientFactory.createClient(config);
        KafkaFuture kafkaFuture = client.describeCluster().clusterId();
        final SettableFuture lookup = SettableFuture.create();
        kafkaFuture.whenComplete((KafkaFuture.BiConsumer)new KafkaFuture.BiConsumer<String, Throwable>(){

            public void accept(String s, Throwable throwable) {
                if (throwable != null) {
                    lookup.setException(throwable);
                } else {
                    lookup.set((Object)s);
                }
                client.close();
            }
        });
        return lookup;
    }

    private void registerAndUpdateMetadata(String key, Map<String, Object> config, String clusterId) {
        this.registerCluster(key, clusterId);
        this.clusterMetadataDao.asyncUpdateKafkaClusterPreservingName(clusterId, new KafkaCluster(clusterId, key, Collections.emptyList(), ConfigUtils.getList(config, "bootstrap.servers")));
    }

    private void registerCluster(String key, final String clusterId) {
        String previous = this.configKey.put(clusterId, key);
        if (previous != null) {
            log.info("cluster {} updated with configuration {} (was {})", new Object[]{clusterId, key, previous});
            this.fireCallbacks(new CallbackConsumer(){

                @Override
                public void apply(ClusterView.ClusterCallback callback) {
                    callback.clusterUpdated(clusterId);
                }
            });
        } else {
            log.info("cluster {} registered with configuration {}", (Object)clusterId, (Object)key);
            this.fireCallbacks(new CallbackConsumer(){

                @Override
                public void apply(ClusterView.ClusterCallback callback) {
                    callback.clusterAdded(clusterId);
                }
            });
        }
    }

    private void fireCallbacks(CallbackConsumer f) {
        for (ClusterView.ClusterCallback callback : this.callbacks) {
            try {
                f.apply(callback);
            }
            catch (Exception e) {
                log.error("Error executing callback {}", (Object)callback, (Object)e);
            }
        }
    }

    @Override
    public AdminClient getClient(String clusterId) {
        return this.clientFactory.createClient(this.getConfigs(clusterId));
    }

    public Map<String, Object> getConfigs(String clusterId) {
        String key = (String)this.configKey.get(clusterId);
        Preconditions.checkArgument((key != null ? 1 : 0) != 0, (String)"Unknown cluster id %s", (Object)clusterId);
        return (Map)this.configurations.get(key);
    }

    @Override
    public void registerClusterCallback(ClusterView.ClusterCallback callback) {
        this.callbacks.add(callback);
    }

    public Map<String, Map<String, Object>> getAllClusterConfigsWithClusterId() {
        HashMap<String, Map<String, Object>> configs = new HashMap<String, Map<String, Object>>();
        for (String clusterId : this.configKey.keySet()) {
            String configurationKey = (String)this.configKey.get(clusterId);
            Map config = this.configurations.getOrDefault(configurationKey, null);
            if (config == null) continue;
            configs.put(clusterId, config);
        }
        return configs;
    }

    private static interface CallbackConsumer {
        public void apply(ClusterView.ClusterCallback var1);
    }
}

