/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.events;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.events.ClusterMetadata;
import org.apache.kafka.connect.runtime.events.EventsException;
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConfigBackingStore;

public class DistributedClusterMetadata
implements ClusterMetadata {
    private final ConfigBackingStore configBackingStore;
    private final DistributedHerder herder;
    private final Map<String, List<String>> sensitiveConfigsCache;
    private final Map<String, List<ConfigKeyInfo>> configKeyInfoCache;
    private final Map<String, Map<String, String>> connectorConfigs;
    private final Set<String> clientSensitiveConfigs;
    private final DistributedConfig distributedConfig;
    private static final Set<String> COMMON_CLASS_CONFIGS = Set.of("connector.class", "value.converter", "key.converter", "header.converter");

    public DistributedClusterMetadata(ConfigBackingStore configBackingStore, DistributedHerder herder, DistributedConfig distributedConfig) {
        this.configBackingStore = configBackingStore;
        this.herder = herder;
        this.sensitiveConfigsCache = new HashMap<String, List<String>>();
        this.connectorConfigs = new HashMap<String, Map<String, String>>();
        this.clientSensitiveConfigs = new HashSet<String>();
        this.clientSensitiveConfigs.addAll(this.passwordConfigs(ConsumerConfig.configDef()));
        this.clientSensitiveConfigs.addAll(this.passwordConfigs(ProducerConfig.configDef()));
        this.clientSensitiveConfigs.addAll(this.passwordConfigs(AdminClientConfig.configDef()));
        this.configKeyInfoCache = new HashMap<String, List<ConfigKeyInfo>>();
        this.distributedConfig = distributedConfig;
    }

    private Set<String> passwordConfigs(ConfigDef configDef) {
        return configDef.configKeys().values().stream().filter(config -> config.type().isSensitive()).map(config -> config.name).collect(Collectors.toSet());
    }

    @Override
    public String metadataKafkaClusterId() {
        return this.herder.kafkaClusterId();
    }

    @Override
    public String dataKafkaClusterId(String connectorName) throws EventsException {
        Set connectorBootstrapServers;
        String connectorPrefix;
        ConnectorType connectorType = this.connectorType(connectorName);
        String workerPrefix = switch (connectorType) {
            case ConnectorType.SOURCE -> {
                connectorPrefix = "producer.override.";
                yield "producer.";
            }
            case ConnectorType.SINK -> {
                connectorPrefix = "consumer.override.";
                yield "consumer.";
            }
            default -> throw new EventsException.UnknownConnectorException(connectorName);
        };
        String metadataKafkaClusterId = this.metadataKafkaClusterId();
        String connectorOverridesConfig = connectorPrefix + "bootstrap.servers";
        Map<String, String> connectorConfig = this.validateConnectorExisted(connectorName);
        Map workerPrefixMap = this.distributedConfig.originalsWithPrefix(workerPrefix);
        if (connectorConfig.containsKey(connectorOverridesConfig) && connectorConfig.get(connectorOverridesConfig) != null) {
            overridesBootstrap = connectorConfig.getOrDefault(connectorOverridesConfig, "");
            parsedBootstrapServers = overridesBootstrap.split(",");
            connectorBootstrapServers = Arrays.stream(parsedBootstrapServers).map(String::trim).collect(Collectors.toSet());
        } else if (workerPrefixMap.containsKey("bootstrap.servers") && !workerPrefixMap.get("bootstrap.servers").toString().isEmpty()) {
            overridesBootstrap = workerPrefixMap.get("bootstrap.servers").toString();
            parsedBootstrapServers = overridesBootstrap.split(",");
            connectorBootstrapServers = Arrays.stream(parsedBootstrapServers).map(String::trim).collect(Collectors.toSet());
        } else {
            return metadataKafkaClusterId;
        }
        if (connectorBootstrapServers.isEmpty()) {
            return metadataKafkaClusterId;
        }
        HashSet workerBootstrapServers = new HashSet(this.distributedConfig.getList("bootstrap.servers"));
        for (String bootstrapServer : connectorBootstrapServers) {
            if (!workerBootstrapServers.contains(bootstrapServer)) continue;
            return metadataKafkaClusterId;
        }
        return null;
    }

    private Map<String, String> validateConnectorExisted(String connectorName) throws EventsException {
        Map<String, String> configs = this.connectorConfigs.get(connectorName);
        if (configs == null) {
            throw new EventsException.UnknownConnectorException(connectorName);
        }
        return configs;
    }

    @Override
    public ConnectorType connectorType(String connectorName) throws EventsException {
        return this.herder.connectorType(this.validateConnectorExisted(connectorName));
    }

    private void addAliasedConfigClasses(String aliasGroup, Map<String, String> connectorConfig, Map<String, String> classConfigs) {
        if (!connectorConfig.containsKey(aliasGroup)) {
            return;
        }
        String aliases = connectorConfig.get(aliasGroup);
        for (String alias : aliases.split(",")) {
            String aliasedConfigsPrefix = aliasGroup + "." + alias;
            String aliasTypeConfig = aliasedConfigsPrefix + ".type";
            classConfigs.put(aliasedConfigsPrefix, connectorConfig.get(aliasTypeConfig));
        }
    }

    private Map<String, String> classConfigs(Map<String, String> connectorConfig) {
        HashMap<String, String> configs = new HashMap<String, String>();
        for (String configName : COMMON_CLASS_CONFIGS) {
            if (!connectorConfig.containsKey(configName)) continue;
            configs.put(configName, connectorConfig.get(configName));
        }
        this.addAliasedConfigClasses("transforms", connectorConfig, configs);
        this.addAliasedConfigClasses("predicates", connectorConfig, configs);
        return configs;
    }

    private List<String> sensitiveConfigs(String pluginClass) {
        return this.sensitiveConfigsCache.computeIfAbsent(pluginClass, k -> this.configKeyInfo(pluginClass).stream().filter(c -> c.type().equals(ConfigDef.Type.PASSWORD.name())).map(ConfigKeyInfo::name).collect(Collectors.toList()));
    }

    private List<ConfigKeyInfo> configKeyInfo(String pluginClass) {
        return this.configKeyInfoCache.computeIfAbsent(pluginClass, k -> this.herder.connectorPluginConfig(pluginClass));
    }

    private void maskClientSensitiveConfigs(Map<String, String> connectorConfigs) {
        for (String prefix : Arrays.asList("consumer.override.", "producer.override.", "admin.override.")) {
            for (String config : this.clientSensitiveConfigs) {
                String fullConfigName = prefix + config;
                if (!connectorConfigs.containsKey(fullConfigName)) continue;
                connectorConfigs.put(fullConfigName, "***********");
            }
        }
    }

    @Override
    public Map<String, String> maskedConnectorConfigs(String connectorName) throws EventsException {
        Map<String, String> connectorConfig = this.validateConnectorExisted(connectorName);
        Map<String, String> klassConfigs = this.classConfigs(connectorConfig);
        klassConfigs.forEach((key, value) -> {
            List<String> configsToMask = this.sensitiveConfigs((String)value);
            Object prefix = key.equals("connector.class") ? "" : key + ".";
            configsToMask.forEach(arg_0 -> DistributedClusterMetadata.lambda$maskedConnectorConfigs$1(connectorConfig, (String)prefix, arg_0));
        });
        this.maskClientSensitiveConfigs(connectorConfig);
        return connectorConfig;
    }

    @Override
    public List<String> connectorTopics(String connectorName) throws EventsException {
        this.validateConnectorExisted(connectorName);
        return new ArrayList<String>(this.herder.connectorActiveTopics(connectorName).topics());
    }

    @Override
    public ConnectorStateInfo connectorStatus(String connectorName) throws EventsException {
        try {
            return this.herder.connectorStatus(connectorName);
        }
        catch (NotFoundException e) {
            throw new EventsException.UnknownConnectorException(connectorName);
        }
    }

    @Override
    public String connectClusterId() {
        return this.herder.workerGroupId();
    }

    @Override
    public Set<String> connectors() {
        return Collections.unmodifiableSet(this.connectorConfigs.keySet());
    }

    private String extractPluginDefault(String configName, String pluginClass) {
        return this.configKeyInfo(pluginClass).stream().filter(configKeyInfo -> configKeyInfo.name().equals(configName)).findFirst().map(ConfigKeyInfo::defaultValue).orElse("");
    }

    @Override
    public int tasksMax(String connectorName) throws EventsException {
        Map<String, String> connectorConfig = this.validateConnectorExisted(connectorName);
        if (connectorConfig.containsKey("tasks.max")) {
            return Integer.parseInt(connectorConfig.get("tasks.max"));
        }
        String pluginDefault = this.extractPluginDefault("tasks.max", connectorConfig.get("connector.class"));
        if (pluginDefault.isEmpty()) {
            return 1;
        }
        return Integer.parseInt(pluginDefault);
    }

    @Override
    public String valueConverter(String connectorName) throws EventsException {
        Map<String, String> connectorConfig = this.validateConnectorExisted(connectorName);
        if (connectorConfig.containsKey("value.converter")) {
            return connectorConfig.get("value.converter");
        }
        String pluginDefault = this.extractPluginDefault("value.converter", connectorConfig.get("connector.class"));
        if (pluginDefault.isEmpty()) {
            return this.distributedConfig.getClass("value.converter").getName();
        }
        return pluginDefault;
    }

    public DistributedConfig getDistributedConfig() {
        return this.distributedConfig;
    }

    @Override
    public boolean isReady() {
        return this.herder.isReady();
    }

    @Override
    public boolean refresh() {
        if (!this.isReady()) {
            return false;
        }
        try {
            ClusterConfigState clusterConfigState = this.configBackingStore.snapshot();
            this.connectorConfigs.clear();
            clusterConfigState.connectors().forEach(connectorName -> {
                Map<String, String> configs = clusterConfigState.connectorConfig((String)connectorName);
                this.connectorConfigs.put((String)connectorName, configs);
            });
        }
        catch (Exception e) {
            return false;
        }
        return true;
    }

    private static /* synthetic */ void lambda$maskedConnectorConfigs$1(Map connectorConfig, String prefix, String config) {
        connectorConfig.computeIfPresent(prefix + config, (k, v) -> "***********");
    }
}

