/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.test.AbstractMultiTenantKafkaIntegrationTest;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberAssignment;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tags(value={@Tag(value="integration"), @Tag(value="bazel:shard_count:4")})
@Timeout(value=600L)
public class MultiTenantGroupCoordinatorIntegrationTest
extends AbstractMultiTenantKafkaIntegrationTest {
    // Security protocol handed to the test harness whenever this class creates a producer or consumer.
    private final SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
    // Consumer group id used by the tests in this class.
    private final String groupId = "test-group";
    // Topic name as seen by tenant clients; the physical topic is created as tenantPrefix + this name.
    private final String topicName = "test-topic";

    /**
     * Verifies that a regular-expression subscription made by one tenant's consumer resolves
     * only to that tenant's topics.
     *
     * <p>Two topics are created under each of two tenant prefixes. A consumer-protocol consumer
     * of the first tenant subscribes with {@code ^foo.*}; the test then asserts that both the
     * client-side assignment and the target assignment reported by the admin client cover
     * exactly {@code foo1} and {@code foo2}. Records are produced by both tenants and the
     * consumer is finally drained through {@code KafkaTestUtils.consumeRecords}.
     *
     * @param quorum quorum mode supplied by the parameter source; not read by the test body
     * @throws Throwable if cluster setup, an admin request, or an assertion fails
     */
    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft"})
    public void testRegularExpressionSubscription(String quorum) throws Throwable {
        this.setUp();
        this.createPhysicalAndLogicalClusters();
        LogicalClusterUser user1 = this.logicalCluster1.user(11);
        LogicalClusterUser user2 = this.logicalCluster2.user(21);

        // Physical topics carry the tenant prefix; the tenant clients below use the bare names.
        this.physicalCluster.kafkaCluster().createTopic(user1.tenantPrefix() + "foo1", 3, 1);
        this.physicalCluster.kafkaCluster().createTopic(user1.tenantPrefix() + "foo2", 3, 1);
        this.physicalCluster.kafkaCluster().createTopic(user2.tenantPrefix() + "foo3", 3, 1);
        this.physicalCluster.kafkaCluster().createTopic(user2.tenantPrefix() + "foo4", 3, 1);

        AdminClient adminClient = this.testHarness.createAdminClient(user1);
        KafkaConsumer<String, String> consumerConsumer = this.createConsumerProtocolConsumer(user1, this.groupId, null);
        consumerConsumer.subscribe(new SubscriptionPattern("^foo.*"));
        this.pollUntilAssignmentUpdate(consumerConsumer);

        // Client-side view: only the first tenant's topics may show up.
        Set<String> assignedTopicNames = consumerConsumer.assignment().stream()
                .map(TopicPartition::topic)
                .collect(Collectors.toSet());
        Assertions.assertEquals(Set.of("foo1", "foo2"), assignedTopicNames);

        // Broker-side view: the target assignment of the single member must match as well.
        ConsumerGroupDescription groupDescription = adminClient
                .describeConsumerGroups(List.of(this.groupId))
                .describedGroups()
                .get(this.groupId)
                .get();
        MemberDescription memberDescription = groupDescription.members().iterator().next();
        Set<TopicPartition> expectedTargetAssignment = Set.of(
                new TopicPartition("foo1", 0), new TopicPartition("foo1", 1), new TopicPartition("foo1", 2),
                new TopicPartition("foo2", 0), new TopicPartition("foo2", 1), new TopicPartition("foo2", 2));
        Assertions.assertEquals(
                expectedTargetAssignment,
                memberDescription.targetAssignment().orElseThrow().topicPartitions());

        KafkaProducer<String, String> producer1 = this.testHarness.createProducer(user1, this.securityProtocol);
        KafkaTestUtils.sendRecords(producer1, "foo1", 0, 100);
        KafkaTestUtils.sendRecords(producer1, "foo2", 100, 100);
        KafkaProducer<String, String> producer2 = this.testHarness.createProducer(user2, this.securityProtocol);
        KafkaTestUtils.sendRecords(producer2, "foo3", 200, 100);
        KafkaTestUtils.sendRecords(producer2, "foo4", 300, 100);

        // NOTE(review): presumably expects exactly the 200 records sent by producer1 — confirm
        // against KafkaTestUtils.consumeRecords.
        KafkaTestUtils.consumeRecords(consumerConsumer, 0, 200, 200);
    }

    /**
     * Verifies that a classic group becomes a consumer group once a consumer-protocol member
     * joins it, and that both members consume records afterwards.
     *
     * <p>A classic-protocol consumer first consumes 100 records and the group is asserted to be
     * {@link GroupType#CLASSIC}. A consumer-protocol consumer then joins the same group; the
     * group type must switch to {@link GroupType#CONSUMER}, and after both assignments change
     * each consumer must receive at least one of the next 100 records.
     *
     * @param quorum quorum mode supplied by the parameter source; not read by the test body
     * @throws Throwable if cluster setup, an admin request, or an assertion fails
     */
    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft"})
    public void testConsumerJoinClassicGroupToConsumerGroupUpgrade(String quorum) throws Throwable {
        this.setUp();
        this.createPhysicalAndLogicalClusters();
        LogicalClusterUser user = this.logicalCluster1.user(11);
        String prefixedTopicName = user.tenantPrefix() + this.topicName;
        this.physicalCluster.kafkaCluster().createTopic(prefixedTopicName, 5, 1);
        AdminClient adminClient = this.testHarness.createAdminClient(user);
        KafkaProducer<String, String> producer = this.testHarness.createProducer(user, this.securityProtocol);

        // Start with a single classic-protocol member.
        KafkaConsumer<String, String> classicConsumer = this.createClassicProtocolConsumer(user, this.groupId, null);
        classicConsumer.subscribe(Collections.singleton(this.topicName));
        KafkaTestUtils.sendRecords(producer, this.topicName, 0, 100);
        KafkaTestUtils.consumeRecords(classicConsumer, 0, 100, 100);
        Assertions.assertEquals(GroupType.CLASSIC, this.getGroupType(adminClient, this.groupId));

        // A consumer-protocol member joins the same group.
        KafkaConsumer<String, String> consumerConsumer = this.createConsumerProtocolConsumer(user, this.groupId, null);
        consumerConsumer.subscribe(Collections.singleton(this.topicName));
        this.pollUntilGroupJoined(consumerConsumer);
        // Having joined, the new member must not hold any partition yet.
        Assertions.assertTrue(consumerConsumer.assignment().isEmpty(), "Partitions have been assigned to more than one consumer");
        Assertions.assertEquals(GroupType.CONSUMER, this.getGroupType(adminClient, this.groupId));

        // Wait for each member's assignment to change before producing the second batch.
        this.pollUntilAssignmentUpdate(classicConsumer);
        this.pollUntilAssignmentUpdate(consumerConsumer);

        KafkaTestUtils.sendRecords(producer, this.topicName, 100, 100);
        ReceivedRecords receivedRecords = this.consumeRecords(classicConsumer, consumerConsumer, 100, 100);
        Assertions.assertTrue(receivedRecords.count1 > 0, "Classic protocol consumer did not consume any records after group upgrade");
        Assertions.assertTrue(receivedRecords.count2 > 0, "Consumer protocol consumer did not consume any records after group upgrade");
    }

    /**
     * Verifies that replacing a static classic-protocol member with a consumer-protocol member
     * using the same {@code group.instance.id} upgrades the group without changing assignments.
     *
     * <p>Two classic consumers (the second with instance id {@code member2}) share the topic and
     * the group is asserted to be {@link GroupType#CLASSIC}. A consumer-protocol consumer is then
     * created with the same instance id; the group must become {@link GroupType#CONSUMER}, both
     * remaining consumers must receive records, and each assignment must equal the one captured
     * before the replacement.
     *
     * @param quorum quorum mode supplied by the parameter source; not read by the test body
     * @throws Throwable if cluster setup, an admin request, or an assertion fails
     */
    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft"})
    public void testStaticMemberReplacementClassicGroupToConsumerGroupUpgrade(String quorum) throws Throwable {
        this.setUp();
        this.createPhysicalAndLogicalClusters();
        LogicalClusterUser user = this.logicalCluster1.user(11);
        String prefixedTopicName = user.tenantPrefix() + this.topicName;
        this.physicalCluster.kafkaCluster().createTopic(prefixedTopicName, 5, 1);
        AdminClient adminClient = this.testHarness.createAdminClient(user);
        KafkaProducer<String, String> producer = this.testHarness.createProducer(user, this.securityProtocol);

        // First classic member, created without a group.instance.id.
        KafkaConsumer<String, String> classicConsumer1 = this.createClassicProtocolConsumer(user, this.groupId, null);
        classicConsumer1.subscribe(Collections.singleton(this.topicName));
        Assertions.assertTrue(classicConsumer1.assignment().isEmpty());
        this.pollUntilAssignmentUpdate(classicConsumer1);

        // Second classic member, static with instance id "member2".
        KafkaConsumer<String, String> classicConsumer2 = this.createClassicProtocolConsumer(user, this.groupId, "member2");
        classicConsumer2.subscribe(Collections.singleton(this.topicName));
        classicConsumer2.poll(Duration.ofMillis(250L));
        Assertions.assertTrue(classicConsumer2.assignment().isEmpty());
        this.pollUntilAssignmentUpdate(classicConsumer1, classicConsumer2);

        KafkaTestUtils.sendRecords(producer, this.topicName, 0, 100);
        ReceivedRecords receivedRecords1 = this.consumeRecords(classicConsumer1, classicConsumer2, 0, 100);
        Assertions.assertTrue(receivedRecords1.count1 > 0, "Classic protocol consumer 1 did not consume any records");
        Assertions.assertTrue(receivedRecords1.count2 > 0, "Classic protocol consumer 2 did not consume any records");
        classicConsumer2.commitSync();

        // Snapshot both assignments so they can be compared after the upgrade.
        Set<TopicPartition> consumer1AssignmentBeforeUpgrade = classicConsumer1.assignment();
        Set<TopicPartition> consumer2AssignmentBeforeUpgrade = classicConsumer2.assignment();
        Assertions.assertEquals(GroupType.CLASSIC, this.getGroupType(adminClient, this.groupId));
        // NOTE(review): second commit on the same consumer with no poll in between looks
        // redundant; kept as-is to preserve the original request sequence.
        classicConsumer2.commitSync();

        // Replace the static classic member with a consumer-protocol member of the same instance id.
        KafkaConsumer<String, String> consumerConsumer2 = this.createConsumerProtocolConsumer(user, this.groupId, "member2");
        consumerConsumer2.subscribe(Collections.singleton(this.topicName));
        Assertions.assertTrue(consumerConsumer2.assignment().isEmpty());
        this.pollUntilAssignmentUpdate(consumerConsumer2);
        Assertions.assertEquals(GroupType.CONSUMER, this.getGroupType(adminClient, this.groupId));

        KafkaTestUtils.sendRecords(producer, this.topicName, 100, 100);
        ReceivedRecords receivedRecords2 = this.consumeRecords(classicConsumer1, consumerConsumer2, 100, 100);
        Assertions.assertTrue(receivedRecords2.count1 > 0, "Classic protocol consumer 1 did not consume any records after group upgrade");
        Assertions.assertTrue(receivedRecords2.count2 > 0, "Consumer protocol consumer 2 did not consume any records after group upgrade");
        Assertions.assertEquals(consumer1AssignmentBeforeUpgrade, classicConsumer1.assignment(), "Group assignment was recomputed unexpectedly on upgrade");
        Assertions.assertEquals(consumer2AssignmentBeforeUpgrade, consumerConsumer2.assignment(), "Group assignment was recomputed unexpectedly on upgrade");
    }

    /**
     * Verifies that a mixed consumer group is downgraded to a classic group when its only
     * consumer-protocol member leaves.
     *
     * <p>A consumer-protocol and a classic-protocol consumer form one group and both must
     * receive records while the group is {@link GroupType#CONSUMER}. After the consumer-protocol
     * member is closed the group must be {@link GroupType#CLASSIC}, and the remaining classic
     * consumer is used to consume the next 100 records.
     *
     * @param quorum quorum mode supplied by the parameter source; not read by the test body
     * @throws Throwable if cluster setup, an admin request, or an assertion fails
     */
    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft"})
    public void testConsumerLeaveConsumerGroupToClassicGroupDowngrade(String quorum) throws Throwable {
        this.setUp();
        this.createPhysicalAndLogicalClusters();
        LogicalClusterUser user = this.logicalCluster1.user(11);
        String prefixedTopicName = user.tenantPrefix() + this.topicName;
        this.physicalCluster.kafkaCluster().createTopic(prefixedTopicName, 5, 1);
        AdminClient adminClient = this.testHarness.createAdminClient(user);
        KafkaProducer<String, String> producer = this.testHarness.createProducer(user, this.securityProtocol);

        KafkaConsumer<String, String> consumerConsumer = this.createConsumerProtocolConsumer(user, this.groupId, null);
        KafkaConsumer<String, String> classicConsumer = this.createClassicProtocolConsumer(user, this.groupId, null);
        this.setupMixedConsumerGroup(consumerConsumer, classicConsumer, this.topicName);

        KafkaTestUtils.sendRecords(producer, this.topicName, 0, 100);
        ReceivedRecords receivedRecords = this.consumeRecords(consumerConsumer, classicConsumer, 0, 100);
        Assertions.assertTrue(receivedRecords.count1 > 0, "Consumer protocol consumer did not consume any records");
        Assertions.assertTrue(receivedRecords.count2 > 0, "Classic protocol consumer did not consume any records");
        Assertions.assertEquals(GroupType.CONSUMER, this.getGroupType(adminClient, this.groupId));

        // Closing the consumer-protocol member leaves only the classic member in the group.
        consumerConsumer.close();
        Assertions.assertEquals(GroupType.CLASSIC, this.getGroupType(adminClient, this.groupId));

        this.pollUntilAssignmentUpdate(classicConsumer);
        KafkaTestUtils.sendRecords(producer, this.topicName, 100, 100);
        KafkaTestUtils.consumeRecords(classicConsumer, 100, 100, 100);
    }

    /**
     * Verifies that replacing a static consumer-protocol member with a classic-protocol member
     * using the same {@code group.instance.id} downgrades the group without changing assignments.
     *
     * <p>A consumer-protocol consumer with instance id {@code member1} and a classic consumer
     * form one {@link GroupType#CONSUMER} group. A classic consumer is then created with the same
     * instance id; both classic consumers must receive records, the group must be
     * {@link GroupType#CLASSIC}, and each assignment must equal the one captured beforehand.
     *
     * @param quorum quorum mode supplied by the parameter source; not read by the test body
     * @throws Throwable if cluster setup, an admin request, or an assertion fails
     */
    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft"})
    public void testStaticMemberReplacementConsumerGroupToClassicGroupDowngrade(String quorum) throws Throwable {
        this.setUp();
        this.createPhysicalAndLogicalClusters();
        LogicalClusterUser user = this.logicalCluster1.user(11);
        String prefixedTopicName = user.tenantPrefix() + this.topicName;
        this.physicalCluster.kafkaCluster().createTopic(prefixedTopicName, 5, 1);
        AdminClient adminClient = this.testHarness.createAdminClient(user);
        KafkaProducer<String, String> producer = this.testHarness.createProducer(user, this.securityProtocol);

        KafkaConsumer<String, String> consumerConsumer1 = this.createConsumerProtocolConsumer(user, this.groupId, "member1");
        KafkaConsumer<String, String> classicConsumer2 = this.createClassicProtocolConsumer(user, this.groupId, null);
        this.setupMixedConsumerGroup(consumerConsumer1, classicConsumer2, this.topicName);

        KafkaTestUtils.sendRecords(producer, this.topicName, 0, 100);
        ReceivedRecords receivedRecords1 = this.consumeRecords(consumerConsumer1, classicConsumer2, 0, 100);
        Assertions.assertTrue(receivedRecords1.count1 > 0, "Consumer protocol consumer did not consume any records");
        Assertions.assertTrue(receivedRecords1.count2 > 0, "Classic protocol consumer did not consume any records");

        // Snapshot both assignments so they can be compared after the downgrade.
        Set<TopicPartition> consumer1AssignmentBeforeDowngrade = consumerConsumer1.assignment();
        Set<TopicPartition> consumer2AssignmentBeforeDowngrade = classicConsumer2.assignment();
        Assertions.assertEquals(GroupType.CONSUMER, this.getGroupType(adminClient, this.groupId));
        consumerConsumer1.commitSync();

        // Replace the static consumer-protocol member with a classic member of the same instance id.
        KafkaConsumer<String, String> classicConsumer1 = this.createClassicProtocolConsumer(user, this.groupId, "member1");
        classicConsumer1.subscribe(Collections.singleton(this.topicName));

        KafkaTestUtils.sendRecords(producer, this.topicName, 100, 100);
        ReceivedRecords receivedRecords2 = this.consumeRecords(classicConsumer1, classicConsumer2, 100, 100);
        Assertions.assertTrue(receivedRecords2.count1 > 0, "Classic protocol consumer 1 did not consume any records");
        Assertions.assertTrue(receivedRecords2.count2 > 0, "Classic protocol consumer 2 did not consume any records");
        Assertions.assertEquals(GroupType.CLASSIC, this.getGroupType(adminClient, this.groupId));
        Assertions.assertEquals(consumer1AssignmentBeforeDowngrade, classicConsumer1.assignment(), "Group rebalanced unexpectedly on downgrade");
        Assertions.assertEquals(consumer2AssignmentBeforeDowngrade, classicConsumer2.assignment(), "Group rebalanced unexpectedly on downgrade");
    }

    /**
     * Verifies that a classic-protocol consumer can join a group started by a consumer-protocol
     * consumer and that both members then receive records.
     *
     * @param quorum quorum mode supplied by the parameter source; not read by the test body
     * @throws Throwable if cluster setup or an assertion fails
     */
    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft"})
    public void testClassicJoinToConsumerGroup(String quorum) throws Throwable {
        this.setUp();
        this.createPhysicalAndLogicalClusters();
        LogicalClusterUser user = this.logicalCluster1.user(11);
        String prefixedTopicName = user.tenantPrefix() + this.topicName;
        this.physicalCluster.kafkaCluster().createTopic(prefixedTopicName, 5, 1);
        KafkaProducer<String, String> producer = this.testHarness.createProducer(user, this.securityProtocol);

        KafkaConsumer<String, String> consumerConsumer = this.createConsumerProtocolConsumer(user, this.groupId, null);
        KafkaConsumer<String, String> classicConsumer = this.createClassicProtocolConsumer(user, this.groupId, null);
        this.setupMixedConsumerGroup(consumerConsumer, classicConsumer, this.topicName);

        KafkaTestUtils.sendRecords(producer, this.topicName, 0, 100);
        ReceivedRecords receivedRecords = this.consumeRecords(consumerConsumer, classicConsumer, 0, 100);
        Assertions.assertTrue(receivedRecords.count1 > 0, "Consumer protocol consumer did not consume any records");
        Assertions.assertTrue(receivedRecords.count2 > 0, "Classic protocol consumer did not consume any records");
    }

    /**
     * Creates a consumer configured with {@code group.protocol} set to {@link GroupProtocol#CLASSIC}.
     *
     * @param user tenant user the consumer is created for
     * @param groupId consumer group id
     * @param groupInstanceId value for {@code group.instance.id}, or {@code null} to leave it unset
     * @return the consumer returned by the test harness
     */
    private KafkaConsumer<String, String> createClassicProtocolConsumer(LogicalClusterUser user, String groupId, String groupInstanceId) {
        return this.createConsumerWithProtocol(user, groupId, groupInstanceId, GroupProtocol.CLASSIC);
    }

    /**
     * Creates a consumer configured with {@code group.protocol} set to {@link GroupProtocol#CONSUMER}.
     *
     * @param user tenant user the consumer is created for
     * @param groupId consumer group id
     * @param groupInstanceId value for {@code group.instance.id}, or {@code null} to leave it unset
     * @return the consumer returned by the test harness
     */
    private KafkaConsumer<String, String> createConsumerProtocolConsumer(LogicalClusterUser user, String groupId, String groupInstanceId) {
        return this.createConsumerWithProtocol(user, groupId, groupInstanceId, GroupProtocol.CONSUMER);
    }

    /**
     * Shared implementation behind the two protocol-specific factories: builds the property
     * overrides and delegates to the test harness.
     *
     * <p>NOTE(review): the returned consumer is not closed by this class; presumably the harness
     * owns its lifecycle — confirm.
     */
    private KafkaConsumer<String, String> createConsumerWithProtocol(LogicalClusterUser user, String groupId, String groupInstanceId, GroupProtocol groupProtocol) {
        Properties overrides = new Properties();
        overrides.put("group.protocol", groupProtocol.toString());
        // A null instance id leaves group.instance.id out of the overrides entirely.
        if (groupInstanceId != null) {
            overrides.put("group.instance.id", groupInstanceId);
        }
        return this.testHarness.createConsumer(user, groupId, this.securityProtocol, Optional.empty(), overrides);
    }

    /**
     * Brings a consumer-protocol consumer and a classic-protocol consumer into the same group.
     *
     * <p>The consumer-protocol member subscribes first and is polled until its assignment
     * changes. The classic member then subscribes and is polled until its group metadata
     * changes, at which point it must still have no partitions. Finally both members are polled
     * until their assignments change again.
     *
     * @param consumerConsumer consumer using the consumer group protocol
     * @param classicConsumer consumer using the classic group protocol
     * @param topicName topic both consumers subscribe to
     */
    private void setupMixedConsumerGroup(KafkaConsumer<String, String> consumerConsumer, KafkaConsumer<String, String> classicConsumer, String topicName) {
        final Set<String> subscription = Collections.singleton(topicName);

        // Step 1: the consumer-protocol member joins alone.
        consumerConsumer.subscribe(subscription);
        Assertions.assertTrue(consumerConsumer.assignment().isEmpty());
        this.pollUntilAssignmentUpdate(consumerConsumer);

        // Step 2: the classic member joins; it must not hold partitions immediately.
        classicConsumer.subscribe(subscription);
        this.pollUntilGroupJoined(classicConsumer);
        Assertions.assertTrue(classicConsumer.assignment().isEmpty());

        // Step 3: wait until each member's assignment has changed.
        this.pollUntilAssignmentUpdate(consumerConsumer);
        this.pollUntilAssignmentUpdate(classicConsumer);
    }

    /**
     * Looks up the type of a consumer group through the admin client.
     *
     * @param adminClient admin client used to describe the group
     * @param groupId id of the group to describe
     * @return the {@link GroupType} reported in the group's description
     * @throws InterruptedException if interrupted while waiting for the describe result
     * @throws ExecutionException if the describe request completes exceptionally
     */
    private GroupType getGroupType(AdminClient adminClient, String groupId) throws InterruptedException, ExecutionException {
        KafkaFuture<ConsumerGroupDescription> description = adminClient
                .describeConsumerGroups(List.of(groupId))
                .describedGroups()
                .get(groupId);
        return description.get().type();
    }

    /**
     * Consumes from two consumers in alternation until together they have received
     * {@code count} records or 30 seconds have elapsed, then asserts the combined total.
     *
     * <p>Each round calls {@code KafkaTestUtils.consumeRecords} once per consumer with the
     * arguments {@code (first, 0, count)} — presumably first expected value, minimum and maximum
     * record count; confirm against that helper.
     *
     * @param consumer1 first consumer; its total is reported as {@code count1}
     * @param consumer2 second consumer; its total is reported as {@code count2}
     * @param first value forwarded as the second argument of {@code KafkaTestUtils.consumeRecords}
     * @param count combined number of records that must be received
     * @return the per-consumer totals
     * @throws Exception declared by this helper's signature
     */
    private ReceivedRecords consumeRecords(KafkaConsumer<String, String> consumer1, KafkaConsumer<String, String> consumer2, int first, int count) throws Exception {
        final long startedAtNs = System.nanoTime();
        final long budgetNs = Duration.ofSeconds(30L).toNanos();
        int fromConsumer1 = 0;
        int fromConsumer2 = 0;
        while (fromConsumer1 + fromConsumer2 < count) {
            // Give up once the time budget is spent; the assertion below then reports the shortfall.
            if (System.nanoTime() - startedAtNs >= budgetNs) {
                break;
            }
            fromConsumer1 += KafkaTestUtils.consumeRecords(consumer1, first, 0, count);
            fromConsumer2 += KafkaTestUtils.consumeRecords(consumer2, first, 0, count);
        }
        Assertions.assertEquals(count, fromConsumer1 + fromConsumer2);
        return new ReceivedRecords(fromConsumer1, fromConsumer2);
    }

    /**
     * Polls the consumer in 250 ms steps until its group metadata differs from the value
     * observed on entry, failing the test if that has not happened after 30 seconds.
     *
     * @param consumer consumer to poll
     */
    private void pollUntilGroupJoined(KafkaConsumer<String, String> consumer) {
        final ConsumerGroupMetadata initialMetadata = consumer.groupMetadata();
        final long startedAtNs = System.nanoTime();
        final long budgetNs = Duration.ofSeconds(30L).toNanos();
        while (true) {
            boolean metadataChanged = !initialMetadata.equals(consumer.groupMetadata());
            if (metadataChanged || System.nanoTime() - startedAtNs >= budgetNs) {
                break;
            }
            consumer.poll(Duration.ofMillis(250L));
        }
        Assertions.assertNotEquals(initialMetadata, consumer.groupMetadata(), "Timed out waiting for group metadata update");
    }

    /**
     * Polls the consumer in 250 ms steps until its assignment differs from the one observed on
     * entry, failing the test if that has not happened after 30 seconds.
     *
     * @param consumer consumer to poll
     */
    private void pollUntilAssignmentUpdate(KafkaConsumer<String, String> consumer) {
        Set<TopicPartition> initialAssignment = consumer.assignment();
        long startTimeNs = System.nanoTime();
        long timeoutNs = Duration.ofSeconds(30L).toNanos();
        while (initialAssignment.equals(consumer.assignment()) && System.nanoTime() - startTimeNs < timeoutNs) {
            consumer.poll(Duration.ofMillis(250L));
        }
        Assertions.assertNotEquals(initialAssignment, consumer.assignment(), "Timed out waiting for assignment update");
    }

    /**
     * Polls two consumers in alternation (250 ms each) until both assignments differ from the
     * ones observed on entry, failing the test if either is unchanged after 30 seconds.
     *
     * @param consumer1 first consumer to poll
     * @param consumer2 second consumer to poll
     */
    private void pollUntilAssignmentUpdate(KafkaConsumer<String, String> consumer1, KafkaConsumer<String, String> consumer2) {
        Set<TopicPartition> initialAssignment1 = consumer1.assignment();
        Set<TopicPartition> initialAssignment2 = consumer2.assignment();
        long startTimeNs = System.nanoTime();
        long timeoutNs = Duration.ofSeconds(30L).toNanos();
        // Keep polling both consumers while at least one of them still has its initial assignment.
        while ((initialAssignment1.equals(consumer1.assignment()) || initialAssignment2.equals(consumer2.assignment()))
                && System.nanoTime() - startTimeNs < timeoutNs) {
            consumer1.poll(Duration.ofMillis(250L));
            consumer2.poll(Duration.ofMillis(250L));
        }
        Assertions.assertNotEquals(initialAssignment1, consumer1.assignment(), "Timed out waiting for assignment update");
        Assertions.assertNotEquals(initialAssignment2, consumer2.assignment(), "Timed out waiting for assignment update");
    }

    private class ReceivedRecords {
        public final int count1;
        public final int count2;

        public ReceivedRecords(int count1, int count2) {
            this.count1 = count1;
            this.count2 = count2;
        }
    }
}

