/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest.controllers;

import io.confluent.kafkarest.common.KafkaFutures;
import io.confluent.kafkarest.controllers.BrokerConfigManager;
import io.confluent.kafkarest.controllers.BrokerManager;
import io.confluent.kafkarest.controllers.ClusterManager;
import io.confluent.kafkarest.controllers.DefaultTopicConfigManager;
import io.confluent.kafkarest.controllers.Entities;
import io.confluent.kafkarest.entities.AbstractConfig;
import io.confluent.kafkarest.entities.Broker;
import io.confluent.kafkarest.entities.BrokerConfig;
import io.confluent.kafkarest.entities.Cluster;
import io.confluent.kafkarest.entities.ConfigSource;
import io.confluent.kafkarest.entities.ConfigSynonym;
import io.confluent.kafkarest.entities.TopicConfig;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.ws.rs.NotFoundException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.UnsupportedVersionException;

final class DefaultTopicConfigManagerImpl
implements DefaultTopicConfigManager {
    private final Admin adminClient;
    private final ClusterManager clusterManager;
    private final BrokerConfigManager brokerConfigManager;
    private final BrokerManager brokerManager;

    @Inject
    DefaultTopicConfigManagerImpl(Admin adminClient, ClusterManager clusterManager, BrokerConfigManager brokerConfigManager, BrokerManager brokerManager) {
        this.adminClient = Objects.requireNonNull(adminClient);
        this.clusterManager = Objects.requireNonNull(clusterManager);
        this.brokerConfigManager = Objects.requireNonNull(brokerConfigManager);
        this.brokerManager = Objects.requireNonNull(brokerManager);
    }

    @Override
    public CompletableFuture<List<TopicConfig>> listDefaultTopicConfigs(String clusterId, String topicName) {
        return ((CompletableFuture)((CompletableFuture)((CompletableFuture)this.clusterManager.getCluster(clusterId).thenApply(cluster -> (Cluster)Entities.checkEntityExists((Optional)cluster, (String)"Cluster %s cannot be found.", (Object[])new Object[]{clusterId}))).thenCompose(cluster -> KafkaFutures.toCompletableFuture((KafkaFuture)this.adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, Optional.empty(), Optional.empty())), new CreateTopicsOptions().validateOnly(true)).config(topicName)))).handle((result, ex) -> {
            if (ex != null) {
                if (ex.getCause() instanceof InvalidRequestException || ex.getCause() instanceof UnsupportedVersionException) {
                    return this.getOldBrokerFuture(clusterId, topicName);
                }
                if (ex instanceof RuntimeException) {
                    throw (RuntimeException)ex;
                }
                throw new CompletionException((Throwable)ex);
            }
            return CompletableFuture.completedFuture(result);
        })).thenCompose(handled -> handled.thenApply(response -> response.entries().stream().map(entry -> (TopicConfig)((TopicConfig.Builder)((TopicConfig.Builder)((TopicConfig.Builder)((TopicConfig.Builder)((TopicConfig.Builder)((TopicConfig.Builder)((TopicConfig.Builder)((TopicConfig.Builder)TopicConfig.builder().setClusterId(clusterId)).setTopicName(topicName).setName(entry.name())).setValue(entry.value())).setDefault(entry.isDefault())).setReadOnly(entry.isReadOnly())).setSensitive(entry.isSensitive())).setSource(ConfigSource.fromAdminConfigSource((ConfigEntry.ConfigSource)entry.source()))).setSynonyms(entry.synonyms().stream().map(ConfigSynonym::fromAdminConfigSynonym).collect(Collectors.toList()))).build()).collect(Collectors.toList())));
    }

    private CompletableFuture<Config> getOldBrokerFuture(String clusterId, String topicName) {
        CompletionStage future = this.brokerManager.listBrokers(clusterId).thenCompose(brokers -> this.brokerConfigManager.listBrokerConfigs(clusterId, ((Broker)brokers.stream().findFirst().get()).getBrokerId()).thenCompose(brokerConfig -> {
            int partitions = Integer.parseInt(this.getConfigValue((List<BrokerConfig>)brokerConfig, "num.partitions"));
            short replicationFactor = Short.parseShort(this.getConfigValue((List<BrokerConfig>)brokerConfig, "default.replication.factor"));
            return KafkaFutures.toCompletableFuture((KafkaFuture)this.adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, partitions, replicationFactor)), new CreateTopicsOptions().validateOnly(true)).config(topicName));
        }));
        return future;
    }

    private String getConfigValue(List<BrokerConfig> brokerConfigs, String configName) {
        return ((BrokerConfig)Entities.findEntityByKey(brokerConfigs, AbstractConfig::getName, (Object)configName).orElseThrow(() -> new NotFoundException(String.format("Could not determine required broker configuration: %s", configName)))).getValue();
    }
}

