package io.confluent.kafka.multitenant.integration.cluster;

import io.confluent.kafka.multitenant.BasePhysicalClusterMetadata;
import io.confluent.kafka.multitenant.MultiTenantInterceptor;
import io.confluent.kafka.multitenant.MultiTenantPrincipal;
import io.confluent.kafka.multitenant.MultiTenantPrincipalBuilder;
import io.confluent.kafka.multitenant.TenantMetadata;
import io.confluent.kafka.multitenant.TopicBasedPhysicalClusterMetadata;
import io.confluent.kafka.multitenant.integration.test.DummyMultitenantMetadata;
import io.confluent.kafka.test.cluster.EmbeddedKafkaCluster;
import io.confluent.kafka.test.utils.AclCommandBuilder;
import io.confluent.kafka.test.utils.ClientSecuritySpec;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.kafka.test.utils.SecurityTestUtils;
import java.security.NoSuchAlgorithmException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import kafka.security.authorizer.AclAuthorizer;
import kafka.server.BrokerSession;
import kafka.server.KafkaConfig;
import kafka.utils.EmptyTestInfo;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.network.PublicCredential;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.internals.ScramFormatter;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.security.scram.internals.ScramServerCallbackHandler;
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/multitenant/integration/cluster/PhysicalCluster.class */
public class PhysicalCluster {
    private static final Logger log = LoggerFactory.getLogger(PhysicalCluster.class);
    public static final KafkaPrincipal BROKER_PRINCIPAL = new KafkaPrincipal("User", "broker");
    private static final Pattern SASL_USERNAME_PATTERN = Pattern.compile("(?<clusterId>[^_]*)_(?<apiKey>.*)");
    private static Map<String, PhysicalCluster> instances = new ConcurrentHashMap();
    private final String physicalClusterId;
    private final EmbeddedKafkaCluster kafkaCluster;
    private AdminClient superAdminClient;
    private ConfluentAdmin confluentAdmin;
    private final Random random;
    private final Map<Integer, UserMetadata> usersById;
    private final Map<String, UserMetadata> usersByApiKey;
    private final Map<String, LogicalCluster> logicalClusters;
    private final int numberOfBrokers;
    private final List<String> brokerRacks;
    private final List<String> brokerCells;
    private boolean isBrokerSuperUser;

    /* loaded from: input_file:io/confluent/kafka/multitenant/integration/cluster/PhysicalCluster$KRaftScramIntegrationTestCallbackHandler.class */
    public static class KRaftScramIntegrationTestCallbackHandler extends ScramServerCallbackHandler {
        private static CredentialCache.Cache<ScramCredential> credentialCache;

        static void initialize() {
            if (credentialCache == null) {
                credentialCache = new CredentialCache.Cache<>(ScramCredential.class);
            }
        }

        static void addCredential(String str, ScramCredential scramCredential) {
            credentialCache.put(str, scramCredential);
        }

        static void removeCredential(String str) {
            credentialCache.remove(str);
        }

        public KRaftScramIntegrationTestCallbackHandler() {
            super(credentialCache, (DelegationTokenCache) null);
        }
    }

    /* loaded from: input_file:io/confluent/kafka/multitenant/integration/cluster/PhysicalCluster$MultiTenantScramPrincipalBuilder.class */
    public static class MultiTenantScramPrincipalBuilder extends MultiTenantPrincipalBuilder implements Configurable {
        PhysicalCluster physicalCluster;

        public void configure(Map<String, ?> map) {
            this.physicalCluster = (PhysicalCluster) PhysicalCluster.instances.get((String) map.get("physical.cluster.id"));
            Assertions.assertNotNull(this.physicalCluster, "Physical cluster not found");
        }

        public KafkaPrincipal build(AuthenticationContext authenticationContext) {
            if (authenticationContext.securityProtocol() == SecurityProtocol.SASL_PLAINTEXT) {
                return this.physicalCluster.principal(((SaslAuthenticationContext) authenticationContext).server().getAuthorizationID());
            }
            return authenticationContext.securityProtocol() == SecurityProtocol.PLAINTEXT ? PhysicalCluster.BROKER_PRINCIPAL : super.build(authenticationContext);
        }
    }

    public PhysicalCluster(int i) {
        this(i, Collections.emptyList(), Optional.empty());
    }

    public PhysicalCluster(int i, List<String> list, Optional<Time> optional) {
        this(i, list, Collections.emptyList(), optional);
    }

    public PhysicalCluster(int i, List<String> list, List<String> list2, Optional<Time> optional) {
        this(i, list, list2, optional, new EmptyTestInfo());
    }

    public PhysicalCluster(int i, List<String> list, List<String> list2, Optional<Time> optional, TestInfo testInfo) {
        this.physicalClusterId = String.valueOf(System.identityHashCode(this));
        this.isBrokerSuperUser = true;
        this.kafkaCluster = new EmbeddedKafkaCluster(optional.orElse(new MockTime()), testInfo);
        this.random = new Random();
        this.usersById = new HashMap();
        this.usersByApiKey = new ConcurrentHashMap();
        this.logicalClusters = new HashMap();
        this.numberOfBrokers = i;
        this.brokerRacks = list;
        this.brokerCells = list2;
        instances.put(this.physicalClusterId, this);
    }

    public void disableBrokerSuperUser() {
        if (isKRaft()) {
            throw new IllegalArgumentException("KRaft requires broker principal to be a super user");
        }
        this.isBrokerSuperUser = false;
    }

    private Properties commonConfigs(Set<String> set) {
        Properties properties = new Properties();
        properties.put("principal.builder.class", MultiTenantScramPrincipalBuilder.class.getName());
        properties.setProperty("physical.cluster.id", this.physicalClusterId);
        properties.setProperty(KafkaConfig.InterBrokerListenerNameProp(), "INTERNAL");
        if (isKRaft()) {
            properties.setProperty(KafkaConfig.ControllerListenerNamesProp(), "CONTROLLER");
        }
        HashSet hashSet = new HashSet(set);
        if (this.isBrokerSuperUser) {
            hashSet.add(BROKER_PRINCIPAL.toString());
        }
        if (!hashSet.isEmpty()) {
            properties.setProperty(AclAuthorizer.SuperUsersProp(), String.join(";", hashSet));
        }
        return properties;
    }

    public synchronized void startQuorum(Properties properties) {
        startQuorum(properties, Collections.emptySet());
    }

    public synchronized void startQuorum(Properties properties, Set<String> set) {
        if (!isKRaft()) {
            this.kafkaCluster.startQuorum();
            return;
        }
        Properties commonConfigs = commonConfigs(set);
        commonConfigs.setProperty(KafkaConfig.ListenersProp(), "CONTROLLER://localhost:0");
        commonConfigs.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp(), "CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT");
        commonConfigs.putAll(properties);
        log.debug("Initializing kraft controller quorum with config {}", commonConfigs);
        this.kafkaCluster.startQuorum(commonConfigs);
    }

    public synchronized void startBrokers(Properties properties) {
        startBrokers(properties, Collections.emptySet());
    }

    public synchronized void startBrokers(Properties properties, Set<String> set) {
        Properties commonConfigs = commonConfigs(set);
        commonConfigs.setProperty(KafkaConfig.ListenersProp(), "INTERNAL://localhost:0,EXTERNAL://localhost:0");
        commonConfigs.setProperty(KafkaConfig.SaslEnabledMechanismsProp(), "SCRAM-SHA-256");
        commonConfigs.setProperty("listener.name.external.scram-sha-256.sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required;");
        commonConfigs.setProperty("listener.name.external.broker.interceptor.class", MultiTenantInterceptor.class.getName());
        commonConfigs.setProperty("multitenant.metadata.class", DummyMultitenantMetadata.class.getName());
        if (isKRaft()) {
            commonConfigs.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp(), "INTERNAL:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT");
            KRaftScramIntegrationTestCallbackHandler.initialize();
            commonConfigs.setProperty("listener.name.external.scram-sha-256.sasl.server.callback.handler.class", KRaftScramIntegrationTestCallbackHandler.class.getName());
        } else {
            commonConfigs.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp(), "INTERNAL:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT");
        }
        commonConfigs.putAll(properties);
        log.debug("Initiating Kafka cluster startup with broker config {}", commonConfigs);
        int parseInt = Integer.parseInt(commonConfigs.getOrDefault(KafkaConfig.BrokerIdProp(), "0").toString());
        for (int i = 0; i < this.numberOfBrokers; i++) {
            if (!this.brokerRacks.isEmpty()) {
                commonConfigs.put(KafkaConfig.RackProp(), this.brokerRacks.get(i));
            }
            if (!this.brokerCells.isEmpty()) {
                commonConfigs.put(KafkaConfig.BrokerTagsProp() + ".confluent.cell", this.brokerCells.get(i));
            }
            this.kafkaCluster.startBroker(parseInt + i, commonConfigs);
        }
    }

    public List<BasePhysicalClusterMetadata> clusterMetadataInstances() {
        List list = (List) kafkaCluster().kafkaBrokers().stream().map(kafkaBroker -> {
            Object obj = kafkaBroker.config().values().get(KafkaConfig.BrokerSessionUuidProp());
            return obj == null ? "" : obj.toString();
        }).distinct().collect(Collectors.toList());
        Assertions.assertEquals(this.numberOfBrokers, list.size(), "Expect each broker to have unique session UUID.");
        List<BasePhysicalClusterMetadata> list2 = (List) list.stream().map(BasePhysicalClusterMetadata::getInstance).collect(Collectors.toList());
        list2.forEach(basePhysicalClusterMetadata -> {
            Assertions.assertTrue(basePhysicalClusterMetadata instanceof TopicBasedPhysicalClusterMetadata, "Expected valid instance of TopicBasedPhysicalClusterMetadata for all broker sessions");
        });
        return list2;
    }

    public synchronized void shutdown() {
        try {
            if (this.superAdminClient != null) {
                this.superAdminClient.close();
            }
            if (this.confluentAdmin != null) {
                this.confluentAdmin.close();
            }
            this.kafkaCluster.shutdown();
            instances.remove(this.physicalClusterId);
        } catch (Throwable th) {
            instances.remove(this.physicalClusterId);
            throw th;
        }
    }

    public String bootstrapServers() {
        return this.kafkaCluster.bootstrapServers();
    }

    public String bootstrapServers(String str) {
        return this.kafkaCluster.bootstrapServers(str);
    }

    public EmbeddedKafkaCluster kafkaCluster() {
        return this.kafkaCluster;
    }

    public synchronized LogicalCluster createLogicalCluster(String str, int i, Integer... numArr) {
        return createLogicalCluster(str, null, null, i, numArr);
    }

    public synchronized LogicalCluster createLogicalCluster(String str, String str2, String str3, int i, Integer... numArr) {
        if (this.logicalClusters.containsKey(str)) {
            throw new IllegalArgumentException("Logical cluster " + str + " already exists");
        }
        LogicalCluster logicalCluster = new LogicalCluster(this, str, str2, str3, getOrCreateUser(i, true));
        this.logicalClusters.put(str, logicalCluster);
        for (Integer num : numArr) {
            logicalCluster.addUser(getOrCreateUser(num.intValue(), false));
        }
        return logicalCluster;
    }

    public synchronized UserMetadata getOrCreateUser(int i, boolean z) {
        return getOrCreateUser(i, z, !z);
    }

    public synchronized UserMetadata getOrCreateUser(int i, boolean z, boolean z2) {
        UserMetadata userMetadata = this.usersById.get(Integer.valueOf(i));
        if (userMetadata != null) {
            return userMetadata;
        }
        String str = "APIKEY" + i;
        UserMetadata userMetadata2 = new UserMetadata(i, str, "APISECRET-" + this.random.nextLong(), z, z2);
        this.usersById.put(Integer.valueOf(i), userMetadata2);
        this.usersByApiKey.put(str, userMetadata2);
        return userMetadata2;
    }

    public MultiTenantPrincipal principal(String str) {
        Matcher matcher = SASL_USERNAME_PATTERN.matcher(str);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Invalid SASL user name " + str);
        }
        String group = matcher.group("apiKey");
        UserMetadata userMetadata = this.usersByApiKey.get(group);
        if (userMetadata == null) {
            throw new IllegalArgumentException("APIKey not found " + group);
        }
        String group2 = matcher.group("clusterId");
        TenantMetadata build = new TenantMetadata.Builder(group2, userMetadata.userResourceId()).serviceAccount(userMetadata.isServiceAccount()).healthcheckTenant(userMetadata.isSuperUser()).apiKeyAuthenticated(false).build();
        build.updateOrgProperties(this.logicalClusters.get(group2).orgId(), this.logicalClusters.get(group2).envId());
        return new MultiTenantPrincipal(String.valueOf(userMetadata.userId()), str, build);
    }

    public ClientSecuritySpec internalListenerClientSpec() {
        return ClientSecuritySpec.plaintext(this.kafkaCluster.bootstrapServers("INTERNAL"));
    }

    public AclCommandBuilder newAclCommand() {
        AclCommandBuilder aclCommandBuilder = new AclCommandBuilder();
        if (isKRaft()) {
            aclCommandBuilder.setClientSecuritySpec(internalListenerClientSpec());
        } else {
            aclCommandBuilder.setZookeeperConnect(this.kafkaCluster.zkConnect());
        }
        return aclCommandBuilder;
    }

    public AclCommandBuilder newAclCommandWithExternalListener(String str) {
        return new AclCommandBuilder().setClientSecuritySpec(new ClientSecuritySpec(bootstrapServers("EXTERNAL"), SecurityProtocol.SASL_PLAINTEXT, "PLAIN", str));
    }

    public synchronized AdminClient superAdminClient() {
        if (this.superAdminClient == null) {
            this.superAdminClient = KafkaTestUtils.createAdminClient(this.kafkaCluster.bootstrapServers("INTERNAL"), SecurityProtocol.PLAINTEXT, "", "");
        }
        return this.superAdminClient;
    }

    public synchronized ConfluentAdmin superConfluentAdmin() {
        if (this.confluentAdmin == null) {
            this.confluentAdmin = KafkaTestUtils.createConfluentAdmin(this.kafkaCluster.bootstrapServers("INTERNAL"), SecurityProtocol.PLAINTEXT, "", "");
        }
        return this.confluentAdmin;
    }

    public boolean isKRaft() {
        return this.kafkaCluster.isKRaft();
    }

    public void addScramUser(String str, String str2) {
        if (!isKRaft()) {
            SecurityTestUtils.createScramUser(kafkaCluster(), str, str2);
            return;
        }
        try {
            KRaftScramIntegrationTestCallbackHandler.addCredential(str, new ScramFormatter(ScramMechanism.SCRAM_SHA_256).generateCredential(str2, 4096));
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e);
        }
    }

    public void deleteScramUser(String str) {
        PublicCredential saslCredential = PublicCredential.saslCredential(str, ScramMechanism.SCRAM_SHA_256.mechanismName());
        if (isKRaft()) {
            KRaftScramIntegrationTestCallbackHandler.removeCredential(str);
        } else {
            SecurityTestUtils.deleteScramUser(this.kafkaCluster, str);
        }
        this.kafkaCluster.kafkaBrokers().forEach(kafkaBroker -> {
            BrokerSession.session(kafkaBroker.config().brokerSessionUuid()).handleCredentialDelete(saslCredential);
        });
    }
}
