/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.controlcenter.rest;

import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import io.confluent.controlcenter.data.KafkaDao;
import io.confluent.controlcenter.data.KafkaDaoSupplier;
import io.confluent.controlcenter.rest.RestModule;
import io.confluent.rest.annotations.PerformanceMetric;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.NotAllowedException;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path(value="/2.0/kafka/{clusterId}")
@Produces(value={"application/json"})
@Consumes(value={"application/json"})
public class KafkaResource {
    private final boolean brokerConfigsEditEnabled;
    private static final Logger log = LoggerFactory.getLogger(KafkaResource.class);

    @Inject
    public KafkaResource(@RestModule.BrokerConfigsEditEnabled boolean brokerConfigsEditEnabled) {
        this.brokerConfigsEditEnabled = brokerConfigsEditEnabled;
    }

    @GET
    @Path(value="/topics")
    @PerformanceMetric(value="kafka.topics.get")
    public List<TopicDescription> getTopics(@PathParam(value="clusterId") KafkaDaoSupplier daoSupplier) throws InterruptedException, ExecutionException, TimeoutException {
        try (KafkaDao dao = (KafkaDao)daoSupplier.get();){
            List<TopicDescription> list = dao.getTopics(false);
            return list;
        }
    }

    @PUT
    @Path(value="/topics")
    public void createTopic(@PathParam(value="clusterId") KafkaDaoSupplier daoSupplier, @QueryParam(value="validate") @DefaultValue(value="true") boolean validate, NewTopic newTopic) throws KafkaException, InterruptedException, ExecutionException, TimeoutException {
        try (KafkaDao kafkaDao = (KafkaDao)daoSupplier.get();){
            kafkaDao.createTopics(Collections.singleton(newTopic), validate);
        }
    }

    @DELETE
    @Path(value="/topics/{topic}")
    public void deleteTopic(@PathParam(value="clusterId") KafkaDaoSupplier daoSupplier, @PathParam(value="topic") String topic) throws KafkaException, InterruptedException, ExecutionException, TimeoutException {
        try (KafkaDao kafkaDao = (KafkaDao)daoSupplier.get();){
            kafkaDao.deleteTopics(Collections.singleton(topic));
        }
    }

    @GET
    @Path(value="/topic-defaults")
    public NewTopic getTopicDefaults(@PathParam(value="clusterId") KafkaDaoSupplier daoSupplier) throws InterruptedException, ExecutionException, TimeoutException {
        try (KafkaDao kafkaDao = (KafkaDao)daoSupplier.get();){
            Collection<Node> nodes = kafkaDao.getNodes();
            if (nodes.isEmpty()) {
                throw new NotFoundException("no valid brokers found for cluster=" + kafkaDao.clusterId());
            }
            Node node = nodes.iterator().next();
            ArrayList resources = Lists.newArrayList();
            ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
            resources.add(brokerResource);
            ConfigResource topicResource = null;
            try {
                topicResource = new ConfigResource(ConfigResource.Type.TOPIC, null);
                resources.add(topicResource);
            }
            catch (Exception exception) {
                // empty catch block
            }
            Map<ConfigResource, Config> configsMap = kafkaDao.getConfigs(resources);
            log.trace("configsMap={}", configsMap);
            if (!configsMap.containsKey(brokerResource)) {
                throw new NotFoundException("no configs returned for broker=" + brokerResource);
            }
            Config brokerConfig = configsMap.get(brokerResource);
            int defaultPartitions = KafkaResource.defaultPartitions(brokerConfig);
            short defaultReplicationFactor = KafkaResource.defaultReplicationFactor(brokerConfig);
            ImmutableMap.Builder configBuilder = ImmutableMap.builder();
            if (topicResource != null && configsMap.containsKey(topicResource)) {
                for (ConfigEntry configEntry : configsMap.get(topicResource).entries()) {
                    configBuilder.put((Object)configEntry.name(), (Object)configEntry.value());
                }
            } else {
                ImmutableMap.builder().put((Object)"cleanup.policy", (Object)KafkaConfig.LogCleanupPolicyProp()).put((Object)"delete.retention.ms", (Object)KafkaConfig.LogCleanerDeleteRetentionMsProp()).put((Object)"min.insync.replicas", (Object)KafkaConfig.MinInSyncReplicasProp()).put((Object)"max.message.bytes", (Object)KafkaConfig.MessageMaxBytesProp()).put((Object)"retention.bytes", (Object)KafkaConfig.LogRetentionBytesProp()).build().forEach((topicConfigName, brokerConfigName) -> {
                    ConfigEntry configEntry = brokerConfig.get(brokerConfigName);
                    if (configEntry != null) {
                        configBuilder.put(topicConfigName, (Object)configEntry.value());
                    }
                });
            }
            NewTopic newTopic = new NewTopic(null, defaultPartitions, defaultReplicationFactor);
            newTopic.configs((Map)configBuilder.build());
            NewTopic newTopic2 = newTopic;
            return newTopic2;
        }
    }

    @GET
    @Path(value="/topics/{topic}")
    public TopicDescription getTopic(@PathParam(value="clusterId") KafkaDaoSupplier daoSupplier, @PathParam(value="topic") String topic) throws InterruptedException, ExecutionException, TimeoutException {
        try (KafkaDao kafkaDao = (KafkaDao)daoSupplier.get();){
            TopicDescription topicDescription = kafkaDao.getTopic(topic);
            return topicDescription;
        }
    }

    @GET
    @Path(value="/topics/{topic}/config")
    public Config getTopicConfig(@PathParam(value="clusterId") KafkaDaoSupplier daoSupplier, @PathParam(value="topic") String topic) throws InterruptedException, ExecutionException, TimeoutException {
        try (KafkaDao kafkaDao = (KafkaDao)daoSupplier.get();){
            ConfigResource topicResource = KafkaResource.configResourceForTopic(topic);
            Map<ConfigResource, Config> configs = kafkaDao.getConfigs(Collections.singleton(topicResource));
            if (configs == null || configs.isEmpty() || !configs.containsKey(topicResource)) {
                throw new NotFoundException();
            }
            Config config = configs.get(topicResource);
            return config;
        }
    }

    @PUT
    @Path(value="/topics/{topic}/config")
    public void putTopicConfig(@PathParam(value="clusterId") KafkaDaoSupplier daoSupplier, @PathParam(value="topic") String topic, @QueryParam(value="validateOnly") @DefaultValue(value="false") boolean validateOnly, Config config) throws InterruptedException, ExecutionException, TimeoutException {
        try (KafkaDao kafkaDao = (KafkaDao)daoSupplier.get();){
            ConfigResource topicResource = KafkaResource.configResourceForTopic(topic);
            kafkaDao.alterConfigs(Collections.singletonMap(topicResource, config), validateOnly);
        }
    }

    @POST
    @Path(value="/acls")
    public Response createAcl(@PathParam(value="clusterId") KafkaDaoSupplier daoSupplier, List<AclBinding> aclBindings) throws InterruptedException, ExecutionException, TimeoutException {
        try (KafkaDao kafkaDao = (KafkaDao)daoSupplier.get();){
            kafkaDao.createAcls(aclBindings);
        }
        return Response.accepted().build();
    }

    @DELETE
    @Path(value="/acls")
    public void deleteAcls(@PathParam(value="clusterId") KafkaDaoSupplier daoSupplier, AclBindingFilter aclBindingFilter) throws InterruptedException, ExecutionException, TimeoutException {
        try (KafkaDao dao = (KafkaDao)daoSupplier.get();){
            dao.deleteAcls(Collections.singleton(aclBindingFilter));
        }
    }

    @GET
    @Path(value="/acls")
    @PerformanceMetric(value="kafka.acls.get")
    public Collection<AclBinding> getAcls(@PathParam(value="clusterId") KafkaDaoSupplier daoSupplier) throws InterruptedException, ExecutionException, TimeoutException {
        return this.searchAcls(daoSupplier, AclBindingFilter.ANY);
    }

    @GET
    @Path(value="/acls/resources/{resourceType}/{name}")
    public Collection<AclBinding> listResourceAcls(@PathParam(value="clusterId") KafkaDaoSupplier daoSupplier, @PathParam(value="resourceType") ResourceType resourceType, @PathParam(value="name") String name) throws InterruptedException, ExecutionException, TimeoutException {
        AclBindingFilter aclFilter = new AclBindingFilter(new ResourcePatternFilter(resourceType, name, PatternType.LITERAL), AccessControlEntryFilter.ANY);
        return this.searchAcls(daoSupplier, aclFilter);
    }

    @GET
    @Path(value="/acls/resources/{resourceType}")
    public Collection<AclBinding> listAcls(@PathParam(value="clusterId") KafkaDaoSupplier daoSupplier, @PathParam(value="resourceType") ResourceType resourceType, @QueryParam(value="prefix") String prefix, @QueryParam(value="match") String match) throws InterruptedException, ExecutionException, TimeoutException {
        String name;
        PatternType patternType;
        if (prefix != null && match != null) {
            throw new BadRequestException("prefix cannot be combined with match");
        }
        if (prefix != null) {
            patternType = PatternType.PREFIXED;
            name = prefix;
        } else if (match != null) {
            patternType = PatternType.MATCH;
            name = match;
        } else {
            patternType = PatternType.ANY;
            name = null;
        }
        AclBindingFilter aclFilter = new AclBindingFilter(new ResourcePatternFilter(resourceType, name, patternType), AccessControlEntryFilter.ANY);
        return this.searchAcls(daoSupplier, aclFilter);
    }

    @GET
    @Path(value="/acls/principals/{principal}")
    public Collection<AclBinding> listPrincipalAcls(@PathParam(value="clusterId") KafkaDaoSupplier daoSupplier, @PathParam(value="principal") String principal) throws InterruptedException, ExecutionException, TimeoutException {
        AclBindingFilter aclFilter = new AclBindingFilter(ResourcePatternFilter.ANY, new AccessControlEntryFilter(principal, null, AclOperation.ANY, AclPermissionType.ANY));
        return this.searchAcls(daoSupplier, aclFilter);
    }

    @POST
    @Path(value="/acls:search")
    public Collection<AclBinding> searchAcls(@PathParam(value="clusterId") KafkaDaoSupplier daoSupplier, AclBindingFilter aclBindingFilter) throws InterruptedException, ExecutionException, TimeoutException {
        try (KafkaDao dao = (KafkaDao)daoSupplier.get();){
            Collection<AclBinding> collection = dao.getAcls(aclBindingFilter);
            return collection;
        }
    }

    private static ConfigResource configResourceForTopic(String topic) {
        return new ConfigResource(ConfigResource.Type.TOPIC, topic);
    }

    private static ConfigResource configResourceForBroker(String brokerId) {
        return new ConfigResource(ConfigResource.Type.BROKER, brokerId);
    }

    @GET
    @Path(value="/brokers/config")
    public Map<String, Config> getAllBrokerConfigs(@PathParam(value="clusterId") KafkaDaoSupplier daoSupplier) throws InterruptedException, ExecutionException, TimeoutException {
        try (KafkaDao kafkaDao = (KafkaDao)daoSupplier.get();){
            Collection<Node> nodes = kafkaDao.getNodes();
            if (nodes.isEmpty()) {
                throw new NotFoundException("no valid brokers found for cluster=" + kafkaDao.clusterId());
            }
            Collection resources = Collections2.transform(nodes, (Function)new Function<Node, ConfigResource>(){

                public ConfigResource apply(Node input) {
                    return KafkaResource.configResourceForBroker(input.idString());
                }
            });
            Map<ConfigResource, Config> configs = kafkaDao.getConfigs(resources);
            HashMap out = Maps.newHashMap();
            for (Map.Entry<ConfigResource, Config> entry : configs.entrySet()) {
                out.put(entry.getKey().name(), entry.getValue());
            }
            HashMap hashMap = out;
            return hashMap;
        }
    }

    @GET
    @Path(value="/brokers/{brokerId}/config")
    public Config getBrokerConfig(@PathParam(value="clusterId") KafkaDaoSupplier daoSupplier, @PathParam(value="brokerId") String brokerId) throws InterruptedException, ExecutionException, TimeoutException {
        Throwable throwable = null;
        try (KafkaDao kafkaDao = (KafkaDao)daoSupplier.get();){
            ConfigResource resource = KafkaResource.configResourceForBroker(brokerId);
            try {
                Map<ConfigResource, Config> configs = kafkaDao.getConfigs(Collections.singleton(resource));
                if (configs == null || !configs.containsKey(resource)) {
                    throw new NotFoundException();
                }
                Config config = configs.get(resource);
                return config;
            }
            catch (BrokerNotAvailableException bnae) {
                try {
                    throw new NotFoundException();
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
            }
        }
    }

    @POST
    @Path(value="/brokers/config")
    public void postBrokerConfig(@PathParam(value="clusterId") KafkaDaoSupplier daoSupplier, @QueryParam(value="validateOnly") @DefaultValue(value="false") boolean validateOnly, Map<String, Config> configData) throws InterruptedException, ExecutionException, TimeoutException {
        if (!this.brokerConfigsEditEnabled) {
            throw new NotAllowedException("Broker config edit is disabled", new String[0]);
        }
        try (KafkaDao kafkaDao = (KafkaDao)daoSupplier.get();){
            HashMap<ConfigResource, Config> configs = new HashMap<ConfigResource, Config>();
            for (Map.Entry<String, Config> entry : configData.entrySet()) {
                configs.put(KafkaResource.configResourceForBroker(entry.getKey()), entry.getValue());
            }
            kafkaDao.alterConfigs(configs, true);
            if (!validateOnly) {
                kafkaDao.alterConfigs(configs, false);
            }
        }
    }

    @GET
    @Path(value="/nodes")
    public Collection<Node> nodes(@PathParam(value="clusterId") KafkaDaoSupplier daoSupplier) throws InterruptedException, ExecutionException, TimeoutException {
        try (KafkaDao kafkaDao = (KafkaDao)daoSupplier.get();){
            Collection<Node> collection = kafkaDao.getNodes();
            return collection;
        }
    }

    private static int defaultPartitions(Config brokerConfig) {
        int defaultPartitions = 1;
        ConfigEntry config = brokerConfig.get(KafkaConfig.NumPartitionsProp());
        if (config != null) {
            try {
                defaultPartitions = Integer.parseInt(config.value());
            }
            catch (NumberFormatException nfe) {
                log.warn("unable to parse default number of partitions config={}", (Object)config);
            }
        }
        return defaultPartitions;
    }

    private static short defaultReplicationFactor(Config brokerConfig) {
        short defaultReplicationFactor = 1;
        ConfigEntry config = brokerConfig.get(KafkaConfig.DefaultReplicationFactorProp());
        if (config != null) {
            try {
                defaultReplicationFactor = Short.parseShort(config.value());
            }
            catch (NumberFormatException nfe) {
                log.warn("unable to parse default replication factor config={}", (Object)config);
            }
        }
        return defaultReplicationFactor;
    }
}

