/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.LogicalClusterMetadata;
import io.confluent.kafka.multitenant.PhysicalClusterMetadata;
import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.multitenant.assignor.TenantPartitionAssignorBuilder;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.multitenant.integration.test.IntegrationTestHarness;
import io.confluent.kafka.server.plugins.policy.AlterConfigPolicy;
import io.confluent.kafka.server.plugins.policy.CreateTopicPolicy;
import java.io.IOException;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.security.authorizer.AclAuthorizer;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.QuotaType;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.server.quota.ClientQuotaCallback;
import org.apache.kafka.server.quota.ClientQuotaType;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;

/**
 * Base class for multi-tenant Kafka integration tests.
 *
 * <p>It owns an {@link IntegrationTestHarness}, a {@link PhysicalCluster} and two
 * {@link LogicalCluster}s, and offers helpers that poll broker state (metadata cache,
 * quota callbacks, quota/throttle metrics) until an expected condition holds.
 *
 * <p>Only {@link #setUpTempDir(TestInfo)} and {@link #tearDown()} are JUnit lifecycle
 * methods; subclasses are expected to call one of the {@code setUp(...)} overloads and
 * {@code createPhysicalAndLogicalClusters(...)} themselves.
 */
@Tag(value="integration")
public abstract class AbstractMultiTenantKafkaIntegrationTest {
    /** Number of brokers started by the no-argument {@link #setUp()}. */
    protected static final int BROKER_COUNT = 2;
    protected IntegrationTestHarness testHarness;
    protected LogicalCluster logicalCluster1;
    protected LogicalCluster logicalCluster2;
    protected PhysicalCluster physicalCluster;
    /** Value written to {@code confluent.multitenant.max.partitions.per.request} by {@link #brokerProps()}. */
    protected final int maxPartitionCount = 100;
    /** Temporary directory used as {@code multitenant.metadata.dir} and for logical cluster files. */
    protected Path tempDir;
    private TestInfo testInfo;

    /**
     * Records the current {@link TestInfo} and creates a fresh temporary directory.
     *
     * @param testInfo JUnit-provided description of the running test; later passed to the harness
     */
    @BeforeEach
    public void setUpTempDir(TestInfo testInfo) {
        this.testInfo = testInfo;
        this.tempDir = TestUtils.tempDirectory().toPath();
    }

    /** Creates the test harness with {@link #BROKER_COUNT} brokers and no racks or cells. */
    public void setUp() {
        this.setUp(BROKER_COUNT, Collections.emptyList());
    }

    /**
     * Creates the test harness with the given broker count and racks, and no cells.
     *
     * @param brokerCount number of brokers handed to the harness
     * @param brokerRacks rack names handed to the harness
     */
    public void setUp(int brokerCount, List<String> brokerRacks) {
        this.setUp(brokerCount, brokerRacks, Collections.emptyList());
    }

    /**
     * Creates the test harness. The harness is only constructed here; it is started by
     * {@link #createPhysicalAndLogicalClusters(Properties)}.
     *
     * @param brokerCount number of brokers handed to the harness
     * @param brokerRacks rack names handed to the harness
     * @param brokerCells cell names handed to the harness
     */
    public void setUp(int brokerCount, List<String> brokerRacks, List<String> brokerCells) {
        this.testHarness = new IntegrationTestHarness(this.testInfo, brokerCount, brokerRacks, brokerCells);
    }

    /** Starts the physical cluster with {@link #brokerProps()} and creates the two logical clusters. */
    protected void createPhysicalAndLogicalClusters() {
        this.createPhysicalAndLogicalClusters(this.brokerProps());
    }

    /**
     * Starts the physical cluster with the supplied broker properties and creates the logical
     * clusters {@code lkc-tenant1} and {@code lkc-tenant2}.
     *
     * @param brokerProperties broker configuration passed to {@link IntegrationTestHarness#start}
     */
    protected void createPhysicalAndLogicalClusters(Properties brokerProperties) {
        this.physicalCluster = this.testHarness.start(brokerProperties);
        // NOTE(review): the numeric arguments are presumably admin/user ids for each tenant;
        // confirm against PhysicalCluster.createLogicalCluster.
        this.logicalCluster1 = this.physicalCluster.createLogicalCluster("lkc-tenant1", 100, 9, 11, 12);
        this.logicalCluster2 = this.physicalCluster.createLogicalCluster("lkc-tenant2", 200, 9, 21, 22);
    }

    /**
     * Shuts the harness down after each test.
     *
     * <p>The harness is only created by an explicit {@code setUp(...)} call, so it may still be
     * {@code null} here if the test failed before reaching that call; skipping the shutdown in
     * that case avoids a {@link NullPointerException} that would hide the original failure.
     */
    @AfterEach
    public void tearDown() {
        if (this.testHarness != null) {
            this.testHarness.shutdown();
        }
    }

    /**
     * Builds the default broker configuration used by these tests.
     *
     * @return a new, mutable {@link Properties} instance; callers may add or override entries
     * @throws RuntimeException if the real path of {@link #tempDir} cannot be resolved
     */
    protected Properties brokerProps() {
        Properties props = new Properties();
        props.put(KafkaConfig.AuthorizerClassNameProp(), MultiTenantAuthorizer.class.getName());
        props.put("confluent.max.acls.per.tenant", "100");
        props.put(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp(), "true");
        props.put("multitenant.metadata.class", "io.confluent.kafka.multitenant.PhysicalClusterMetadata");
        props.put("confluent.topic.replica.assignor.builder.class", TenantPartitionAssignorBuilder.class.getName());
        try {
            // Resolve symlinks so the configured directory matches the path files are written to.
            props.put("multitenant.metadata.dir", this.tempDir.toRealPath().toString());
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to resolve multitenant metadata dir " + this.tempDir + ": " + e.getMessage(), e);
        }
        props.put("confluent.multitenant.max.partitions.per.request", Integer.toString(this.maxPartitionCount));
        props.put("confluent.multitenant.listener.names", "EXTERNAL");
        props.put(KafkaConfig.AlterConfigPolicyClassNameProp(), AlterConfigPolicy.class.getName());
        props.put(KafkaConfig.CreateTopicPolicyClassNameProp(), CreateTopicPolicy.class.getName());
        props.put("confluent.plugins.topic.policy.replication.factor", "1");
        props.put(KafkaConfig.AutoCreateTopicsEnableProp(), "false");
        props.put("confluent.broker.load.enabled", "true");
        props.put("confluent.broker.load.delay.metric.start.ms", "0");
        props.put("confluent.broker.load.update.metric.tags.interval.ms", "100");
        props.put("confluent.broker.load.window.size.ms", "100");
        return props;
    }

    /**
     * Blocks until both the admin client's cluster description and the controller broker's
     * metadata cache report as many brokers as the physical cluster has.
     *
     * @throws RuntimeException wrapping any failure or timeout of the wait; if the wait was
     *         interrupted, the thread's interrupt flag is restored first
     */
    protected void awaitMetadataPropagation() {
        int numberOfBrokers = this.physicalCluster.kafkaCluster().brokers().size();
        String timeoutMessage = String.format("Metadata was not updated in time to reflect the %d brokers", numberOfBrokers);
        try {
            TestUtils.waitForCondition(
                () -> this.physicalCluster.superAdminClient().describeCluster().nodes().get().size() == numberOfBrokers,
                timeoutMessage);
            TestUtils.waitForCondition(
                () -> this.physicalCluster.kafkaCluster().controllerBrokerServer().adminManager().metadataCache().getAliveBrokers().size() == numberOfBrokers,
                timeoutMessage);
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack before wrapping.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Writes the logical cluster file into {@link #tempDir} and waits until every broker's
     * {@link PhysicalClusterMetadata} returns non-null metadata for that logical cluster.
     *
     * @param lcm metadata of the logical cluster to add
     * @throws Throwable if writing the file or waiting for the condition fails
     */
    protected void addLkcFileAndSyncMetadata(LogicalClusterMetadata lcm) throws Throwable {
        Utils.createLogicalClusterFile(lcm, this.tempDir);
        this.awaitLogicalClusterMetadata(lcm, true, "Expected metadata of new logical cluster to be present in metadata cache");
    }

    /**
     * Deletes the logical cluster file from {@link #tempDir} and waits until every broker's
     * {@link PhysicalClusterMetadata} returns {@code null} for that logical cluster.
     *
     * @param lcm metadata of the logical cluster to remove
     * @throws Throwable if deleting the file or waiting for the condition fails
     */
    protected void deleteLkcFileAndSyncMetadata(LogicalClusterMetadata lcm) throws Throwable {
        Utils.deleteLogicalClusterFile(lcm, this.tempDir);
        this.awaitLogicalClusterMetadata(lcm, false, "Expected metadata of deleted logical cluster to not be present in metadata cache");
    }

    /** Waits until all brokers agree on whether metadata for {@code lcm} is present. */
    private void awaitLogicalClusterMetadata(LogicalClusterMetadata lcm, boolean expectPresent, String failureMessage) throws Exception {
        TestUtils.waitForCondition(() -> this.physicalCluster.kafkaCluster().brokers().stream().allMatch(broker -> {
            PhysicalClusterMetadata brokerMetadata = (PhysicalClusterMetadata)broker.multiTenantMetadata().get();
            boolean present = brokerMetadata.metadata(lcm.logicalClusterId()) != null;
            return present == expectPresent;
        }), failureMessage);
    }

    /**
     * Fetches every ACL binding visible to the given admin client using a match-all filter.
     *
     * @param adminClient client used to issue the describe-ACLs request
     * @return a new mutable set holding the returned bindings
     * @throws Exception if the describe-ACLs request fails
     */
    protected Set<AclBinding> describeAllAcls(AdminClient adminClient) throws Exception {
        // AclBindingFilter.ANY is the library-provided match-everything filter.
        Collection<AclBinding> acls = adminClient.describeAcls(AclBindingFilter.ANY).values().get();
        return new HashSet<AclBinding>(acls);
    }

    /**
     * Waits until the quota callback of every broker reports {@code expectedQuota} for the
     * given quota type and metric tags.
     *
     * @param quotaType     quota type to query
     * @param tags          metric tags identifying the quota entity
     * @param expectedQuota limit every callback must report
     * @throws Exception if the condition is not met in time
     */
    protected void verifyQuotaCallbackLimit(ClientQuotaType quotaType, Map<String, String> tags, double expectedQuota) throws Exception {
        // The callbacks are captured once, before polling starts.
        List<ClientQuotaCallback> quotaCallbacks = this.physicalCluster.kafkaCluster().brokers().stream()
            .map(kafkaServer -> (ClientQuotaCallback)kafkaServer.quotaManagers().clientQuotaCallback().get())
            .collect(Collectors.toList());
        TestUtils.waitForCondition(() -> quotaCallbacks.stream().allMatch(callback -> {
            // quotaLimit returns a boxed Double; treat a missing limit as "not yet matching"
            // instead of failing on auto-unboxing.
            Double limit = callback.quotaLimit(quotaType, tags);
            return limit != null && limit.doubleValue() == expectedQuota;
        }), String.format("Timed out waiting for quota callback to update quota %s for tenant %s to %s", quotaType, tags, expectedQuota));
    }

    /**
     * Waits until the {@code tokens} quota metric on all relevant brokers has a bound equal to
     * {@code expectedQuota}.
     *
     * @param quotaType     quota type whose metric group is inspected
     * @param tags          metric tags identifying the tenant
     * @param expectedQuota expected quota bound
     * @throws Exception if the condition is not met in time
     */
    protected void verifyExpectedTenantQuota(ClientQuotaType quotaType, Map<String, String> tags, double expectedQuota) throws Exception {
        List<KafkaServer> brokers = this.brokersForQuotaType(quotaType);
        TestUtils.waitForCondition(
            () -> brokers.stream().map(KafkaServer::metrics).allMatch(metrics -> this.verifyTenantMetricQuotaValue(metrics, quotaType, tags, expectedQuota)),
            String.format("Timed out waiting for quota metric %s for tenant %s to update to %s", quotaType, tags, expectedQuota));
    }

    /**
     * Checks whether the {@code tokens} metric for the given quota type and tags is configured
     * with a quota bound equal to {@code expectedQuota}.
     *
     * @param metrics       metrics registry of one broker
     * @param quotaType     quota type; its {@link QuotaType} string is used as the metric group
     * @param tags          metric tags identifying the tenant
     * @param expectedQuota expected quota bound
     * @return {@code true} if the configured bound equals {@code expectedQuota}
     */
    protected boolean verifyTenantMetricQuotaValue(Metrics metrics, ClientQuotaType quotaType, Map<String, String> tags, double expectedQuota) {
        MetricName metricName = metrics.metricName("tokens", QuotaType.fromClientQuotaType(quotaType).toString(), tags);
        return expectedQuota == metrics.metric(metricName).config().quota().bound();
    }

    /**
     * Waits until at least one relevant broker reports a positive {@code throttle-time} metric
     * for the given quota type and tags.
     *
     * @param quotaType quota type whose metric group is inspected
     * @param tags      metric tags identifying the tenant
     * @throws Exception if the condition is not met in time
     */
    protected void verifyTenantThrottled(ClientQuotaType quotaType, Map<String, String> tags) throws Exception {
        List<KafkaServer> brokers = this.brokersForQuotaType(quotaType);
        TestUtils.waitForCondition(
            () -> brokers.stream().map(KafkaServer::metrics).anyMatch(metrics -> this.verifyTenantMetricThrottleValue(metrics, quotaType, tags, true)),
            String.format("Timed out waiting for %s tenant throttle metric for %s to be non-zero", quotaType, tags));
    }

    /**
     * Waits until every relevant broker reports a non-positive {@code throttle-time} metric
     * for the given quota type and tags.
     *
     * @param quotaType quota type whose metric group is inspected
     * @param tags      metric tags identifying the tenant
     * @throws Exception if the condition is not met in time
     */
    protected void verifyTenantNotThrottled(ClientQuotaType quotaType, Map<String, String> tags) throws Exception {
        List<KafkaServer> brokers = this.brokersForQuotaType(quotaType);
        TestUtils.waitForCondition(
            () -> brokers.stream().map(KafkaServer::metrics).allMatch(metrics -> this.verifyTenantMetricThrottleValue(metrics, quotaType, tags, false)),
            String.format("Timed out waiting for %s tenant throttle metric for %s to be zero", quotaType, tags));
    }

    /**
     * Selects the brokers whose metrics are inspected for a quota type: only the controller
     * broker for {@link ClientQuotaType#CONTROLLER_MUTATION}, all brokers otherwise.
     */
    private List<KafkaServer> brokersForQuotaType(ClientQuotaType quotaType) {
        if (quotaType == ClientQuotaType.CONTROLLER_MUTATION) {
            return Collections.singletonList(this.physicalCluster.kafkaCluster().controllerBrokerServer());
        }
        return this.physicalCluster.kafkaCluster().brokers();
    }

    /** Returns whether the {@code throttle-time} metric being positive matches {@code expectThrottle}. */
    private boolean verifyTenantMetricThrottleValue(Metrics metrics, ClientQuotaType quotaType, Map<String, String> tags, boolean expectThrottle) {
        MetricName metricName = metrics.metricName("throttle-time", QuotaType.fromClientQuotaType(quotaType).toString(), tags);
        double throttleValue = (Double)metrics.metric(metricName).metricValue();
        boolean throttled = throttleValue > 0.0;
        return expectThrottle == throttled;
    }
}

