/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.integration.cluster;

import io.confluent.kafka.multitenant.MultiTenantInterceptor;
import io.confluent.kafka.multitenant.MultiTenantPrincipal;
import io.confluent.kafka.multitenant.MultiTenantPrincipalBuilder;
import io.confluent.kafka.multitenant.TenantMetadata;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.UserMetadata;
import io.confluent.kafka.multitenant.integration.test.DummyMultitenantMetadata;
import io.confluent.kafka.test.cluster.EmbeddedKafkaCluster;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.kafka.test.utils.SecurityTestUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import kafka.admin.AclCommand;
import kafka.security.authorizer.AclAuthorizer;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test harness around an {@link EmbeddedKafkaCluster} that is configured with
 * the multi-tenant interceptor and a principal builder which maps SASL user
 * names of the form {@code <logicalClusterId>_<apiKey>} to
 * {@link MultiTenantPrincipal}s.
 *
 * <p>Each started instance registers itself in a static registry keyed by its
 * physical cluster id. The id is also written into the broker configuration
 * under {@code physical.cluster.id} so that
 * {@link MultiTenantScramPrincipalBuilder#configure(Map)} can look the owning
 * instance up again.
 */
public class PhysicalCluster {
    private static final Logger log = LoggerFactory.getLogger(PhysicalCluster.class);
    public static final KafkaPrincipal BROKER_PRINCIPAL = new KafkaPrincipal("User", "broker");
    // Everything before the first underscore is the logical cluster id, the rest is the API key.
    private static final Pattern SASL_USERNAME_PATTERN = Pattern.compile("(?<clusterId>[^_]*)_(?<apiKey>.*)");
    // Registry of started clusters, keyed by physicalClusterId; read by the principal builder.
    private static final Map<String, PhysicalCluster> instances = new ConcurrentHashMap<>();
    // NOTE(review): System.identityHashCode is not guaranteed to be unique across live objects,
    // so two clusters could in principle end up with the same id.
    private final String physicalClusterId = String.valueOf(System.identityHashCode(this));
    private final Properties overrideProps;
    private final EmbeddedKafkaCluster kafkaCluster;
    private AdminClient superAdminClient;
    private final Random random;
    private final Map<Integer, UserMetadata> usersById;
    // Concurrent because principal(String) reads it without holding this object's monitor.
    private final Map<String, UserMetadata> usersByApiKey;
    private final Map<String, LogicalCluster> logicalClusters;
    private final int numberOfBrokers;
    private final List<String> brokerRacks;
    private final List<String> brokerCells;

    /**
     * Creates a cluster without rack or cell assignments and without a custom {@link Time}.
     *
     * @param brokers number of brokers that {@link #start(Consumer)} will launch
     * @param props broker configuration overrides
     */
    public PhysicalCluster(int brokers, Properties props) {
        this(brokers, Collections.emptyList(), props, Optional.empty());
    }

    /**
     * Creates a cluster with optional rack assignments and no cell assignments.
     *
     * @param brokers number of brokers that {@link #start(Consumer)} will launch
     * @param brokerRacks rack per broker (indexed by broker position), or empty for none
     * @param props broker configuration overrides
     * @param time time source handed to the embedded cluster, if present
     */
    public PhysicalCluster(int brokers, List<String> brokerRacks, Properties props, Optional<Time> time) {
        this(brokers, brokerRacks, Collections.emptyList(), props, time);
    }

    /**
     * Creates a cluster with optional rack and cell assignments.
     *
     * <p>The supplied properties are copied. On top of them the principal builder, the
     * interceptor for the external listener and the physical cluster id are always set;
     * {@code multitenant.metadata.class} is only defaulted when {@code props} does not
     * already contain it.
     *
     * @param brokers number of brokers that {@link #start(Consumer)} will launch
     * @param brokerRacks rack per broker (indexed by broker position), or empty for none
     * @param brokerCells cell per broker (indexed by broker position), or empty for none
     * @param props broker configuration overrides
     * @param time time source handed to the embedded cluster, if present
     */
    public PhysicalCluster(int brokers, List<String> brokerRacks, List<String> brokerCells, Properties props, Optional<Time> time) {
        this.kafkaCluster = time.map(EmbeddedKafkaCluster::new).orElseGet(EmbeddedKafkaCluster::new);
        this.random = new Random();
        this.usersById = new HashMap<>();
        this.usersByApiKey = new ConcurrentHashMap<>();
        this.logicalClusters = new HashMap<>();
        this.numberOfBrokers = brokers;
        this.brokerRacks = brokerRacks;
        this.brokerCells = brokerCells;
        this.overrideProps = new Properties();
        this.overrideProps.putAll(props);
        this.overrideProps.put("principal.builder.class", MultiTenantScramPrincipalBuilder.class.getName());
        this.overrideProps.put("listener.name.external.broker.interceptor.class", MultiTenantInterceptor.class.getName());
        if (!props.containsKey("multitenant.metadata.class")) {
            this.overrideProps.put("multitenant.metadata.class", DummyMultitenantMetadata.class.getName());
        }
        this.overrideProps.put("physical.cluster.id", this.physicalClusterId);
    }

    /**
     * Adds {@link #BROKER_PRINCIPAL} as the authorizer super user in the broker overrides.
     * Only affects brokers started afterwards, since the overrides are read in
     * {@link #start(Consumer)}.
     */
    public void makeBrokerSuperUser() {
        this.overrideProps.setProperty(AclAuthorizer.SuperUsersProp(), BROKER_PRINCIPAL.toString());
    }

    /**
     * Runs {@link AclCommand} with cluster ACL arguments for {@link #BROKER_PRINCIPAL} and the
     * {@code ClusterAction} operation, using the embedded cluster's ZooKeeper connect string.
     */
    public void addBrokerAcls() {
        String[] aclArgs = SecurityTestUtils.clusterAclArgs(this.kafkaCluster.zkConnect(), BROKER_PRINCIPAL, "ClusterAction");
        AclCommand.main(aclArgs);
    }

    /**
     * Registers this instance, starts ZooKeeper, lets the callback adjust security settings and
     * then starts the configured number of brokers with consecutive broker ids.
     *
     * @param configureSecurityForBroker callback invoked after ZooKeeper is up and before the
     *        broker configuration is built
     * @throws IllegalStateException if racks or cells were supplied for fewer brokers than will
     *         be started
     */
    public synchronized void start(Consumer<PhysicalCluster> configureSecurityForBroker) {
        // Fail before anything is started rather than with an index error half-way through.
        this.requireEntryPerBroker("broker racks", this.brokerRacks);
        this.requireEntryPerBroker("broker cells", this.brokerCells);

        instances.put(this.physicalClusterId, this);
        this.kafkaCluster.startZooKeeper();
        configureSecurityForBroker.accept(this);
        Properties brokerConfigs = KafkaTestUtils.brokerConfig(this.overrideProps);
        log.debug("Initiating Kafka cluster startup with config {}", brokerConfigs);
        int brokerIdStart = Integer.parseInt(brokerConfigs.getOrDefault((Object) KafkaConfig.BrokerIdProp(), "0").toString());
        for (int i = 0; i < this.numberOfBrokers; ++i) {
            if (!this.brokerRacks.isEmpty()) {
                brokerConfigs.put(KafkaConfig.RackProp(), this.brokerRacks.get(i));
            }
            if (!this.brokerCells.isEmpty()) {
                brokerConfigs.put(KafkaConfig.BrokerTagsProp() + "." + "confluent.cell", this.brokerCells.get(i));
            }
            this.kafkaCluster.startBroker(brokerIdStart + i, brokerConfigs);
        }
    }

    /**
     * Checks that a non-empty per-broker list has an entry for every broker to be started.
     */
    private void requireEntryPerBroker(String description, List<String> values) {
        if (!values.isEmpty() && values.size() < this.numberOfBrokers) {
            throw new IllegalStateException("Expected " + this.numberOfBrokers + " " + description
                    + " (one per broker) but got " + values.size());
        }
    }

    /**
     * Closes the cached admin client (if one was created), shuts the embedded cluster down and
     * removes this instance from the registry. Each step runs even if an earlier one throws.
     */
    public synchronized void shutdown() {
        try {
            if (this.superAdminClient != null) {
                try {
                    this.superAdminClient.close();
                } finally {
                    // Drop the reference so superAdminClient() never hands out a closed client.
                    this.superAdminClient = null;
                }
            }
        } finally {
            try {
                this.kafkaCluster.shutdown();
            } finally {
                // Only remove the entry if it still maps to this instance.
                instances.remove(this.physicalClusterId, this);
            }
        }
    }

    /**
     * @return the bootstrap servers string reported by the embedded cluster
     */
    public String bootstrapServers() {
        return this.kafkaCluster.bootstrapServers();
    }

    /**
     * @return the underlying embedded Kafka cluster
     */
    public EmbeddedKafkaCluster kafkaCluster() {
        return this.kafkaCluster;
    }

    /**
     * Creates and records a new logical cluster with the given admin user and service users.
     * Users are looked up by id and created on first use.
     *
     * @param clusterId id of the logical cluster
     * @param adminUserId id of the admin user, requested as a super user
     * @param serviceIds ids of additional users, requested as non super users
     * @return the newly created logical cluster
     * @throws IllegalArgumentException if a logical cluster with this id already exists
     */
    public synchronized LogicalCluster createLogicalCluster(String clusterId, int adminUserId, Integer ... serviceIds) {
        if (this.logicalClusters.containsKey(clusterId)) {
            throw new IllegalArgumentException("Logical cluster " + clusterId + " already exists");
        }
        UserMetadata adminUser = this.getOrCreateUser(adminUserId, true);
        LogicalCluster logicalCluster = new LogicalCluster(this, clusterId, adminUser);
        this.logicalClusters.put(clusterId, logicalCluster);
        for (Integer userId : serviceIds) {
            logicalCluster.addUser(this.getOrCreateUser(userId, false));
        }
        return logicalCluster;
    }

    /**
     * Returns the user registered under {@code userId}, creating it if necessary.
     *
     * <p>A new user gets the API key {@code "APIKEY" + userId} and a random API secret. If the
     * user already exists it is returned as is and {@code isSuperUser} is ignored.
     *
     * @param userId numeric user id
     * @param isSuperUser super user flag used only when the user is created
     * @return the existing or newly created user
     */
    public synchronized UserMetadata getOrCreateUser(int userId, boolean isSuperUser) {
        UserMetadata userMetadata = this.usersById.get(userId);
        if (userMetadata != null) {
            return userMetadata;
        }
        String apiKey = "APIKEY" + userId;
        String apiSecret = "APISECRET-" + this.random.nextLong();
        userMetadata = new UserMetadata(userId, apiKey, apiSecret, isSuperUser);
        this.usersById.put(userId, userMetadata);
        this.usersByApiKey.put(apiKey, userMetadata);
        return userMetadata;
    }

    /**
     * Builds the multi-tenant principal for a SASL user name of the form
     * {@code <logicalClusterId>_<apiKey>}.
     *
     * @param saslUserName SASL user name to resolve
     * @return principal named after the user's id, carrying tenant metadata for the logical
     *         cluster id and the user's super user flag
     * @throws IllegalArgumentException if the name does not match the expected form or the
     *         API key is unknown
     */
    public MultiTenantPrincipal principal(String saslUserName) {
        Matcher matcher = SASL_USERNAME_PATTERN.matcher(saslUserName);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Invalid SASL user name " + saslUserName);
        }
        String apiKey = matcher.group("apiKey");
        UserMetadata user = this.usersByApiKey.get(apiKey);
        if (user == null) {
            throw new IllegalArgumentException("APIKey not found " + apiKey);
        }
        String logicalClusterId = matcher.group("clusterId");
        TenantMetadata tenantMetadata = new TenantMetadata.Builder(logicalClusterId).superUser(user.isSuperUser()).build();
        return new MultiTenantPrincipal(String.valueOf(user.userId()), tenantMetadata);
    }

    /**
     * Returns an admin client connected to the {@code INTERNAL} listener over PLAINTEXT,
     * creating it on first use. The client is closed and discarded by {@link #shutdown()}.
     *
     * @return the cached admin client
     */
    public synchronized AdminClient superAdminClient() {
        if (this.superAdminClient == null) {
            // The two empty strings are presumably credentials, unused for PLAINTEXT; confirm against KafkaTestUtils.
            this.superAdminClient = KafkaTestUtils.createAdminClient(this.kafkaCluster.bootstrapServers("INTERNAL"), SecurityProtocol.PLAINTEXT, "", "");
        }
        return this.superAdminClient;
    }

    /**
     * Principal builder installed on the brokers of a {@link PhysicalCluster}. It resolves the
     * owning cluster from the {@code physical.cluster.id} config and delegates SASL_PLAINTEXT
     * authentication to {@link PhysicalCluster#principal(String)}.
     */
    public static class MultiTenantScramPrincipalBuilder
    extends MultiTenantPrincipalBuilder
    implements Configurable {
        PhysicalCluster physicalCluster;

        /**
         * Looks up the registered {@link PhysicalCluster} named by {@code physical.cluster.id}
         * and fails the assertion if none is registered.
         *
         * @param configs broker configuration
         */
        @Override
        public void configure(Map<String, ?> configs) {
            this.physicalCluster = instances.get((String) configs.get("physical.cluster.id"));
            Assert.assertNotNull("Physical cluster not found", this.physicalCluster);
        }

        /**
         * Builds the principal for a connection: SASL_PLAINTEXT connections are mapped from
         * their SASL authorization id, PLAINTEXT connections are treated as the broker, and
         * every other protocol is left to the parent builder.
         *
         * @param context authentication context of the connection
         * @return the principal for the connection
         */
        @Override
        public KafkaPrincipal build(AuthenticationContext context) {
            SecurityProtocol protocol = context.securityProtocol();
            if (protocol == SecurityProtocol.SASL_PLAINTEXT) {
                SaslAuthenticationContext saslContext = (SaslAuthenticationContext) context;
                String authzId = saslContext.server().getAuthorizationID();
                return this.physicalCluster.principal(authzId);
            }
            if (protocol == SecurityProtocol.PLAINTEXT) {
                return BROKER_PRINCIPAL;
            }
            return super.build(context);
        }
    }
}

