package io.confluent.kafka.multitenant.integration.cluster;

import io.confluent.kafka.multitenant.MultiTenantInterceptor;
import io.confluent.kafka.multitenant.MultiTenantPrincipal;
import io.confluent.kafka.multitenant.MultiTenantPrincipalBuilder;
import io.confluent.kafka.multitenant.TenantMetadata;
import io.confluent.kafka.multitenant.integration.test.DummyMultitenantMetadata;
import io.confluent.kafka.test.cluster.EmbeddedKafkaCluster;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.kafka.test.utils.SecurityTestUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import kafka.admin.AclCommand;
import kafka.security.authorizer.AclAuthorizer;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/multitenant/integration/cluster/PhysicalCluster.class */
public class PhysicalCluster {
    private static final Logger log = LoggerFactory.getLogger(PhysicalCluster.class);
    public static final KafkaPrincipal BROKER_PRINCIPAL = new KafkaPrincipal("User", "broker");
    private static final Pattern SASL_USERNAME_PATTERN = Pattern.compile("(?<clusterId>[^_]*)_(?<apiKey>.*)");
    private static Map<String, PhysicalCluster> instances = new ConcurrentHashMap();
    private final String physicalClusterId;
    private final Properties overrideProps;
    private final EmbeddedKafkaCluster kafkaCluster;
    private AdminClient superAdminClient;
    private final Random random;
    private final Map<Integer, UserMetadata> usersById;
    private final Map<String, UserMetadata> usersByApiKey;
    private final Map<String, LogicalCluster> logicalClusters;
    private final int numberOfBrokers;
    private final List<String> brokerRacks;
    private final List<String> brokerCells;

    /* loaded from: input_file:io/confluent/kafka/multitenant/integration/cluster/PhysicalCluster$MultiTenantScramPrincipalBuilder.class */
    public static class MultiTenantScramPrincipalBuilder extends MultiTenantPrincipalBuilder implements Configurable {
        PhysicalCluster physicalCluster;

        public void configure(Map<String, ?> map) {
            this.physicalCluster = (PhysicalCluster) PhysicalCluster.instances.get((String) map.get("physical.cluster.id"));
            Assert.assertNotNull("Physical cluster not found", this.physicalCluster);
        }

        public KafkaPrincipal build(AuthenticationContext authenticationContext) {
            if (authenticationContext.securityProtocol() == SecurityProtocol.SASL_PLAINTEXT) {
                return this.physicalCluster.principal(((SaslAuthenticationContext) authenticationContext).server().getAuthorizationID());
            }
            return authenticationContext.securityProtocol() == SecurityProtocol.PLAINTEXT ? PhysicalCluster.BROKER_PRINCIPAL : super.build(authenticationContext);
        }
    }

    public PhysicalCluster(int i, Properties properties) {
        this(i, Collections.emptyList(), properties, Optional.empty());
    }

    public PhysicalCluster(int i, List<String> list, Properties properties, Optional<Time> optional) {
        this(i, list, Collections.emptyList(), properties, optional);
    }

    public PhysicalCluster(int i, List<String> list, List<String> list2, Properties properties, Optional<Time> optional) {
        this.physicalClusterId = String.valueOf(System.identityHashCode(this));
        this.kafkaCluster = (EmbeddedKafkaCluster) optional.map(EmbeddedKafkaCluster::new).orElseGet(EmbeddedKafkaCluster::new);
        this.random = new Random();
        this.usersById = new HashMap();
        this.usersByApiKey = new ConcurrentHashMap();
        this.logicalClusters = new HashMap();
        this.numberOfBrokers = i;
        this.brokerRacks = list;
        this.brokerCells = list2;
        this.overrideProps = new Properties();
        this.overrideProps.putAll(properties);
        this.overrideProps.put("principal.builder.class", MultiTenantScramPrincipalBuilder.class.getName());
        this.overrideProps.put("listener.name.external.broker.interceptor.class", MultiTenantInterceptor.class.getName());
        if (!properties.containsKey("multitenant.metadata.class")) {
            this.overrideProps.put("multitenant.metadata.class", DummyMultitenantMetadata.class.getName());
        }
        this.overrideProps.put("physical.cluster.id", this.physicalClusterId);
    }

    public void makeBrokerSuperUser() {
        this.overrideProps.setProperty(AclAuthorizer.SuperUsersProp(), BROKER_PRINCIPAL.toString());
    }

    public void addBrokerAcls() {
        AclCommand.main(SecurityTestUtils.clusterAclArgs(this.kafkaCluster.zkConnect(), BROKER_PRINCIPAL, "ClusterAction"));
    }

    public synchronized void start(Consumer<PhysicalCluster> consumer) {
        instances.put(this.physicalClusterId, this);
        this.kafkaCluster.startZooKeeper();
        consumer.accept(this);
        Properties brokerConfig = KafkaTestUtils.brokerConfig(this.overrideProps);
        log.debug("Initiating Kafka cluster startup with config {}", brokerConfig);
        int parseInt = Integer.parseInt(brokerConfig.getOrDefault(KafkaConfig.BrokerIdProp(), "0").toString());
        for (int i = 0; i < this.numberOfBrokers; i++) {
            if (!this.brokerRacks.isEmpty()) {
                brokerConfig.put(KafkaConfig.RackProp(), this.brokerRacks.get(i));
            }
            if (!this.brokerCells.isEmpty()) {
                brokerConfig.put(KafkaConfig.BrokerTagsProp() + ".confluent.cell", this.brokerCells.get(i));
            }
            this.kafkaCluster.startBroker(parseInt + i, brokerConfig);
        }
    }

    public synchronized void shutdown() {
        try {
            if (this.superAdminClient != null) {
                this.superAdminClient.close();
            }
            this.kafkaCluster.shutdown();
            instances.remove(this.physicalClusterId);
        } catch (Throwable th) {
            instances.remove(this.physicalClusterId);
            throw th;
        }
    }

    public String bootstrapServers() {
        return this.kafkaCluster.bootstrapServers();
    }

    public EmbeddedKafkaCluster kafkaCluster() {
        return this.kafkaCluster;
    }

    public synchronized LogicalCluster createLogicalCluster(String str, int i, Integer... numArr) {
        if (this.logicalClusters.containsKey(str)) {
            throw new IllegalArgumentException("Logical cluster " + str + " already exists");
        }
        LogicalCluster logicalCluster = new LogicalCluster(this, str, getOrCreateUser(i, true));
        this.logicalClusters.put(str, logicalCluster);
        for (Integer num : numArr) {
            logicalCluster.addUser(getOrCreateUser(num.intValue(), false));
        }
        return logicalCluster;
    }

    public synchronized UserMetadata getOrCreateUser(int i, boolean z) {
        UserMetadata userMetadata = this.usersById.get(Integer.valueOf(i));
        if (userMetadata != null) {
            return userMetadata;
        }
        String str = "APIKEY" + i;
        UserMetadata userMetadata2 = new UserMetadata(i, str, "APISECRET-" + this.random.nextLong(), z);
        this.usersById.put(Integer.valueOf(i), userMetadata2);
        this.usersByApiKey.put(str, userMetadata2);
        return userMetadata2;
    }

    public MultiTenantPrincipal principal(String str) {
        Matcher matcher = SASL_USERNAME_PATTERN.matcher(str);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Invalid SASL user name " + str);
        }
        String group = matcher.group("apiKey");
        UserMetadata userMetadata = this.usersByApiKey.get(group);
        if (userMetadata == null) {
            throw new IllegalArgumentException("APIKey not found " + group);
        }
        return new MultiTenantPrincipal(String.valueOf(userMetadata.userId()), new TenantMetadata.Builder(matcher.group("clusterId")).superUser(userMetadata.isSuperUser()).build());
    }

    public synchronized AdminClient superAdminClient() {
        if (this.superAdminClient == null) {
            this.superAdminClient = KafkaTestUtils.createAdminClient(this.kafkaCluster.bootstrapServers("INTERNAL"), SecurityProtocol.PLAINTEXT, "", "");
        }
        return this.superAdminClient;
    }
}
