package io.confluent.kafkarest.controllers;

import io.confluent.kafkarest.common.KafkaFutures;
import io.confluent.kafkarest.entities.Broker;
import io.confluent.kafkarest.entities.BrokerConfig;
import io.confluent.kafkarest.entities.ConfigSource;
import io.confluent.kafkarest.entities.ConfigSynonym;
import io.confluent.kafkarest.entities.TopicConfig;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.ws.rs.NotFoundException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.UnsupportedVersionException;

/* loaded from: input_file:io/confluent/kafkarest/controllers/DefaultTopicConfigManagerImpl.class */
final class DefaultTopicConfigManagerImpl implements DefaultTopicConfigManager {
    private static final String NUM_PARTITIONS = "num.partitions";
    private static final String DEFAULT_REPLICATION_FACTOR = "default.replication.factor";
    private final Admin adminClient;
    private final ClusterManager clusterManager;
    private final BrokerConfigManager brokerConfigManager;
    private final BrokerManager brokerManager;

    @Inject
    DefaultTopicConfigManagerImpl(Admin admin, ClusterManager clusterManager, BrokerConfigManager brokerConfigManager, BrokerManager brokerManager) {
        this.adminClient = (Admin) Objects.requireNonNull(admin);
        this.clusterManager = (ClusterManager) Objects.requireNonNull(clusterManager);
        this.brokerConfigManager = (BrokerConfigManager) Objects.requireNonNull(brokerConfigManager);
        this.brokerManager = (BrokerManager) Objects.requireNonNull(brokerManager);
    }

    @Override // io.confluent.kafkarest.controllers.DefaultTopicConfigManager
    public CompletableFuture<List<TopicConfig>> listDefaultTopicConfigs(String str, String str2) {
        return this.clusterManager.getCluster(str).thenCompose(optional -> {
            Entities.checkEntityExists(optional, "Cluster %s cannot be found.", str);
            return KafkaFutures.toCompletableFuture(this.adminClient.createTopics(Collections.singletonList(new NewTopic(str2, (Optional<Integer>) Optional.empty(), (Optional<Short>) Optional.empty())), new CreateTopicsOptions().validateOnly(true)).config(str2));
        }).handle((BiFunction<? super U, Throwable, ? extends U>) (config, th) -> {
            if (th == null) {
                return CompletableFuture.completedFuture(config);
            }
            if ((th.getCause() instanceof InvalidRequestException) || (th.getCause() instanceof UnsupportedVersionException)) {
                return getOldBrokerFuture(str, str2);
            }
            if (th instanceof RuntimeException) {
                throw ((RuntimeException) th);
            }
            throw new CompletionException(th);
        }).thenCompose(completableFuture -> {
            return completableFuture;
        }).thenCompose(config2 -> {
            return attemptToAddReplicationFactorAndPartition(str, config2);
        }).thenApply(config3 -> {
            return (List) config3.entries().stream().map(configEntry -> {
                return TopicConfig.builder().setClusterId(str).setTopicName(str2).setName(configEntry.name()).setValue(configEntry.value()).setDefault(configEntry.isDefault()).setReadOnly(configEntry.isReadOnly()).setSensitive(configEntry.isSensitive()).setSource(ConfigSource.fromAdminConfigSource(configEntry.source())).setSynonyms((List) configEntry.synonyms().stream().map(ConfigSynonym::fromAdminConfigSynonym).collect(Collectors.toList())).build();
            }).collect(Collectors.toList());
        });
    }

    private CompletableFuture<Config> getOldBrokerFuture(String str, String str2) {
        return this.brokerManager.listBrokers(str).thenCompose(list -> {
            return this.brokerConfigManager.listBrokerConfigs(str, ((Broker) list.stream().findFirst().get()).getBrokerId()).thenCompose(list -> {
                return KafkaFutures.toCompletableFuture(this.adminClient.createTopics(Collections.singletonList(new NewTopic(str2, Integer.parseInt(getConfigValue(list, NUM_PARTITIONS)), Short.parseShort(getConfigValue(list, DEFAULT_REPLICATION_FACTOR)))), new CreateTopicsOptions().validateOnly(true)).config(str2));
            });
        });
    }

    private CompletableFuture<Config> attemptToAddReplicationFactorAndPartition(String str, Config config) {
        return this.brokerManager.listBrokers(str).thenCompose(list -> {
            return this.brokerConfigManager.listBrokerConfigs(str, ((Broker) list.stream().findFirst().get()).getBrokerId()).thenCompose(list -> {
                Collection collection = (Collection) config.entries().stream().collect(Collectors.toList());
                BrokerConfig config2 = getConfig(list, NUM_PARTITIONS);
                BrokerConfig config3 = getConfig(list, DEFAULT_REPLICATION_FACTOR);
                ConfigEntry configEntry = new ConfigEntry(NUM_PARTITIONS, config2.getValue(), ConfigEntry.ConfigSource.DEFAULT_CONFIG, config2.isSensitive(), config2.isReadOnly(), Collections.emptyList(), ConfigEntry.ConfigType.INT, "");
                ConfigEntry configEntry2 = new ConfigEntry(DEFAULT_REPLICATION_FACTOR, config3.getValue(), ConfigEntry.ConfigSource.DEFAULT_CONFIG, config3.isSensitive(), config3.isReadOnly(), Collections.emptyList(), ConfigEntry.ConfigType.INT, "");
                collection.add(configEntry);
                collection.add(configEntry2);
                return CompletableFuture.completedFuture(new Config(collection));
            });
        }).handle((BiFunction<? super U, Throwable, ? extends U>) (config2, th) -> {
            return th != null ? config : config2;
        });
    }

    private String getConfigValue(List<BrokerConfig> list, String str) {
        return getConfig(list, str).getValue();
    }

    private BrokerConfig getConfig(List<BrokerConfig> list, String str) {
        return (BrokerConfig) Entities.findEntityByKey(list, (v0) -> {
            return v0.getName();
        }, str).orElseThrow(() -> {
            return new NotFoundException(String.format("Could not determine required broker configuration: %s", str));
        });
    }
}
