package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberAssignment;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tag("integration")
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/MultiTenantGroupCoordinatorIntegrationTest.class */
public class MultiTenantGroupCoordinatorIntegrationTest extends AbstractMultiTenantKafkaIntegrationTest {
    // Security protocol handed to the test harness whenever a producer or consumer is created.
    private final SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
    // Group name used by the tests. NOTE(review): the visible methods pass the literal "test-group"
    // rather than reading this field — presumably constant inlining by the decompiler; confirm.
    private final String groupId = "test-group";
    // Topic name used by most tests. NOTE(review): the visible methods pass the literal "test-topic"
    // rather than reading this field — presumably constant inlining by the decompiler; confirm.
    private final String topicName = "test-topic";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/MultiTenantGroupCoordinatorIntegrationTest$ReceivedRecords.class */
    /**
     * Immutable pair of record counts, one per consumer, as produced by the two-consumer
     * {@code consumeRecords} helper.
     */
    public class ReceivedRecords {
        /** Number of records attributed to the first consumer. */
        public final int count1;
        /** Number of records attributed to the second consumer. */
        public final int count2;

        /**
         * @param i  value stored in {@link #count1}
         * @param i2 value stored in {@link #count2}
         */
        public ReceivedRecords(int i, int i2) {
            count1 = i;
            count2 = i2;
        }
    }

    /**
     * Subscribes a consumer-protocol consumer of logical cluster 1 with the pattern {@code ^foo.*}
     * and checks that only the {@code foo1}/{@code foo2} topics (created under that user's tenant
     * prefix) show up in both the client-side assignment and the target assignment reported by the
     * admin client, while {@code foo3}/{@code foo4} (created under the second tenant's prefix) do not.
     *
     * @param str value supplied by {@code @ValueSource}; only used for the display name, not read here
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testRegularExpressionSubscription(String str) throws Throwable {
        setUp();
        createPhysicalAndLogicalClusters();
        LogicalClusterUser tenant1User = this.logicalCluster1.user(11);
        LogicalClusterUser tenant2User = this.logicalCluster2.user(21);

        // Two topics per tenant, three partitions each.
        this.physicalCluster.kafkaCluster().createTopic(tenant1User.tenantPrefix() + "foo1", 3, 1);
        this.physicalCluster.kafkaCluster().createTopic(tenant1User.tenantPrefix() + "foo2", 3, 1);
        this.physicalCluster.kafkaCluster().createTopic(tenant2User.tenantPrefix() + "foo3", 3, 1);
        this.physicalCluster.kafkaCluster().createTopic(tenant2User.tenantPrefix() + "foo4", 3, 1);

        AdminClient admin = this.testHarness.createAdminClient(tenant1User);
        KafkaConsumer<String, String> consumer = createConsumerProtocolConsumer(tenant1User, "test-group", null);
        consumer.subscribe(new SubscriptionPattern("^foo.*"));
        pollUntilAssignmentUpdate(consumer);

        // Client-side view: only tenant 1's topics are assigned.
        Set<String> assignedTopics = consumer.assignment().stream()
                .map(TopicPartition::topic)
                .collect(Collectors.toSet());
        Assertions.assertEquals(Set.of("foo1", "foo2"), assignedTopics);

        // Admin view: the first member's target assignment covers all six partitions of foo1/foo2.
        Set<TopicPartition> expectedPartitions = Set.of(
                new TopicPartition("foo1", 0), new TopicPartition("foo1", 1), new TopicPartition("foo1", 2),
                new TopicPartition("foo2", 0), new TopicPartition("foo2", 1), new TopicPartition("foo2", 2));
        ConsumerGroupDescription groupDescription =
                admin.describeConsumerGroups(List.of("test-group")).describedGroups().get("test-group").get();
        MemberDescription firstMember = groupDescription.members().iterator().next();
        MemberAssignment targetAssignment = firstMember.targetAssignment().orElseThrow();
        Assertions.assertEquals(expectedPartitions, targetAssignment.topicPartitions());

        // Produce 100 records to each of the four topics, each tenant through its own producer.
        KafkaProducer<String, String> tenant1Producer = this.testHarness.createProducer(tenant1User, this.securityProtocol);
        KafkaTestUtils.sendRecords(tenant1Producer, "foo1", 0, 100);
        KafkaTestUtils.sendRecords(tenant1Producer, "foo2", 100, 100);
        KafkaProducer<String, String> tenant2Producer = this.testHarness.createProducer(tenant2User, this.securityProtocol);
        KafkaTestUtils.sendRecords(tenant2Producer, "foo3", 200, 100);
        KafkaTestUtils.sendRecords(tenant2Producer, "foo4", 300, 100);

        // NOTE(review): presumably asserts exactly the 200 records of tenant 1 are received — confirm
        // against KafkaTestUtils.consumeRecords.
        KafkaTestUtils.consumeRecords(consumer, 0, 200, 200);
    }

    /**
     * Starts {@code test-group} with a classic-protocol consumer, then adds a consumer-protocol
     * consumer and checks that the group type reported by the admin client changes from
     * {@link GroupType#CLASSIC} to {@link GroupType#CONSUMER} and that both members receive records
     * afterwards.
     *
     * @param str value supplied by {@code @ValueSource}; only used for the display name, not read here
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testConsumerJoinClassicGroupToConsumerGroupUpgrade(String str) throws Throwable {
        setUp();
        createPhysicalAndLogicalClusters();
        LogicalClusterUser tenantUser = this.logicalCluster1.user(11);
        this.physicalCluster.kafkaCluster().createTopic(tenantUser.tenantPrefix() + "test-topic", 5, 1);
        AdminClient admin = this.testHarness.createAdminClient(tenantUser);
        KafkaProducer<String, String> producer = this.testHarness.createProducer(tenantUser, this.securityProtocol);

        // Phase 1: a lone classic-protocol member consumes the first batch.
        KafkaConsumer<String, String> classicConsumer = createClassicProtocolConsumer(tenantUser, "test-group", null);
        classicConsumer.subscribe(Collections.singleton("test-topic"));
        KafkaTestUtils.sendRecords(producer, "test-topic", 0, 100);
        KafkaTestUtils.consumeRecords(classicConsumer, 0, 100, 100);
        Assertions.assertEquals(GroupType.CLASSIC, getGroupType(admin, "test-group"));

        // Phase 2: a consumer-protocol member joins; it must not own partitions right after joining.
        KafkaConsumer<String, String> newProtocolConsumer = createConsumerProtocolConsumer(tenantUser, "test-group", null);
        newProtocolConsumer.subscribe(Collections.singleton("test-topic"));
        pollUntilGroupJoined(newProtocolConsumer);
        Assertions.assertTrue(
                newProtocolConsumer.assignment().isEmpty(),
                "Partitions have been assigned to more than one consumer");
        Assertions.assertEquals(GroupType.CONSUMER, getGroupType(admin, "test-group"));

        // Let both members observe a changed assignment before producing the second batch.
        pollUntilAssignmentUpdate(classicConsumer);
        pollUntilAssignmentUpdate(newProtocolConsumer);
        KafkaTestUtils.sendRecords(producer, "test-topic", 100, 100);
        ReceivedRecords received = consumeRecords(classicConsumer, newProtocolConsumer, 100, 100);
        Assertions.assertTrue(received.count1 > 0, "Classic protocol consumer did not consume any records after group upgrade");
        Assertions.assertTrue(received.count2 > 0, "Consumer protocol consumer did not consume any records after group upgrade");
    }

    /**
     * Builds a classic group of two members, the second with {@code group.instance.id=member2},
     * then creates a consumer-protocol consumer with the same instance id and checks that the group
     * type becomes {@link GroupType#CONSUMER} and that records keep flowing to both remaining
     * participants.
     *
     * @param str value supplied by {@code @ValueSource}; only used for the display name, not read here
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testStaticMemberReplacementClassicGroupToConsumerGroupUpgrade(String str) throws Throwable {
        setUp();
        createPhysicalAndLogicalClusters();
        LogicalClusterUser tenantUser = this.logicalCluster1.user(11);
        this.physicalCluster.kafkaCluster().createTopic(tenantUser.tenantPrefix() + "test-topic", 5, 1);
        AdminClient admin = this.testHarness.createAdminClient(tenantUser);
        KafkaProducer<String, String> producer = this.testHarness.createProducer(tenantUser, this.securityProtocol);

        // First classic member (dynamic).
        KafkaConsumer<String, String> classicDynamic = createClassicProtocolConsumer(tenantUser, "test-group", null);
        classicDynamic.subscribe(Collections.singleton("test-topic"));
        Assertions.assertTrue(classicDynamic.assignment().isEmpty());
        pollUntilAssignmentUpdate(classicDynamic);

        // Second classic member (static, instance id "member2").
        KafkaConsumer<String, String> classicStatic = createClassicProtocolConsumer(tenantUser, "test-group", "member2");
        classicStatic.subscribe(Collections.singleton("test-topic"));
        classicStatic.poll(Duration.ofMillis(250L));
        Assertions.assertTrue(classicStatic.assignment().isEmpty());
        pollUntilAssignmentUpdate(classicDynamic, classicStatic);

        KafkaTestUtils.sendRecords(producer, "test-topic", 0, 100);
        ReceivedRecords beforeUpgrade = consumeRecords(classicDynamic, classicStatic, 0, 100);
        Assertions.assertTrue(beforeUpgrade.count1 > 0, "Classic protocol consumer 1 did not consume any records");
        Assertions.assertTrue(beforeUpgrade.count2 > 0, "Classic protocol consumer 2 did not consume any records");
        classicStatic.commitSync();
        Assertions.assertEquals(GroupType.CLASSIC, getGroupType(admin, "test-group"));
        // NOTE(review): second commit right after the first looks redundant — kept as in the original; confirm.
        classicStatic.commitSync();

        // A consumer-protocol consumer takes over the static instance id "member2".
        KafkaConsumer<String, String> replacement = createConsumerProtocolConsumer(tenantUser, "test-group", "member2");
        replacement.subscribe(Collections.singleton("test-topic"));
        Assertions.assertTrue(replacement.assignment().isEmpty());
        pollUntilAssignmentUpdate(replacement);
        Assertions.assertEquals(GroupType.CONSUMER, getGroupType(admin, "test-group"));

        KafkaTestUtils.sendRecords(producer, "test-topic", 100, 100);
        ReceivedRecords afterUpgrade = consumeRecords(classicDynamic, replacement, 100, 100);
        Assertions.assertTrue(afterUpgrade.count1 > 0, "Classic protocol consumer 1 did not consume any records after group upgrade");
        Assertions.assertTrue(afterUpgrade.count2 > 0, "Consumer protocol consumer 2 did not consume any records after group upgrade");
    }

    /**
     * Forms a mixed group (one consumer-protocol member, one classic member), closes the
     * consumer-protocol member and checks that the admin client then reports the group as
     * {@link GroupType#CLASSIC} and that the remaining classic member keeps consuming.
     *
     * @param str value supplied by {@code @ValueSource}; only used for the display name, not read here
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testConsumerLeaveConsumerGroupToClassicGroupDowngrade(String str) throws Throwable {
        setUp();
        createPhysicalAndLogicalClusters();
        LogicalClusterUser tenantUser = this.logicalCluster1.user(11);
        this.physicalCluster.kafkaCluster().createTopic(tenantUser.tenantPrefix() + "test-topic", 5, 1);
        AdminClient admin = this.testHarness.createAdminClient(tenantUser);
        KafkaProducer<String, String> producer = this.testHarness.createProducer(tenantUser, this.securityProtocol);

        KafkaConsumer<String, String> newProtocolConsumer = createConsumerProtocolConsumer(tenantUser, "test-group", null);
        KafkaConsumer<String, String> classicConsumer = createClassicProtocolConsumer(tenantUser, "test-group", null);
        setupMixedConsumerGroup(newProtocolConsumer, classicConsumer, "test-topic");

        KafkaTestUtils.sendRecords(producer, "test-topic", 0, 100);
        ReceivedRecords mixedPhase = consumeRecords(newProtocolConsumer, classicConsumer, 0, 100);
        Assertions.assertTrue(mixedPhase.count1 > 0, "Consumer protocol consumer did not consume any records");
        Assertions.assertTrue(mixedPhase.count2 > 0, "Classic protocol consumer did not consume any records");
        Assertions.assertEquals(GroupType.CONSUMER, getGroupType(admin, "test-group"));

        // Closing the only consumer-protocol member leaves just the classic member in the group.
        newProtocolConsumer.close();
        Assertions.assertEquals(GroupType.CLASSIC, getGroupType(admin, "test-group"));

        pollUntilAssignmentUpdate(classicConsumer);
        KafkaTestUtils.sendRecords(producer, "test-topic", 100, 100);
        KafkaTestUtils.consumeRecords(classicConsumer, 100, 100, 100);
    }

    /**
     * Forms a mixed group whose consumer-protocol member uses {@code group.instance.id=member1},
     * then creates a classic-protocol consumer with the same instance id. Checks that the group is
     * reported as {@link GroupType#CLASSIC} afterwards and that the assignments captured before the
     * replacement equal the assignments observed after it.
     *
     * @param str value supplied by {@code @ValueSource}; only used for the display name, not read here
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testStaticMemberReplacementConsumerGroupToClassicGroupDowngrade(String str) throws Throwable {
        setUp();
        createPhysicalAndLogicalClusters();
        LogicalClusterUser tenantUser = this.logicalCluster1.user(11);
        this.physicalCluster.kafkaCluster().createTopic(tenantUser.tenantPrefix() + "test-topic", 5, 1);
        AdminClient admin = this.testHarness.createAdminClient(tenantUser);
        KafkaProducer<String, String> producer = this.testHarness.createProducer(tenantUser, this.securityProtocol);

        KafkaConsumer<String, String> staticNewProtocol = createConsumerProtocolConsumer(tenantUser, "test-group", "member1");
        KafkaConsumer<String, String> classicConsumer = createClassicProtocolConsumer(tenantUser, "test-group", null);
        setupMixedConsumerGroup(staticNewProtocol, classicConsumer, "test-topic");

        KafkaTestUtils.sendRecords(producer, "test-topic", 0, 100);
        ReceivedRecords mixedPhase = consumeRecords(staticNewProtocol, classicConsumer, 0, 100);
        Assertions.assertTrue(mixedPhase.count1 > 0, "Consumer protocol consumer did not consume any records");
        Assertions.assertTrue(mixedPhase.count2 > 0, "Classic protocol consumer did not consume any records");

        // Remember who owned what so the post-replacement assignments can be compared.
        Set<TopicPartition> staticMemberPartitions = staticNewProtocol.assignment();
        Set<TopicPartition> classicMemberPartitions = classicConsumer.assignment();
        Assertions.assertEquals(GroupType.CONSUMER, getGroupType(admin, "test-group"));
        staticNewProtocol.commitSync();

        // A classic-protocol consumer takes over the static instance id "member1".
        KafkaConsumer<String, String> staticClassic = createClassicProtocolConsumer(tenantUser, "test-group", "member1");
        staticClassic.subscribe(Collections.singleton("test-topic"));
        KafkaTestUtils.sendRecords(producer, "test-topic", 100, 100);
        ReceivedRecords classicPhase = consumeRecords(staticClassic, classicConsumer, 100, 100);
        Assertions.assertTrue(classicPhase.count1 > 0, "Classic protocol consumer 1 did not consume any records");
        Assertions.assertTrue(classicPhase.count2 > 0, "Classic protocol consumer 2 did not consume any records");
        Assertions.assertEquals(GroupType.CLASSIC, getGroupType(admin, "test-group"));
        Assertions.assertEquals(staticMemberPartitions, staticClassic.assignment(), "Group rebalanced unexpectedly on downgrade");
        Assertions.assertEquals(classicMemberPartitions, classicConsumer.assignment(), "Group rebalanced unexpectedly on downgrade");
    }

    /**
     * Forms a mixed group in which a classic-protocol consumer joins after a consumer-protocol
     * consumer, and checks that each of them receives at least one of the 100 produced records.
     *
     * @param str value supplied by {@code @ValueSource}; only used for the display name, not read here
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testClassicJoinToConsumerGroup(String str) throws Throwable {
        setUp();
        createPhysicalAndLogicalClusters();
        LogicalClusterUser tenantUser = this.logicalCluster1.user(11);
        this.physicalCluster.kafkaCluster().createTopic(tenantUser.tenantPrefix() + "test-topic", 5, 1);
        KafkaProducer<String, String> producer = this.testHarness.createProducer(tenantUser, this.securityProtocol);

        KafkaConsumer<String, String> newProtocolConsumer = createConsumerProtocolConsumer(tenantUser, "test-group", null);
        KafkaConsumer<String, String> classicConsumer = createClassicProtocolConsumer(tenantUser, "test-group", null);
        setupMixedConsumerGroup(newProtocolConsumer, classicConsumer, "test-topic");

        KafkaTestUtils.sendRecords(producer, "test-topic", 0, 100);
        ReceivedRecords received = consumeRecords(newProtocolConsumer, classicConsumer, 0, 100);
        Assertions.assertTrue(received.count1 > 0, "Consumer protocol consumer did not consume any records");
        Assertions.assertTrue(received.count2 > 0, "Classic protocol consumer did not consume any records");
    }

    /**
     * Creates a consumer through the test harness with {@code group.protocol} set to
     * {@link GroupProtocol#CLASSIC}.
     *
     * @param logicalClusterUser user on whose behalf the harness creates the consumer
     * @param str                forwarded to the harness as its second argument (callers pass the group name)
     * @param str2               value for {@code group.instance.id}, or {@code null} to leave it unset
     * @return the consumer returned by the harness
     */
    private KafkaConsumer<String, String> createClassicProtocolConsumer(LogicalClusterUser logicalClusterUser, String str, String str2) {
        final Properties overrides = new Properties();
        overrides.put("group.protocol", GroupProtocol.CLASSIC.toString());
        // Only static members carry an instance id.
        Optional.ofNullable(str2).ifPresent(instanceId -> overrides.put("group.instance.id", instanceId));
        return this.testHarness.createConsumer(logicalClusterUser, str, this.securityProtocol, Optional.empty(), overrides);
    }

    /**
     * Creates a consumer through the test harness with {@code group.protocol} set to
     * {@link GroupProtocol#CONSUMER}.
     *
     * @param logicalClusterUser user on whose behalf the harness creates the consumer
     * @param str                forwarded to the harness as its second argument (callers pass the group name)
     * @param str2               value for {@code group.instance.id}, or {@code null} to leave it unset
     * @return the consumer returned by the harness
     */
    private KafkaConsumer<String, String> createConsumerProtocolConsumer(LogicalClusterUser logicalClusterUser, String str, String str2) {
        final Properties overrides = new Properties();
        overrides.put("group.protocol", GroupProtocol.CONSUMER.toString());
        // Only static members carry an instance id.
        Optional.ofNullable(str2).ifPresent(instanceId -> overrides.put("group.instance.id", instanceId));
        return this.testHarness.createConsumer(logicalClusterUser, str, this.securityProtocol, Optional.empty(), overrides);
    }

    /**
     * Brings two consumers into the same group one after the other and waits until both have seen
     * their assignment change.
     *
     * <p>The first consumer subscribes and is polled until its assignment changes. The second then
     * subscribes and is polled until its group metadata changes, at which point it is asserted to
     * hold no partitions yet. Finally each consumer is polled until its assignment changes again.
     *
     * @param kafkaConsumer  consumer that joins first
     * @param kafkaConsumer2 consumer that joins second
     * @param str            topic both consumers subscribe to
     */
    private void setupMixedConsumerGroup(KafkaConsumer<String, String> kafkaConsumer, KafkaConsumer<String, String> kafkaConsumer2, String str) {
        // Step 1: first member joins alone.
        kafkaConsumer.subscribe(Collections.singleton(str));
        Assertions.assertTrue(kafkaConsumer.assignment().isEmpty());
        pollUntilAssignmentUpdate(kafkaConsumer);

        // Step 2: second member joins; it starts without partitions.
        kafkaConsumer2.subscribe(Collections.singleton(str));
        pollUntilGroupJoined(kafkaConsumer2);
        Assertions.assertTrue(kafkaConsumer2.assignment().isEmpty());

        // Step 3: wait for each member to observe a changed assignment.
        pollUntilAssignmentUpdate(kafkaConsumer);
        pollUntilAssignmentUpdate(kafkaConsumer2);
    }

    /**
     * Looks up a consumer group through the admin client and returns the type it reports.
     *
     * @param adminClient client used to describe the group
     * @param str         name of the group to describe
     * @return the {@link GroupType} from the group's description
     * @throws InterruptedException if interrupted while waiting for the describe result
     * @throws ExecutionException   if the describe call fails
     */
    private GroupType getGroupType(AdminClient adminClient, String str) throws InterruptedException, ExecutionException {
        KafkaFuture<ConsumerGroupDescription> pendingDescription =
                adminClient.describeConsumerGroups(List.of(str)).describedGroups().get(str);
        ConsumerGroupDescription description = pendingDescription.get();
        return description.type();
    }

    /**
     * Alternately consumes from two consumers until their combined record count reaches
     * {@code i2} or 30 seconds have elapsed, then asserts that the combined count equals {@code i2}.
     *
     * @param kafkaConsumer  first consumer; its tally ends up in {@code count1}
     * @param kafkaConsumer2 second consumer; its tally ends up in {@code count2}
     * @param i              forwarded unchanged as the second argument of
     *                       {@code KafkaTestUtils.consumeRecords} (callers pass the index of the
     *                       first record they produced)
     * @param i2             total number of records expected across both consumers
     * @return the per-consumer tallies
     */
    private ReceivedRecords consumeRecords(KafkaConsumer<String, String> kafkaConsumer, KafkaConsumer<String, String> kafkaConsumer2, int i, int i2) throws Exception {
        int firstTally = 0;
        int secondTally = 0;
        final long startedAt = System.nanoTime();
        final long budgetNanos = Duration.ofSeconds(30L).toNanos();
        while (firstTally + secondTally < i2) {
            if (System.nanoTime() - startedAt >= budgetNanos) {
                break;
            }
            firstTally += KafkaTestUtils.consumeRecords(kafkaConsumer, i, 0, i2);
            secondTally += KafkaTestUtils.consumeRecords(kafkaConsumer2, i, 0, i2);
        }
        Assertions.assertEquals(i2, firstTally + secondTally);
        return new ReceivedRecords(firstTally, secondTally);
    }

    /**
     * Polls the consumer in 250 ms slices until its {@code groupMetadata()} differs from the value
     * captured on entry, giving up after 30 seconds; fails the test if the metadata never changed.
     *
     * @param kafkaConsumer consumer to poll
     */
    private void pollUntilGroupJoined(KafkaConsumer<String, String> kafkaConsumer) {
        final ConsumerGroupMetadata metadataOnEntry = kafkaConsumer.groupMetadata();
        final long startedAt = System.nanoTime();
        final long budgetNanos = Duration.ofSeconds(30L).toNanos();
        while (metadataOnEntry.equals(kafkaConsumer.groupMetadata())) {
            if (System.nanoTime() - startedAt >= budgetNanos) {
                break;
            }
            kafkaConsumer.poll(Duration.ofMillis(250L));
        }
        Assertions.assertNotEquals(metadataOnEntry, kafkaConsumer.groupMetadata(), "Timed out waiting for group metadata update");
    }

    /**
     * Polls the consumer in 250 ms slices until its {@code assignment()} differs from the value
     * captured on entry, giving up after 30 seconds; fails the test if the assignment never changed.
     *
     * @param kafkaConsumer consumer to poll
     */
    private void pollUntilAssignmentUpdate(KafkaConsumer<String, String> kafkaConsumer) {
        final Set<TopicPartition> assignmentOnEntry = kafkaConsumer.assignment();
        final long startedAt = System.nanoTime();
        final long budgetNanos = Duration.ofSeconds(30L).toNanos();
        while (assignmentOnEntry.equals(kafkaConsumer.assignment())) {
            if (System.nanoTime() - startedAt >= budgetNanos) {
                break;
            }
            kafkaConsumer.poll(Duration.ofMillis(250L));
        }
        Assertions.assertNotEquals(assignmentOnEntry, kafkaConsumer.assignment(), "Timed out waiting for assignment update");
    }

    /**
     * Polls both consumers in 250 ms slices until each one's {@code assignment()} differs from the
     * value captured on entry, giving up after 30 seconds; fails the test if either assignment
     * never changed.
     *
     * <p>Polling continues while at least one of the two assignments is still unchanged, so
     * both consumers keep being polled while the other one catches up.
     *
     * @param kafkaConsumer  first consumer to poll
     * @param kafkaConsumer2 second consumer to poll
     */
    private void pollUntilAssignmentUpdate(KafkaConsumer<String, String> kafkaConsumer, KafkaConsumer<String, String> kafkaConsumer2) {
        final Set<TopicPartition> firstOnEntry = kafkaConsumer.assignment();
        final Set<TopicPartition> secondOnEntry = kafkaConsumer2.assignment();
        final long startedAt = System.nanoTime();
        final long budgetNanos = Duration.ofSeconds(30L).toNanos();
        // The wait condition must be the loop condition itself: wrapping it in an `if` inside
        // `while (true)` never terminates and makes the assertions below unreachable.
        while ((firstOnEntry.equals(kafkaConsumer.assignment()) || secondOnEntry.equals(kafkaConsumer2.assignment()))
                && System.nanoTime() - startedAt < budgetNanos) {
            kafkaConsumer.poll(Duration.ofMillis(250L));
            kafkaConsumer2.poll(Duration.ofMillis(250L));
        }
        Assertions.assertNotEquals(firstOnEntry, kafkaConsumer.assignment(), "Timed out waiting for assignment update");
        Assertions.assertNotEquals(secondOnEntry, kafkaConsumer2.assignment(), "Timed out waiting for assignment update");
    }
}
